Aggregating Streams into multuple lists

Hi everyone. I have a pipeline that looks something like this:

Ecto.Repo.transaction(fn ->
  generate_ecto_stream(batch_size: 20_000)
  |> record_from_db ->

Now this works correctly in a sense that ecto fetches 20_000 records at a time, streams those records one-by-on to the next function that inserts them into redis cache. But, I would like to insert 20_000 records to cache at once, not one-by-one. So I need some “aggregation” function between generate_ecto_stream and steps. That function should aggregate only 20_000 records into a list, and not all 1 million + records that will be streamed from ecto stream.

Does anyone have any idea whats the best way of doing that? Thanks in advance :slight_smile:

Have you looked at Stream — Elixir v1.13.4?

1 Like

I missed that completely. Thanks a lot

1 Like