Partition by hash key using Flow API

Hello,

I would like help implementing logic that partitions (groups) records into distinct sets (ETS tables) by a partition key and then flushes each set to a file, using the Elixir Flow API.

Some background:

I’ve finished the first part of a prototype, which is described here: elixir-language-performance-tuning-for-1-quadrillion-records-per-month

The result is a set of files in which users are mixed across all files. I now need to partition the users into groups by their id and then write each group to a separate file.

I can visualize it like:

Input to the Flow: 2 CSV files

File-1
12,someUser1
5,someUser2
4,someUser3
32,someUser4
24,someUser5
39,someUser6

File-2
45,someUser7
7,someUser8
40,someUser9
30,someUser10
17,someUser11

Output from the Flow: 5 CSV files

File-part-00
5,someUser2
4,someUser3
7,someUser8

File-part-10
12,someUser1
17,someUser11

File-part-20
24,someUser5

File-part-30
32,someUser4
39,someUser6
30,someUser10

File-part-40
45,someUser7
40,someUser9

The partition key in this case is div(first_csv_column, 10)
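
For illustration, computing that key for a single CSV line could look roughly like the sketch below. The module and function names are made up for this example, and it assumes every line has the shape "<integer id>,<name>" as in the sample files above.

```elixir
defmodule PartitionKey do
  # Hypothetical helper, not part of Flow or the original prototype.
  # Assumes the line looks like "12,someUser1" (integer id, comma, name).
  def from_line(line) do
    [id, _name] =
      line
      |> String.trim()
      |> String.split(",", parts: 2)

    # Integer division by 10 groups ids 0..9, 10..19, 20..29, and so on.
    div(String.to_integer(id), 10)
  end
end
```

With the sample data, `PartitionKey.from_line("12,someUser1")` gives 1 and `PartitionKey.from_line("45,someUser7")` gives 4.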

I was able to put together the following basic prototype:

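def part do
  Flow.from_enumerable(0..69)
  # The :hash function returns {event, partition_index}; the index must be in 0..stages-1.
  |> Flow.partition(hash: fn e -> {e, div(e, 10)} end, stages: 7)
  # Accumulate each partition's events into a list.
  |> Flow.reduce(fn -> [] end, fn e, acc -> [e | acc] end)
  # Emit the accumulated list as a single event.
  |> Flow.emit(:state)
  |> Flow.partition(stages: 7)
  |> Flow.map(&write_to_file/1)
  |> Enum.to_list()
end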


def write_to_file(e) do
  # Workaround: recompute the partition key from the first element of the group.
  partition_key = e |> hd() |> div(10) |> Integer.to_string()
  path = Path.join([System.user_home(), "temp", "account-#{partition_key}"])
  File.write!(path, Enum.map(e, &(Integer.to_string(&1) <> "\n")), [:utf8])

  "#{path} Done"
end

but there are some blockers I cannot resolve to get to a real implementation.

Questions:

  1. For accumulation I need to use ETS, because my input data set is ~150 million records in a few tens of files. I don’t know how to create a separate ETS table per partition (stage) in the reduce function, or how to get the hash key for each stage in the reduce function so that I can name the table.
  2. How do I get hold of the ETS table in the map function when I need to flush it to a file? And how do I get the hash key for that stage to name the destination file? My current workaround is to take the first element of the list and recompute the key.
  3. Side question: :ets.tab2list() returns a list, which occupies memory. Is it possible to stream the table contents directly to a file instead?

Thank you for your time; any help is much appreciated.

  1. I’m not sure what the best approach is, but it seems you can use the :key option of Flow.partition to tell Flow how to partition the data.
  2. No idea of a better solution.
  3. There’s :ets.tab2file to write the table directly to a file.
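
On point 3: :ets.tab2file dumps the table in ETS's own format (meant to be read back with :ets.file2tab), not as CSV. If CSV output is needed without building the full list first, one option is to read the table in chunks with :ets.select/3 and its continuation. The following is only a sketch: the EtsDump module is hypothetical, and it assumes the rows are {integer_id, name} tuples and that nothing modifies the table while it is being dumped.

```elixir
defmodule EtsDump do
  # Hypothetical helper: writes every {id, name} row of `table` to `path`
  # as CSV, holding at most `chunk_size` rows in memory at a time.
  def to_csv(table, path, chunk_size \\ 1_000) do
    Stream.resource(
      # Start a chunked select that returns whole objects.
      fn -> :ets.select(table, [{:_, [], [:"$_"]}], chunk_size) end,
      fn
        :"$end_of_table" -> {:halt, nil}
        # Emit the current chunk and continue from where the select stopped.
        {rows, continuation} -> {rows, :ets.select(continuation)}
      end,
      fn _ -> :ok end
    )
    |> Stream.map(fn {id, name} -> "#{id},#{name}\n" end)
    |> Stream.into(File.stream!(path))
    |> Stream.run()
  end
end
```

Row order in the output follows the table's traversal order, which for a :set table is not the insertion order.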

Note: when you only want a side effect, use Flow.each instead of Flow.map. Use Flow.run at the end, instead of Enum.to_list, to run the flow.
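
To make the :key suggestion concrete, here is a rough sketch rather than a tested production solution. It assumes a Flow version that provides Flow.on_trigger (the prototype above uses the older Flow.emit), and the PartitionSketch module, the stage count, and the file naming are made up for the example. Because several keys can hash to the same stage, the accumulator is a map keyed by partition key, so the key is already at hand when the stage flushes and does not need to be recomputed from the first element. For a data set too large for a map, the first function passed to Flow.reduce could create an ETS table instead.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule PartitionSketch do
  # Hypothetical helper: groups integer ids by div(id, 10) and writes
  # one file per group into `out_dir`.
  def run(ids, out_dir) do
    File.mkdir_p!(out_dir)

    ids
    |> Flow.from_enumerable()
    # Events with the same key always go to the same stage.
    |> Flow.partition(key: fn id -> div(id, 10) end, stages: 4)
    # Per-stage accumulator: a map of partition key => list of ids.
    |> Flow.reduce(fn -> %{} end, fn id, acc ->
      Map.update(acc, div(id, 10), [id], &[id | &1])
    end)
    # Called when the stage has seen all its input; write the groups
    # as a side effect and emit no events downstream.
    |> Flow.on_trigger(fn acc ->
      Enum.each(acc, fn {key, group} ->
        lines = Enum.map(group, &[Integer.to_string(&1), "\n"])
        File.write!(Path.join(out_dir, "account-#{key}"), lines)
      end)

      {[], acc}
    end)
    |> Flow.run()
  end
end
```

Calling `PartitionSketch.run(0..69, "/tmp/accounts")` should leave seven files, account-0 through account-6, each holding the ids of one group.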