Hi!
We are trying to optimize the query and data handling for a very large database query (up to tens of millions of rows). We are using streams both at the adapter level and via `Stream` functions.
Here is our usage:

```elixir
Repo.transaction(fn ->
  Filter.construct_where_clause(filters)
  |> Sql.stream()
  |> Stream.map(&transform_rows/1)
  |> Stream.transform(
    fn -> Handle.start() end,
    fn chunk, acc -> Handle.iterate_chunk(chunk, acc) end,
    fn _acc -> Handle.end_all() end
  )
  |> Stream.run()
end, timeout: :infinity)
```
where `Sql.stream/1` does:

```elixir
Ecto.Adapters.SQL.stream(Repo, sql, [], max_rows: 1_000)
```
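
For context, the helper is just a thin wrapper around that call (module and variable names simplified here):

```elixir
defmodule Sql do
  # Wraps the adapter-level stream. The returned stream is lazy and, for the
  # Postgres adapter, must be enumerated inside Repo.transaction/2; each
  # element it emits is a result struct holding up to `max_rows` rows.
  def stream(sql) do
    Ecto.Adapters.SQL.stream(Repo, sql, [], max_rows: 1_000)
  end
end
```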
We’re noticing that the reducer in the `Stream.transform` can take a really long time (we are up to 40s) on the initial chunk. Subsequent chunks are really fast. The start function in `Stream.transform` runs really fast as well; it just hangs at the reducer.

Are we missing something here? With `max_rows` set to only 1,000 we expected the first chunk to be fast. It seems like the data is getting fully loaded before it is chunked into the `Stream.transform` reducer.
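
For reference, here is the kind of minimal timing probe (hypothetical, reusing the same pipeline and helper names as above) we could drop in to check whether the delay is in the adapter delivering the first batch or in our `Handle.iterate_chunk/2`:

```elixir
# Hypothetical probe: log when each batch of `max_rows` arrives from the
# adapter, before any of our own transform/handle code runs.
Repo.transaction(fn ->
  Filter.construct_where_clause(filters)
  |> Sql.stream()
  |> Stream.with_index()
  |> Stream.each(fn {_result, i} ->
    IO.puts("chunk #{i} arrived at #{System.monotonic_time(:millisecond)}ms")
  end)
  |> Stream.run()
end, timeout: :infinity)
```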
Thank you!