So let’s say I want to export a whole bunch of data from a database and stream it back to a user as a CSV, totally hypothetical.
I’m taking in 40k ids, for example, and want to load those records and return the data as CSV. This is a substantially wide data set on a reasonably large table (millions of rows), so loading all 40k records and serializing them as CSV takes some time. I converted to loading the records in chunks and streaming the results with Plug.Conn.chunk, which increased my output before timeout from about 3k records to about 20k records. So far so good.
But I still eventually get an error if it takes too long:
[error] Postgrex.Protocol (#PID<0.432.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.1160.0> exited
How do I avoid that? I don’t necessarily want to set a global connection checkout timeout to :infinity if I can avoid it. Ideally I could do something that says, “Hey, as long as we’re still doing work, keep going,” or “Give me a new connection for each Repo.all,” so the timeout would apply per query, and then I could play with the chunk size to optimize overall throughput.
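For what it’s worth, by “per query” I mean something like passing Ecto’s :timeout option to each chunk’s Repo.all call. This is just an illustration of the idea; the 30_000 ms value and the chunk_of_ids variable are placeholders:

    # Hypothetical: each 250-id chunk gets its own 30-second query timeout
    # instead of relying on one global checkout limit; 30_000 ms is a guess.
    Repo.all(Feed.Query.by_id(chunk_of_ids), timeout: 30_000)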
Thanks in advance.
Example code below:
def index(conn, %{"id" => id}) do
  ids = Query.decode_multi(id)

  conn
  |> put_resp_content_type("text/csv")
  |> put_resp_header("content-disposition", ~s[attachment; filename="feed.csv"])
  |> send_stream(ids)
end

# This can be a lot of data, so we're doing a stream to a chunked response
# which requires us to interleave the loading of data and the streaming of
# the response.
defp send_stream(conn, ids) do
  streams =
    ids
    |> Enum.sort()
    |> Enum.chunk_every(250)
    |> Stream.map(fn ids ->
      Feed.Query.by_id(ids)
    end)

  streams
  |> Stream.with_index()
  |> Stream.map(fn {s, i} ->
    {Repo.all(s), i}
  end)
  |> Stream.flat_map(&chunk_to_csv/1)
  |> Enum.reduce_while(Plug.Conn.send_chunked(conn, 200), fn chunk, conn ->
    case Plug.Conn.chunk(conn, chunk) do
      {:ok, conn} -> {:cont, conn}
      {:error, :closed} -> {:halt, conn}
    end
  end)
end

defp chunk_to_csv({feeds, i}) do
  # ...
end
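In case it helps frame what I’m after, here’s the rough alternative I’ve been toying with: one Repo.stream inside a single transaction, so the timeout (if I’m reading the docs right) is scoped to that transaction’s connection rather than set globally. This is only a sketch, not working code; I’m assuming Feed.Query.by_id/1 can take the full id list at once, and the five-minute timeout is a placeholder.

    # Sketch only: Repo.stream/2 has to run inside Repo.transaction/2, and I'm
    # assuming the :timeout option here bounds that transaction's connection.
    defp send_stream_via_repo_stream(conn, ids) do
      conn = Plug.Conn.send_chunked(conn, 200)

      {:ok, conn} =
        Repo.transaction(
          fn ->
            ids
            |> Enum.sort()
            |> Feed.Query.by_id()
            |> Repo.stream(max_rows: 250)
            |> Stream.chunk_every(250)
            |> Stream.with_index()
            |> Stream.flat_map(&chunk_to_csv/1)
            |> Enum.reduce_while(conn, fn chunk, conn ->
              case Plug.Conn.chunk(conn, chunk) do
                {:ok, conn} -> {:cont, conn}
                {:error, :closed} -> {:halt, conn}
              end
            end)
          end,
          timeout: :timer.minutes(5)
        )

      conn
    end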