Hi everyone.
I have an Elixir Flow question.
I’m taking a set of files and mapping the CSV rows in them into records grouped by ID. The CSV data
contains duplication: given M lines for a given ID, N of those lines will be duplicates.
My record format is basically as follows:
```
unified(id, records, duplicates, extra)
```
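Since `unified(id, records, duplicates, extra)` reads like an Erlang record, and Mnesia stores rows as tuples tagged with the record name, one way to declare it in Elixir is with `Record.defrecord/2`. This is a minimal sketch; the module name `Unified`, the field defaults, and the helper `new/1` are assumptions for illustration only:

```elixir
defmodule Unified do
  require Record

  # Hypothetical defaults. The :unified tag becomes the first tuple element,
  # which matches an Mnesia table whose record name is :unified.
  Record.defrecord(:unified, id: nil, records: [], duplicates: [], extra: nil)

  # Hypothetical helper: an empty record for one ID.
  def new(id), do: unified(id: id)
end
```

`Unified.new(42)` builds the plain tuple `{:unified, 42, [], [], nil}`, the same shape that could be written to a table with attributes `[:id, :records, :duplicates, :extra]`.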
```elixir
o =
  Path.wildcard(wildcard)
  |> Flow.from_enumerable(max_demand: 3)
  # turn each file path into a stream
  |> Flow.map(&to_stream/1)
  |> Flow.partition(window: Flow.Window.global(), stages: 5, max_demand: 10)
  # parse each stream into CSV rows
  |> Flow.flat_map(&to_csv/1)
  |> Flow.partition(window: Flow.Window.global() |> Flow.Window.trigger_every(10_000, :reset), max_demand: 10, stages: 3)
  |> Flow.filter(&exclude_lines_without_id/1)
  |> Flow.partition(window: Flow.Window.global() |> Flow.Window.trigger_every(1000, :reset), max_demand: 10, stages: 3)
  # group rows by ID within each partition
  |> Flow.reduce(&empty_map/0, &reduce_into_unified_records/2)
```
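For readers following along, here is one guess at what `exclude_lines_without_id/1`, `empty_map/0` and `reduce_into_unified_records/2` might look like, written as plain functions so they can be tried with `Enum.reduce/3` outside Flow (`Flow.reduce/3` calls its reducer with the event first and the accumulator second, the same order as `Enum.reduce/3`). The row shape (a map with an `"id"` key), the module name `Dedup`, and the `id => {records, duplicates}` accumulator are assumptions, not the original code:

```elixir
defmodule Dedup do
  # Hypothetical row shape: a map of CSV columns that may carry an "id" key.
  def exclude_lines_without_id(row), do: Map.get(row, "id") not in [nil, ""]

  def empty_map, do: %{}

  # Accumulator: id => {records, duplicates}.
  # A row identical to one already recorded for its ID is moved to duplicates.
  def reduce_into_unified_records(row, acc) do
    id = Map.fetch!(row, "id")

    Map.update(acc, id, {[row], []}, fn {records, dups} ->
      if row in records, do: {records, [row | dups]}, else: {[row | records], dups}
    end)
  end
end
```

Run over a handful of rows, the repeated `"1"` row ends up in the duplicates list and the row without an ID is dropped by the filter.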
Then I departition the reduction to ensure that any records generated in
each of the partitions have their duplication removed (I move the duplicates to another part of the record for further analysis as required).
My “done” function for departition generates chunks that I know are going to be unique sets:
```elixir
|> Flow.departition(&empty_map/0, &reduce_unified_records_from_map/2, fn d ->
  # big, big chunk of memory: the whole merged map is split into maps of 2000 IDs
  d |> Enum.chunk_every(2000) |> Enum.map(&Map.new/1)
end)
```
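If I read Flow's defaults right, a `Flow.partition/2` without a `:key` option hashes the whole event, so rows with the same ID but different content can land in different partitions; that is why the departition has to merge partition states. A sketch of what `reduce_unified_records_from_map/2` might do, assuming (hypothetically) that each partition's state is a map of `id => {records, duplicates}` and that the merge function receives a partition's state first and the accumulator second:

```elixir
defmodule Merge do
  # Hypothetical: partition_map and acc are both maps of id => {records, duplicates}.
  # When an ID appears in both, records already present in acc become duplicates.
  def reduce_unified_records_from_map(partition_map, acc) do
    Map.merge(acc, partition_map, fn _id, {recs_a, dups_a}, {recs_b, dups_b} ->
      {new, repeated} = Enum.split_with(recs_b, &(&1 not in recs_a))
      {recs_a ++ new, dups_a ++ dups_b ++ repeated}
    end)
  end
end
```

The conflict function passed to `Map.merge/3` only runs for IDs present in both maps, which is exactly the cross-partition case.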
I then annotate each record, at the top level, with extra data that is
needed per ID (I’d prefer not to make duplicate requests).
I need to limit the batch request for this extra data to 2000 IDs at a time.
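One way to respect the 2000-ID limit is a single batch request per chunk; since map keys are unique and the chunks split the map's entries, each ID is requested exactly once. This sketch uses `Stream.chunk_every/2` so chunks are produced lazily instead of all being built up front. `fetch_extra/1`, `merge_extra_data_for_chunk/1`, `chunks/1` and the `%{looked_up: id}` payload are hypothetical stand-ins, not the original `merge_extra_data/1`:

```elixir
defmodule Extra do
  @batch_size 2000

  # Lazily split the merged id => record map into maps of at most 2000 IDs.
  def chunks(unified_map) do
    unified_map
    |> Stream.chunk_every(@batch_size)
    |> Stream.map(&Map.new/1)
  end

  # Hypothetical stand-in for the real batch request: returns id => extra data.
  def fetch_extra(ids) when length(ids) <= @batch_size do
    Map.new(ids, fn id -> {id, %{looked_up: id}} end)
  end

  # One request per chunk, then attach each ID's extra data to its record.
  def merge_extra_data_for_chunk(chunk) do
    extra = fetch_extra(Map.keys(chunk))
    Map.new(chunk, fn {id, record} -> {id, {record, Map.fetch!(extra, id)}} end)
  end
end
```

For 4500 IDs, `Extra.chunks/1` yields maps of 2000, 2000 and 500 entries, so three batch requests cover everything.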
```elixir
|> Flow.map(fn list ->
  # join the extra data onto each record in the chunk
  Enum.map(list, &merge_extra_data/1)
end)
```
I then write to Mnesia with a writer that ensures the records
associated with each unified ID are sorted.
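A sketch of a writer along those lines, using only OTP's `:mnesia`: the table is created as RAM copies on the local node and each ID's records are sorted with `Enum.sort/1` before the write. The module name `Storage`, the table name `:unified`, its attribute list, and the `{id, {records, duplicates, extra}}` input shape are assumptions, not the original `storage_writer/1`:

```elixir
defmodule Storage do
  # Hypothetical table layout mirroring unified(id, records, duplicates, extra);
  # :id is the first attribute, so it is the key.
  def setup do
    :ok = :mnesia.start()

    case :mnesia.create_table(:unified, attributes: [:id, :records, :duplicates, :extra]) do
      {:atomic, :ok} -> :ok
      {:aborted, {:already_exists, :unified}} -> :ok
    end
  end

  # Write one {id, {records, duplicates, extra}} entry with its records sorted.
  def storage_writer({id, {records, duplicates, extra}}) do
    {:atomic, :ok} =
      :mnesia.transaction(fn ->
        :mnesia.write({:unified, id, Enum.sort(records), duplicates, extra})
      end)

    :ok
  end
end
```

Sorting inside the writer keeps the ordering guarantee local to each ID, so it does not depend on the order in which chunks reach this stage.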
```elixir
|> Flow.each(fn list ->
  # write every record of the chunk to Mnesia
  Enum.each(list, &storage_writer/1)
end)
|> Flow.run()
```
Runtime and memory footprint over the data set are decent; however, both memory and runtime
would benefit from avoiding the departition.
I suspect there is a better way: generating an additional partition of the data, perhaps using a hashing function,
that would have the same effect as the departition without requiring the whole data set in memory.
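A minimal, Flow-free sketch of that hashing idea: route each row by `:erlang.phash2/2` of its ID alone, so all rows for an ID share a partition, each partition can be deduplicated on its own, and the partial results combine with no merge logic. The `"id"` key and the `HashPartition` function names are hypothetical. In Flow itself, the analogous knob appears to be the `:key` option of `Flow.partition/2` (for example `key: fn row -> row["id"] end`), but check the docs for the Flow version in use:

```elixir
defmodule HashPartition do
  # Pick a partition by hashing only the row's ID (hypothetical "id" key),
  # so every row for a given ID lands in the same partition.
  def partition_of(row, stages), do: :erlang.phash2(Map.fetch!(row, "id"), stages)

  # Split rows into per-partition lists, keyed by partition index.
  def partition_by_id(rows, stages), do: Enum.group_by(rows, &partition_of(&1, stages))

  # Reduce one partition independently: id => unique rows for that ID.
  def reduce_partition(rows) do
    rows
    |> Enum.group_by(&Map.fetch!(&1, "id"))
    |> Map.new(fn {id, id_rows} -> {id, Enum.uniq(id_rows)} end)
  end

  def run(rows, stages) do
    rows
    |> partition_by_id(stages)
    |> Map.values()
    |> Enum.map(&reduce_partition/1)
  end

  # No ID spans two partitions, so a plain Map.merge never hits a conflict.
  def union(partials), do: Enum.reduce(partials, %{}, &Map.merge/2)
end
```

With 100 IDs, three copies each, and 5 partitions, every ID shows up in exactly one partial result with a single unique row; in a streaming setting each partial could be handed downstream as soon as its partition finishes, rather than being collected into one map first.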
Any ideas?
This is my first time using Flow, so if something obvious (or non-obvious) is escaping me, please illuminate us all.