Hello,
I would like some help implementing logic that partitions (groups) records into distinct sets (ETS tables) by a partition key and then flushes each set to a file, using the Elixir Flow API.
Some background:
I’ve done the first part of the prototype, which is described in detail here: elixir-language-performance-tuning-for-1-quadrillion-records-per-month
The result is a set of files in which users are mixed across all files. Now I need to partition the users into groups by their id and then write each group to a separate file.
I can visualize it like this:
Input to the Flow: 2 CSV files
File-1
12,someUser1
5,someUser2
4,someUser3
32,someUser4
24,someUser5
39,someUser6
File-2
45,someUser7
7,someUser8
40,someUser9
30,someUser10
17,someUser11
Output from the Flow: 5 CSV files
File-part-00
5,someUser2
4,someUser3
7,someUser8
File-part-10
12,someUser1
17,someUser11
File-part-20
24,someUser5
File-part-30
32,someUser4
39,someUser6
30,someUser10
File-part-40
45,someUser7
40,someUser9
The partition key in this case is div(first_csv_column, 10); for example, ids 30 to 39 all map to key 3 and end up in File-part-30.
I was able to build the following basic prototype:
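A minimal sketch of the key computation for the CSV lines above, assuming each line is exactly <integer id>,<name> with no header row. CsvRow, parse/1 and partition_key/1 are hypothetical names used only for illustration:

```elixir
defmodule CsvRow do
  # Hypothetical helper: parse one input line ("<id>,<name>", optionally
  # ending in a newline) into an {id, name} tuple. Assumes no header row.
  def parse(line) do
    [id, name] =
      line
      |> String.trim_trailing()
      |> String.split(",", parts: 2)

    {String.to_integer(id), name}
  end

  # Partition key as described in the post: div(first_csv_column, 10).
  def partition_key({id, _name}), do: div(id, 10)
end

# Usage with three rows from the example input:
# ["12,someUser1", "5,someUser2", "7,someUser8"]
# |> Enum.map(&CsvRow.parse/1)
# |> Enum.group_by(&CsvRow.partition_key/1)
# => %{0 => [{5, "someUser2"}, {7, "someUser8"}], 1 => [{12, "someUser1"}]}
```

Enum.group_by/2 is only suitable for small samples like this one; for ~150 million rows the grouping has to happen outside a single in-memory map.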
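```elixir
def part do
  Flow.from_enumerable(0..69)
  # route each event to the stage whose index is its partition key (0..6)
  |> Flow.partition(hash: fn e -> {e, div(e, 10)} end, stages: 7)
  # accumulate all events of a stage into a list
  |> Flow.reduce(fn -> [] end, fn e, acc -> [e | acc] end)
  # emit each stage's whole list as a single event
  |> Flow.emit(:state)
  |> Flow.partition(stages: 7)
  |> Flow.map(&write_to_file/1)
  |> Enum.to_list()
end

def write_to_file(e) do
  # workaround: recompute the partition key from the first element
  partition_key = div(hd(e), 10)
  dir = Path.join(System.user_home(), "temp")
  # make sure ~/temp exists before writing into it
  File.mkdir_p!(dir)
  path = Path.join(dir, "account-#{partition_key}")
  File.write!(path, Enum.map(e, &(Integer.to_string(&1) <> "\n")), [:utf8])
  "#{path} Done"
end
```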
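Flow's :hash option expects a function that returns {event, partition_index}, with the index in 0..stages - 1. In the prototype, div(e, 10) stays within 0..6 only because the input is 0..69; with real ids there will be more partition keys than stages. A hedged sketch, assuming non-negative integer ids and the same fixed count of 7 stages, that folds the partition key onto a stage with rem/2 (StageHash is a hypothetical module name):

```elixir
defmodule StageHash do
  # Assumed stage count; must match the stages: option given to Flow.partition/2.
  @stages 7

  # Returns {event, stage_index} as Flow's :hash option expects.
  # div/2 computes the partition key and rem/2 folds it into 0..@stages - 1,
  # so one stage may receive several partition keys. Assumes id >= 0.
  def hash(id) when is_integer(id) and id >= 0 do
    {id, rem(div(id, 10), @stages)}
  end
end

# Possible use: Flow.partition(hash: &StageHash.hash/1, stages: 7)
```

Because several keys can share a stage under this scheme, the grouping by key still has to happen inside each stage, which is one reason to keep the partition key next to each row instead of deriving it from the stage index.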
but there are some blockers I cannot resolve to get to a real implementation.
Questions:
- For accumulating I need to use ETS, because my input data set is ~150 million records spread over a few tens of files. How do I create a separate ETS table per partition (stage) in the reduce function, and how do I get the hash key of each stage inside the reduce function so I can name the ETS table?
- How do I access the ETS table in the map function when I need to flush it to a file? And how do I get the hash key of that stage to name the destination file? Currently my workaround is to take the first element of the list and recompute the key from it.
- Side question: :ets.tab2list/1 returns the whole table as a list, which occupies memory. Is it possible to stream the table contents directly to a file instead?
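One possible approach to these three questions, sketched in plain Elixir under stated assumptions: rows are {id, name} tuples with non-negative integer ids, and each stage owns one unnamed :duplicate_bag table. PartitionSink, new/0, add/2 and flush/2 are hypothetical names. Instead of naming the table after the stage, each row is stored under its partition key, so the key (and therefore the file name) is available at flush time without recomputing it from the first element; rows are read back in chunks with :ets.select/3 and its continuation rather than with :ets.tab2list/1.

```elixir
defmodule PartitionSink do
  # Hypothetical helper (not part of Flow): accumulate {id, name} rows in an
  # ETS table keyed by partition key, then flush each key to its own file.
  @chunk_size 1_000

  # Unnamed tables need no unique name: every call returns a fresh table
  # reference, so each reducer stage can create its own table.
  def new, do: :ets.new(:partition_sink, [:duplicate_bag, :protected])

  # Reducer-style insert: store the row under div(id, 10) so the key stays
  # with the data. Returns the table so it can serve as an accumulator.
  def add(table, {id, _name} = row) do
    true = :ets.insert(table, {div(id, 10), row})
    table
  end

  # Write every partition key in the table to dir/account-<key>, creating
  # dir if needed. Returns the list of written paths.
  def flush(table, dir) do
    File.mkdir_p!(dir)

    table
    |> keys()
    |> Enum.map(fn key ->
      path = Path.join(dir, "account-#{key}")

      table
      |> stream_key(key)
      |> Stream.map(fn {id, name} -> [Integer.to_string(id), ",", name, "\n"] end)
      |> Stream.into(File.stream!(path))
      |> Stream.run()

      path
    end)
  end

  # Lazily walk the distinct keys with :ets.first/1 and :ets.next/2
  # (for bag tables, next/2 moves past all objects sharing the current key).
  defp keys(table) do
    Stream.unfold(:ets.first(table), fn
      :"$end_of_table" -> nil
      key -> {key, :ets.next(table, key)}
    end)
  end

  # Stream the rows stored under one key, @chunk_size objects at a time,
  # using :ets.select/3 and :ets.select/1 continuations.
  defp stream_key(table, key) do
    spec = [{{key, :"$1"}, [], [:"$1"]}]

    Stream.resource(
      fn -> :ets.select(table, spec, @chunk_size) end,
      fn
        :"$end_of_table" -> {:halt, :done}
        {rows, cont} -> {rows, :ets.select(cont)}
      end,
      fn _ -> :ok end
    )
  end
end
```

Inside a flow this could be wired as Flow.reduce(&PartitionSink.new/0, fn row, tab -> PartitionSink.add(tab, row) end), since Flow.reduce/3 should call its accumulator function in each reducer stage. An ETS table is owned by the process that created it and is deleted when that process exits, so calling flush/2 from the owning stage is safer than handing the table reference to a later Flow.map stage.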
Thank you for your time; any help is very much appreciated.