This is my first time using Flow, and I want to make sure I’m understanding the paradigm correctly…
My understanding is that the following code will
- Divvy out the work of mapping over the CSV lines to be done in parallel by Flow (GenStage under the hood)
- The results will be collected back into a single stream (in indeterminate order)
- The results will be chunked from that single stream into chunks of 1,000
- Each upsert of [up to] 1,000 records will then run concurrently in its own process (a Task)
```elixir
def import(path_to_csv) do
  path_to_csv
  |> File.stream!()
  |> CSV.parse_stream()
  |> Flow.from_enumerable()
  |> Flow.map(&do_some_work_and_set_up_desired_attrs/1)
  |> Stream.uniq_by(fn %{hash: hash} -> hash end)
  |> Stream.chunk_every(1_000)
  |> concurrent_upsert()
  |> Stream.run()
end

def concurrent_upsert(stream) do
  Task.Supervisor.async_stream_nolink(BatchImporter, stream, &upsert_batch/1)
end

def upsert_batch(attrs_batch) do
  # do upsert
end
```
The reason I chose Flow for the mapping was that it seemed like a natural fit for handling tens of thousands of rows from a CSV.
The reason I did not use Flow for the upserts is that, when inserting a total of 15k rows, the chunking yields only 15 batches, and Flow doesn't seem designed for such a small number of items. I wanted to force each upsert into its own process, which seemed like a better fit for something like Task.Supervisor.async_stream_nolink.
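If I wanted tighter control over how many upserts run at once, my understanding is that `async_stream_nolink` accepts `:max_concurrency`, `:timeout`, and `:on_timeout` options. A sketch of a variation on `concurrent_upsert/1` (the option values are illustrative, not recommendations):

```elixir
# Same as concurrent_upsert/1 above, but with explicit back-pressure
# and timeout settings for the upsert tasks.
def concurrent_upsert(stream) do
  Task.Supervisor.async_stream_nolink(
    BatchImporter,
    stream,
    &upsert_batch/1,
    # run at most one upsert per scheduler (illustrative choice)
    max_concurrency: System.schedulers_online(),
    # kill a batch's task instead of exiting if it exceeds one minute
    timeout: :timer.minutes(1),
    on_timeout: :kill_task
  )
end
```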
Can someone confirm if this reasoning is sound?