Project author: treebee

Project description: Experimental Elixir bindings for Apache Arrow including Parquet and DataFusion
Primary language: Rust
Project URL: git://github.com/treebee/elixir-arrow.git
Created: 2021-03-20T12:43:09Z
Project community: https://github.com/treebee/elixir-arrow

License: Apache License 2.0



ArrowElixir

Elixir bindings for Apache Arrow. Uses the
Rust implementation via rustler.

DISCLAIMER:

  • It’s NOT an official library of the Apache Arrow project.
  • It’s an early WIP and mostly experimental, far from production-ready.
  • I’m quite new to Elixir and I’ve basically no experience with Rust, so cut me some slack, please :D
  • It’s currently a 3-in-1 library covering Arrow, Parquet, and DataFusion, but in
    the future it would be nice to have those as separate libraries.

Installation

Make sure to have Rust installed.

  defp deps do
    [
      {:arrow, git: "https://github.com/treebee/elixir-arrow.git"}
    ]
  end
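
After adding the dependency, run mix deps.get and mix compile as usual; compiling :arrow builds the Rust NIF via rustler, which is why a Rust toolchain has to be available.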

Arrow

For Arrow there’s already some basic support for creating Arrays and RecordBatches:

  arr = Arrow.array([1, 3, 4, nil])
  #Arrow.Int64Array
  [1, 3, 4, nil]

RecordBatches are datasets made up of a number of contiguous arrays of equal
length, together with a schema:

  alias Arrow.RecordBatch

  record_batch =
    RecordBatch.new(%{
      cola: [3, 4, nil, 5],
      colb: [4.5, 3.5, nil, nil],
      colc: ["a", "b", "c", "d"]
    })
  #Arrow.RecordBatch
  cola: int64
  colb: float32
  colc: string

  RecordBatch.to_map(record_batch)
  %{
    "cola" => [3, 4, nil, 5],
    "colb" => [4.5, 3.5, nil, nil],
    "colc" => ["a", "b", "c", "d"]
  }

  RecordBatch.schema(record_batch)
  %Arrow.Schema{
    fields: [
      %Arrow.Field{
        data_type: {:s, 64},
        dict_id: 0,
        dict_is_ordered: false,
        metadata: nil,
        name: "cola",
        nullable: true
      },
      %Arrow.Field{
        data_type: {:f, 32},
        dict_id: 0,
        dict_is_ordered: false,
        metadata: nil,
        name: "colb",
        nullable: true
      },
      %Arrow.Field{
        data_type: {:utf8, 32},
        dict_id: 0,
        dict_is_ordered: false,
        metadata: nil,
        name: "colc",
        nullable: true
      }
    ],
    metadata: []
  }
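
Since RecordBatch.to_map/1 returns a plain Elixir map of column names to value lists, the result composes directly with the standard library. A minimal sketch counting nulls per column (only to_map/1 comes from this library; everything else is stock Elixir):

  columns = RecordBatch.to_map(record_batch)

  # Rebuild the map with each column's list replaced by its nil count.
  null_counts =
    Map.new(columns, fn {name, values} ->
      {name, Enum.count(values, fn v -> is_nil(v) end)}
    end)
  %{"cola" => 1, "colb" => 2, "colc" => 0}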

Parquet

In addition to Arrow, the library comes with support for
reading and writing Parquet files.

  record_batch =
    RecordBatch.new(%{
      a: [1.0, 2.0, 3.0, nil, 5.0],
      b: [1.2, 5.5, 4.5, nil, nil],
      c: [0, 0, 1, 0, 1]
    })

  Arrow.Parquet.write_record_batches([record_batch], "/tmp/testdata.parquet")

  batches =
    Arrow.Parquet.File.open("/tmp/testdata.parquet")
    |> Arrow.Parquet.File.iter_batches()
    |> Enum.to_list()
  [#Arrow.RecordBatch
  a: Float64
  b: Float64
  c: Int64]

  batches |> Enum.map(&RecordBatch.to_map/1)
  [
    %{
      "a" => [1.0, 2.0, 3.0, nil, 5.0],
      "b" => [1.2, 5.5, 4.5, nil, nil],
      "c" => [0, 0, 1, 0, 1]
    }
  ]
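
Combining the reader and the writer gives a small transform-and-rewrite loop using nothing beyond the functions shown above. A hedged sketch, assuming RecordBatch.new/1 expects atom keys as in the earlier examples and that write_record_batches/2 takes a list of batches as its first argument (the output path is made up for illustration):

  # Read all batches, drop column "c", and write the result to a new file.
  Arrow.Parquet.File.open("/tmp/testdata.parquet")
  |> Arrow.Parquet.File.iter_batches()
  |> Enum.map(fn batch ->
    batch
    |> RecordBatch.to_map()
    |> Map.delete("c")
    |> Map.new(fn {name, values} -> {String.to_atom(name), values} end)
    |> RecordBatch.new()
  end)
  |> Arrow.Parquet.write_record_batches("/tmp/testdata-no-c.parquet")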

DataFusion

Using DataFusion for reading Parquet files (and also CSV) is even more fun:

Querying Parquet Files With SQL

  alias Arrow.DataFusion.ExecutionContext

  batches =
    ExecutionContext.new()
    |> ExecutionContext.register_parquet("example", "/tmp/testdata.parquet")
    |> ExecutionContext.sql("SELECT a, b, c FROM example")
  [#Arrow.RecordBatch
  a: Float64
  b: Float64
  c: Int64]

  batches |> hd() |> RecordBatch.to_map()
  %{
    "a" => [1.0, 2.0, 3.0, nil, 5.0],
    "b" => [1.2, 5.5, 4.5, nil, nil],
    "c" => [0, 0, 1, 0, 1]
  }
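
Ordinary SQL filters work the same way. A small sketch reusing only the calls from the query above, assuming a context can serve further queries after registration; the result shown is what the test data from the Parquet section should produce:

  ctx =
    ExecutionContext.new()
    |> ExecutionContext.register_parquet("example", "/tmp/testdata.parquet")

  # Rows where c = 1 are the third and fifth rows of the test data.
  ctx
  |> ExecutionContext.sql("SELECT a, b FROM example WHERE c = 1")
  |> Enum.map(&RecordBatch.to_map/1)
  [%{"a" => [3.0, 5.0], "b" => [4.5, nil]}]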

More SQL Features - GROUP BY

  batches =
    ExecutionContext.new()
    |> ExecutionContext.register_parquet("example", "/tmp/testdata.parquet")
    |> ExecutionContext.sql("SELECT SUM(a) as sum_a, SUM(b) as sum_b, c FROM example GROUP BY c")
    |> Enum.map(&RecordBatch.to_map/1)
  [%{"c" => [1, 0], "sum_a" => [8.0, 3.0], "sum_b" => [4.5, 6.7]}]
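
This checks out against the test data from the Parquet section: SUM skips nulls, so for c = 1 we get sum_a = 3.0 + 5.0 = 8.0 and sum_b = 4.5 (the other b value in that group is nil), while for c = 0 we get sum_a = 1.0 + 2.0 = 3.0 and sum_b = 1.2 + 5.5 = 6.7.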

Mini Data Pipeline

Let’s load a CSV, aggregate it with a GROUP BY, and save the result back as Parquet:

  ExecutionContext.new()
  |> ExecutionContext.register_csv("example", "/tmp/testdata.csv")
  |> ExecutionContext.sql("SELECT SUM(a) as sum_a, SUM(b) as sum_b, c FROM example GROUP BY c")
  |> Arrow.Parquet.write_record_batches("/tmp/testdata-result.parquet")
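
Note that the CSV isn’t created anywhere above; to make the pipeline runnable end to end you can first write a matching file with plain Elixir (the rows below are hypothetical test data shaped to fit the query, and register_csv is assumed to infer the schema from the header row):

  File.write!("/tmp/testdata.csv", """
  a,b,c
  1.0,1.2,0
  2.0,5.5,0
  3.0,4.5,1
  """)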

Next Steps?

It would be nice to find some people interested in contributing. People with Rust
experience would be especially helpful, but everyone is welcome, of course :)

Even though the current Rust NIF is not too bad, there is much more left to do
than what is already implemented. For example:

  • support for more datatypes, for example dates and datetimes
  • more array operations
  • reading and writing Parquet files with different options (compression, row groups, …)
  • reading/writing multiple files, partitioning, …
  • a “Table” representation?

Another thing, as already mentioned: Splitting Arrow, Parquet and DataFusion
into 3 different libs. (I made a short attempt to do this but ran into linker issues)

I also haven’t thought too much about providing a nice API; the current goal was
rather to make some first examples work (there’s also no error handling yet).
But it will probably make sense to think more about how the lib integrates
nicely with the Elixir ecosystem.

For DataFusion maybe some kind of Ecto adapter?