Aggregating Streams into multiple lists

Hi everyone. I have a pipeline that looks something like this:

# Repo is the application's own repo module (one that does `use Ecto.Repo`)
Repo.transaction(fn ->
  generate_ecto_stream(batch_size: 20_000)
  |> Stream.map(fn record_from_db ->
    insert_into_cache(record_from_db)
  end)
  |> Stream.run()
end)

This works in the sense that Ecto fetches 20_000 records at a time and streams them one by one to the next function, which inserts them into a Redis cache. However, I would like to insert 20_000 records into the cache at once, not one by one. So I need some “aggregation” function between the generate_ecto_stream and Stream.map() steps. That function should aggregate only 20_000 records at a time into a list, not all of the 1 million+ records that will come out of the Ecto stream.

Does anyone have an idea what the best way of doing that is? Thanks in advance :slight_smile:

Have you looked at Stream — Elixir v1.13.4?
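
One function in that module that seems to fit is `Stream.chunk_every/2`, which lazily groups a stream into lists of a fixed size. Here is a minimal sketch, assuming a plain range as a stand-in for the Ecto stream and a hypothetical `insert_batch_into_cache` function that takes a whole list (neither is from the original pipeline):

```elixir
# Stand-in for the Ecto stream. In the real pipeline this would be
# generate_ecto_stream(batch_size: 20_000) inside the transaction.
records = Stream.map(1..50_000, fn id -> %{id: id} end)

# Hypothetical bulk insert. Here it only returns the batch size so the
# chunking is visible; a real version would write the list to the cache.
insert_batch_into_cache = fn batch -> length(batch) end

batch_sizes =
  records
  # Emits lists of up to 20_000 records; the last list may be shorter.
  |> Stream.chunk_every(20_000)
  |> Stream.map(insert_batch_into_cache)
  |> Enum.to_list()

IO.inspect(batch_sizes)
# prints [20000, 20000, 10000]
```

In the original pipeline the same `Stream.chunk_every(20_000)` step would sit between `generate_ecto_stream` and `Stream.map`, with `Stream.run()` kept at the end. Only one chunk is held in memory at a time, so the full 1 million+ records are never collected into a single list.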

I missed that completely. Thanks a lot
