Hi everyone. I have a pipeline that looks something like this:
```elixir
Ecto.Repo.transaction(fn ->
  generate_ecto_stream(batch_size: 20_000)
  |> Stream.map(fn record_from_db ->
    insert_into_cache(record_from_db)
  end)
  |> Stream.run()
end)
```
This works correctly, in the sense that Ecto fetches 20_000 records at a time and streams those records one-by-one to the next function, which inserts them into the Redis cache. But I would like to insert 20_000 records into the cache at once, not one-by-one. So I need some "aggregation" function between the `generate_ecto_stream` and `Stream.map/2` steps. That function should accumulate only 20_000 records into a list at a time, not all 1 million+ records that will be streamed from the Ecto stream.

Does anyone have any idea what's the best way of doing that? Thanks in advance.
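To make it concrete, this is roughly the shape I'm after. Here I'm assuming `Stream.chunk_every/2` from the standard library can do the batching lazily, and `insert_batch_into_cache/1` is a hypothetical bulk variant of my `insert_into_cache/1`:

```elixir
Ecto.Repo.transaction(fn ->
  generate_ecto_stream(batch_size: 20_000)
  # Lazily collect up to 20_000 records into a list before passing them on,
  # so only one chunk should be held in memory at a time.
  |> Stream.chunk_every(20_000)
  |> Stream.map(fn batch ->
    # insert_batch_into_cache/1 is hypothetical: a bulk insert into Redis,
    # e.g. a single MSET/pipeline call instead of 20_000 individual inserts
    insert_batch_into_cache(batch)
  end)
  |> Stream.run()
end)
```

Is that the idiomatic way to do it, or is there a better aggregation step I'm missing?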