Is there a way to stream data TO the database with Ecto? I’m working on using the COPY command to load CSV data directly into Postgres.
What I’ve written so far is loosely based on “Import a CSV into Postgres using Elixir”, but I run into trouble when trying to write tests for it. It also feels like it would be cleaner if this were possible via Ecto, or if there were at least a way to get the pid() of a connection from Ecto’s pool. I’ve been scanning through the documentation and trying various things. Has anyone done this before?
Perhaps unsurprisingly, the pid from Repo.start_link is not interchangeable with the pid from Postgrex.start_link.
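def load(stream) do
  # COPY ... FROM STDIN lets Postgres read the CSV rows sent over the connection.
  statement = """
  COPY readings (recorded_at, element_id, value, payload_id)
  FROM STDIN
  WITH (FORMAT csv, HEADER false)
  """

  # The SQL stream has to be used inside a transaction.
  {:ok, :ok} =
    Repo.transaction(
      fn ->
        stream
        # Batch the lines so each write to the connection carries many rows.
        |> Stream.chunk_every(2000, 2000, [])
        |> Stream.into(Ecto.Adapters.SQL.stream(Repo, statement))
        |> Stream.run()
      end,
      timeout: :timer.seconds(60)
    )

  :ok
end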
The chunk_every may be vestigial, honestly; this code is super old. The main advantage here is that, since it uses your Ecto pool, it should work perfectly fine with tests.
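To make the chunking step concrete, here is a minimal sketch in plain Elixir, with no database involved. The sample rows are made up for illustration. It only shows that chunk_every turns a stream of lines into lists of binaries, which are valid iodata, and that the bytes are unchanged; the assumption is that the COPY stream accepts each such chunk as one piece of data.

```elixir
# Hypothetical sample rows; each line keeps its trailing newline,
# the way File.stream!/1 yields lines by default.
lines = [
  "2024-01-01T00:00:00Z,1,0.5,10\n",
  "2024-01-01T00:01:00Z,2,0.7,10\n",
  "2024-01-01T00:02:00Z,3,0.9,10\n"
]

# Group the lines into chunks of at most 2; the last chunk may be shorter.
chunks = lines |> Stream.chunk_every(2) |> Enum.to_list()

# Each chunk is a list of binaries, so its size can be measured as iodata.
Enum.each(chunks, fn chunk ->
  IO.puts("chunk of #{length(chunk)} lines, #{IO.iodata_length(chunk)} bytes")
end)

# Flattening the chunks gives back the original bytes, so chunking only
# changes how the data is batched, not what is sent.
rejoined = IO.iodata_to_binary(chunks)
IO.puts(rejoined == Enum.join(lines))
```

With a real file, the input to load/1 would presumably be something like File.stream!(path), where path is whatever CSV file you are importing.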
At the expense of parsing the CSV into cells in Elixir, you can rely on Ecto.Adapters.SQL.query/4 and use prepared statements. There is an example in the query/4 docs on how to use positional parameters.
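A rough sketch of that row-by-row alternative follows. The module name ReadingRow and both functions are hypothetical, and the column types are assumptions (a timestamp for recorded_at, integers for element_id and payload_id, a float for value); the thread does not show the table definition. The naive String.split/2 also assumes no quoted or escaped fields, so a real CSV parser would be safer for anything beyond simple data.

```elixir
defmodule ReadingRow do
  # Hypothetical helper: turns one CSV line into typed parameters.
  # Assumed types: recorded_at is an ISO 8601 timestamp, element_id and
  # payload_id are integers, value is a float.
  def to_params(line) do
    [recorded_at, element_id, value, payload_id] =
      line |> String.trim() |> String.split(",")

    {:ok, recorded_at, _offset} = DateTime.from_iso8601(recorded_at)
    {value, ""} = Float.parse(value)

    [recorded_at, String.to_integer(element_id), value, String.to_integer(payload_id)]
  end

  # Inserts a single row using positional parameters ($1..$4).
  # `repo` is your Ecto repo module; this call needs Ecto and a running database.
  def insert(repo, line) do
    Ecto.Adapters.SQL.query(
      repo,
      "INSERT INTO readings (recorded_at, element_id, value, payload_id) VALUES ($1, $2, $3, $4)",
      to_params(line)
    )
  end
end
```

This issues one statement per row, so for millions of rows it is likely to be much slower than COPY; it mainly trades speed for per-row validation in Elixir.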
Thanks Ben! This works perfectly and gets my test passing.
The chunk_every is still helpful. I’m loading a file with 2.5 million rows. Without chunk_every it seems to stream very small chunks and eventually times out, but I set it to 100_000 rows, which is working well (at least locally).
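For a sense of scale, the arithmetic below counts how many chunks are written for 2.5 million rows at different chunk sizes. It assumes each chunk results in one write to the connection, which is consistent with the behaviour described above but has not been checked against the adapter internals.

```elixir
rows = 2_500_000

# Ceiling division: number of chunks needed to cover all rows.
chunks_for = fn chunk_size -> div(rows + chunk_size - 1, chunk_size) end

IO.inspect(chunks_for.(1), label: "one line per write")
IO.inspect(chunks_for.(2_000), label: "chunks of 2_000")
IO.inspect(chunks_for.(100_000), label: "chunks of 100_000")
```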