Handling a very large database query using streams

Hi!

We are trying to optimize the query and data handling for a very large database query (up to tens of millions of rows). We are using streams both at the adapter level and via the Stream module's functions.

Here is our usage:

    Repo.transaction(fn ->
      Filter.construct_where_clause(filters)
      |> Sql.stream()
      |> Stream.map(&transform_rows/1)
      |> Stream.transform(
        fn -> Handle.start() end,
        fn chunk, acc -> Handle.iterate_chunk(chunk, acc) end,
        fn _acc -> Handle.end_all() end
      )
      |> Stream.run()
    end, timeout: :infinity)

Where Sql.stream/1 does:

    Ecto.Adapters.SQL.stream(Repo, sql, [], max_rows: 1_000)

We’re noticing that the reducer in the Stream.transform can take a really long time on the initial chunk (up to 40s in our case). Subsequent chunks are really fast. The start function in Stream.transform runs quickly as well; it just hangs at the reducer.

Are we missing something here? Since max_rows is only 1,000, we expected the first chunk to arrive quickly. It seems like the data is getting fully loaded before it is chunked in the Stream.transform reducer.

Thank you!

Ecto.Adapters.SQL.stream is using a cursor under the hood; this sounds like a consequence of that. The cursor hands you rows in batches, but the database may still have to do most of the query's work (a full scan, a sort, etc.) before it can produce the first batch.
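As a rough illustration, this is approximately what happens on the Postgres side (a sketch only, not the actual Postgrex internals; big_cursor is a made-up name and sql stands in for your generated query):

    Repo.transaction(fn ->
      # Declaring the cursor is cheap -- nothing is executed yet.
      Repo.query!("DECLARE big_cursor CURSOR FOR " <> sql, [])

      # The first FETCH cannot return until the database has done enough
      # work (full table scan, sort, ...) to produce the first 1_000 rows;
      # that is where the long initial pause shows up. Later FETCHes resume
      # where the cursor left off, which is why they are fast.
      Repo.query!("FETCH 1000 FROM big_cursor", [])
    end)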


An alternative approach would be to paginate using the seek method or, if using Postgres, to use the COPY ... TO STDOUT command with Ecto.Adapters.SQL.stream. I haven’t used the latter, but it should work.
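For the COPY route, something along these lines should do it (an untested sketch, per the caveat above; handle_csv_chunk/1 is a placeholder for your own processing and sql is your generated query):

    Repo.transaction(fn ->
      Ecto.Adapters.SQL.stream(Repo, "COPY (#{sql}) TO STDOUT WITH (FORMAT csv)")
      # Each stream element is a result whose rows hold raw chunks of the
      # CSV output, so flatten them before processing.
      |> Stream.flat_map(fn %{rows: rows} -> rows end)
      |> Stream.each(&handle_csv_chunk/1)
      |> Stream.run()
    end, timeout: :infinity)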

I would pick pagination.
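With seek (keyset) pagination, that could look roughly like this (a sketch; it assumes an indexed, monotonically increasing id column, and big_table/data are made-up names):

    import Ecto.Query

    # Lazily emits one page per query; each query seeks past the last seen
    # id, so the database can walk the primary-key index instead of sorting
    # or scanning the full result set before returning the first rows.
    def paged_stream(page_size) do
      Stream.unfold(0, fn last_id ->
        rows =
          from(r in "big_table",
            where: r.id > ^last_id,
            order_by: [asc: r.id],
            limit: ^page_size,
            select: %{id: r.id, data: r.data}
          )
          |> Repo.all()

        case rows do
          [] -> nil
          rows -> {rows, List.last(rows).id}
        end
      end)
    end

paged_stream(1_000) |> Stream.flat_map(& &1) then gives you a flat row stream much like the cursor-based one, without the up-front wait.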

Regardless of the Elixir coding constructs, a database like Oracle or Postgres receives a query and starts retrieving rows. When the query must perform a full table scan, that can take some time. And if it has an ORDER BY clause, it will not return any rows before the sort is 100% complete. The database is the bottleneck; the streams are doing nothing while the database is busy.
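If you want to confirm that (assuming Postgres), look at the query plan; a Sort node or a Seq Scan over the whole table is the usual culprit. For example, from Elixir:

    # Note: EXPLAIN ANALYZE actually executes the query, so expect it to
    # take about as long as the real thing; sql is your generated query.
    {:ok, %{rows: plan}} = Repo.query("EXPLAIN (ANALYZE, BUFFERS) " <> sql)
    Enum.each(plan, fn [line] -> IO.puts(line) end)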

You need to show us your SQL query and the indexes defined on your table; otherwise it’s hard to help.

Thanks, this clarified a lot. I wasn’t necessarily looking for a solution here, just an explanation of what the bottleneck is.