How do I avoid DB disconnect on chunked streaming of large amount of data?

So let’s say I want to export a whole bunch of data from a database and stream it back to a user as a CSV, totally hypothetical. :slight_smile:

I’m taking in 40k ids, for example, and want to load those records and return the data as CSV. This is for a substantially wide data set, and a reasonably large table (millions of rows) so loading all 40k and saving as CSV take some time. I converted to loading the records in chunks, and streaming the results using the Plug.Conn.chunk and this increased my output before timeout from about 3k records to about 20k records. So far so good.

But I still eventually get an error if it takes too long:

[error] Postgrex.Protocol (#PID<0.432.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.1160.0> exited

How do I avoid that? I don’t necessarily want to set a global connection checkout time to infinite if I can avoid it. Ideally I would be able to do something that would say, “Hey, as long as we’re still doing work keep going” or “Give me a new connection for each Repo.all” so the timeout would be per query and then I could play with the chunk size to optimize overall throughput.

Thanks in advance.

Example code below:

  def index(conn, %{"id" => id}) do
    ids = Query.decode_multi(id)

    conn
    |> put_resp_content_type("text/csv")
    |> put_resp_header("content-disposition", ~s[attachment; filename="feed.csv"])
    |> send_stream(ids)
  end

  # This can be a lot of data, so we're doing a stream to a chunked response
  # which requires us to interleave the loading of data and the streaming of
  # the response.
  defp send_stream(conn, ids) do
    streams =
      ids
      |> Enum.sort()
      |> Enum.chunk_every(250)
      |> Stream.map(fn(ids) ->
        Feed.Query.by_id(ids)
      end)

    streams
    |> Stream.with_index()
    |> Stream.map(fn({s, i}) ->
      {Repo.all(s), i}
    end)
    |> Stream.flat_map(&chunk_to_csv/1)
    |> Enum.reduce_while(Plug.Conn.send_chunked(conn, 200), fn chunk, conn ->
      case Plug.Conn.chunk(conn, chunk) do
        {:ok, conn} -> {:cont, conn}
        {:error, :closed} -> {:halt, conn}
      end
    end)
  end
  
   defp chunk_to_csv({feeds, i}) do
     # ...
   end
2 Likes

You can pass options to Repo.all assuming that’s the one that’s timing out. Repo.all(timeout: 15000) is default.

But the timeout is not per request, it’s per query. If you’re sure a query is timing out, it’s because a single query is taking longer than 15s. Are you sure it’s not your request connection timing out? By default cowboy closes a connection after a period of inactivity, depending on whether some data was sent (you can configure this).

I’m not sure how to read that error message, but it does say that the client exited. The client in this case is the request? Again, not sure how to interpret it.

3 Likes

It is possible it’s cowboy related. The DB disconnection error happens almost exactly 60 seconds after the request starts. It’s consistently streaming data that whole time, sending chunks of data every 500ms - 2s give or take.

Definitely a cowboy timeout. Any logs you see about the database are because you have an active database connection going “hey! the process that was using me just died!”

It does seem to be connected to the Cowboy http protocol_options idle_timeout, which seems odd since it’s actively streaming a response.

The timeout doesn’t mean “quit if nothing happens for 60 seconds”, the timeout means “whatever is happening has at most 60 seconds to do it before I disconnect”.

OK. So if you wanted to send a large response through Phoenix that takes a long time to stream. What would you do? Just increase that timeout to something ridiculously large? That doesn’t seem optimal since it’s global for all requests.

Quick question, can you verify that you’re using Cowboy 2 (since that was when idle_timeout) was added?

From my reading it would seem that idle_timeout should work without customization

idle_timeout (60000)
  Time in ms with no data received before Cowboy closes the connection.

Unless it means “no data received from client” (instead of the server)

Unfortunately that’s exactly what it means. When that timeout is exceeded the connection is closed even if it’s still actively sending data.

This probably doesn’t help but I usually stream data over websocket and use paging. I’m not exactly a fan of elixir streams.