Flow, Partitions, Departition and Friends

Hi everyone.

I have a Elixir flow question.

I’m taking a set of files and mapping the CSV in them into records that group on an id, the CSV data
contains duplication: Given M lines for a given ID , N lines will be duplication.

My record format is basically as follows:
unified(id, records, duplicates, extra)

o = Path.wildcard(wildcard)
|> Flow.from_enumerable(max_demand: 3)
|> Flow.map(&to_stream/1)
|> Flow.partition(window: Flow.Window.global, stages: 5, max_demand: 10)
|> Flow.flat_map(&to_csv/1)
|> Flow.partition( window: Flow.Window.global |> Flow.Window.trigger_every(10000, :reset), max_demand: 10, stages: 3)
|> Flow.filter(&exclude_lines_without_id/1)
|> Flow.partition( window: Flow.Window.global |> Flow.Window.trigger_every(1000, :reset), max_demand: 10, stages: 3)
|> Flow.reduce(&empty_map/0, &reduce_into_unified_records/2)

Then I departition the reduction to ensure that that any of the records generated in
each of the partitions have the duplication removed ( I move the duplicates to another part of the record for further analysis as required)

My “Done function” for Departition generates chunks which I know are going to be uniq sets

|> Flow.departition(&empty_map/0, &reduce_unified_records_from_map/2, fn(d) -> 
  Enum.map(Enum.chunk_every(d, 2000), &(Map.new(&1)))  # big big chunk of memory :frowning: 
end)

I then annotate additional data at the top level of the record that is
extra and necessary per ID (prefer to make non duplicate requests)
I need to limit the batch request for this extra data to 2000 IDs at a time.

|> Flow.map(fn(list) ->
    Enum.map(list,&merge_extra_data/1) # join
end)

I then write to Mnesia in a writer that ensures the records
associated with the unified ID are sorted.

|> Flow.each(fn(list) ->
  Enum.each(list,&storage_writer/1) #
end)
|> Flow.run

I have a decent runtime and memory foot-print over the data set - however the memory and runtime
would benefit from avoiding the departition

I suspect there is a better way to generate an additional partition of the data perhaps using a hashing function
that would have the effect of doing a departition operation without requiring the complete memory space.

Any Ideas?

This is my first time using flow if something obvious or non-obvious is escaping me please illuminate us all :slight_smile:

AHHH Water in the desert!

I can change the hash and key function for the partition to help index the duplicate together, use a join / merge operation for the extra data fields and so for storage…

Gen Stage & Flow ! Thank you José Valim.

1 Like