I just started using Flow and I noticed that it doesn’t have a `chunk_by` function. I wonder why.
Specifically, I’m reading a file containing line items from multiple invoices. Each line item contains an `invoice_id`, and I want to group all the line items of an invoice and process them together. The line items in the file are sorted by `invoice_id`, so I can do something like:
```elixir
CsvParse.parse("invoice_file.csv") # produces a stream of maps, each representing a CSV record
|> Stream.chunk_by(& &1.invoice_id)
|> Stream.flat_map(&process_invoice/1)
|> Stream.into(File.stream!("out.csv"))
|> Stream.run()
```
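To make the grouping concrete, here is a minimal illustration of what `chunk_by` does with records like mine (the maps and amounts are made up for the example):

```elixir
# Consecutive records with the same :invoice_id end up in the same chunk;
# the file being sorted by invoice_id is what makes this grouping correct.
records = [
  %{invoice_id: 1, amount: 10},
  %{invoice_id: 1, amount: 20},
  %{invoice_id: 2, amount: 5}
]

chunks = Enum.chunk_by(records, & &1.invoice_id)
# => [
#      [%{invoice_id: 1, amount: 10}, %{invoice_id: 1, amount: 20}],
#      [%{invoice_id: 2, amount: 5}]
#    ]
```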
All good. Now I want to parallelize this with Flow. I was expecting to be able to write something like:
```elixir
CsvParse.parse("invoice_file.csv") # produces a stream of maps, each representing a CSV record
|> Flow.from_enumerable()
|> Flow.partition(key: {:key, :invoice_id})
|> Flow.chunk_by(& &1.invoice_id)
|> Flow.flat_map(&process_invoice/1)
|> Stream.into(File.stream!("out.csv"))
|> Stream.run()
```
But alas, `Flow.chunk_by` doesn’t exist. Does anyone know why this is the case?
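In the meantime, the closest workaround I’ve found is to emulate the grouping with `Flow.reduce` and `Flow.on_trigger`: accumulate items per `invoice_id` inside each partition, then emit the finished groups when the partition’s window triggers. This is only a sketch, and it assumes each partition’s groups fit in memory and that `process_invoice/1` takes a list of line items and returns a list of output rows:

```elixir
# Sketch of a workaround: group by :invoice_id inside each partition with
# Flow.reduce, then emit all accumulated groups from Flow.on_trigger.
# Assumes each partition's groups fit in memory; process_invoice/1 is the
# same hypothetical function as above.
CsvParse.parse("invoice_file.csv")
|> Flow.from_enumerable()
|> Flow.partition(key: {:key, :invoice_id})
|> Flow.reduce(fn -> %{} end, fn item, groups ->
  # Prepend for O(1) insert; reversed back before processing.
  Map.update(groups, item.invoice_id, [item], &[item | &1])
end)
|> Flow.on_trigger(fn groups ->
  events =
    Enum.flat_map(groups, fn {_invoice_id, items} ->
      process_invoice(Enum.reverse(items))
    end)

  # Emit the processed rows and reset the accumulator.
  {events, %{}}
end)
|> Stream.into(File.stream!("out.csv"))
|> Stream.run()
```

With the default trigger this only emits on `:done`, so the grouping is correct but not incremental; it also doesn’t rely on the file being sorted, unlike the `chunk_by` version.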