Why doesn't Flow have a chunk_by function?

I just started using Flow and I noticed that it doesn’t have a chunk_by function. I wonder why.

Specifically, I’m reading a file of line items from multiple invoices. Each line item has an invoice_id, and I want to group the line items of each invoice and process them together. The file is already sorted by invoice_id, so I can do something like:

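CsvParse.parse('invoice_file.csv') # this produces a stream of maps, each one representing a CSV record in the file
|> Stream.chunk_by(& &1.invoice_id) # consecutive records with the same invoice_id become one chunk
|> Stream.flat_map(&process_invoice/1) # function captures need an explicit arity
|> Stream.into(File.stream!("out.csv"))
|> Stream.run() # Stream.into/2 is lazy, so the pipeline has to be run explicitly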
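
To make the grouping step concrete, here is a minimal sketch on a small in-memory list instead of the CSV. The sample rows and the `sku` field are made up for illustration; `Enum.chunk_by/2` is the eager counterpart of `Stream.chunk_by/2` and groups a finite list the same way.

```elixir
# Hypothetical sample data: line items already sorted by invoice_id.
line_items = [
  %{invoice_id: 1, sku: "a"},
  %{invoice_id: 1, sku: "b"},
  %{invoice_id: 2, sku: "c"},
  %{invoice_id: 3, sku: "d"},
  %{invoice_id: 3, sku: "e"}
]

# chunk_by starts a new chunk whenever the key changes between neighbours,
# so sorted input yields exactly one chunk per invoice.
chunks = Enum.chunk_by(line_items, & &1.invoice_id)

# Number of line items in each chunk.
sizes = Enum.map(chunks, &length/1)
IO.inspect(sizes) # prints [2, 1, 2]
```

Note that this relies on equal keys being adjacent: chunk_by only compares neighbouring elements, it is not a global group-by.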

All good. Now I want to make this parallel with Flow. I was expecting to be able to write something like:

CsvParse.parse('invoice_file.csv') # this produces a stream of maps, each one representing a CSV record in the file
|> Flow.from_enumerable()
|> Flow.partition(key: {:key, :invoice_id}) # route all records of an invoice to the same partition
|> Flow.chunk_by(& &1.invoice_id) # the function I was hoping for
|> Flow.flat_map(&process_invoice/1)
|> Stream.into(File.stream!("out.csv"))
|> Stream.run()

But alas, Flow.chunk_by doesn’t exist :crazy_face:

Does anyone know why this is the case?