Flow into Stream

This is my first time using Flow, and I want to make sure I’m understanding the paradigm correctly…

My understanding is that the following code will

  1. Divvy out the work of mapping over the CSV lines to be done in parallel by Flow (GenStage under the hood)
  2. The results will be collected back into a single string (of indeterminate order).
  3. The results will be chunked from that single stream into chunks of 1,000
  4. Each upsert of [up to] 1,000 record will then run concurrently in its own process (Task)
def import(path_to_csv) do
  path_to_csv
  |> File.stream!()
  |> CSV.parse_stream()
  |> Flow.from_enumerable()
  |> Flow.map(&do_some_work_and_set_up_desired_attrs/1)
  |> Stream.uniq_by(fn %{hash: hash} -> hash end)
  |> Stream.chunk_every(1_000)
  |> concurrent_upsert()
  |> Stream.run()
end

def concurrent_upsert(stream) do
  Task.Supervisor.async_stream_nolink(BatchImporter, stream, &upsert_batch(&1))
end

def upsert_batch(attrs_batch) do
  # do upsert
end

The reason I chose Flow for the mapping was because it seemed like a natural fit for handling 10s of thousands of rows from a CSV.

The reason I did not use Flow for the upserts is that, for example, when inserting a total of 15k rows, that becomes 15 batches. And Flow doesn’t seem designed to handle small numbers of items (15 in this example). I wanted to force each upsert into its own process, which seemed to be a better fit for something like Task.Supervisor.async_stream_nolink.

Can someone confirm if this reasoning is sound?