Here is an example from our code base using Ecto:

    def load(stream) do
      # COPY ... FROM STDIN lets Postgres ingest the CSV rows streamed in below.
      statement = """
      COPY readings (recorded_at, element_id, value, payload_id)
      FROM STDIN
      WITH (FORMAT csv, HEADER false)
      """

      # The SQL stream has to be consumed inside a transaction.
      {:ok, :ok} =
        Repo.transaction(
          fn ->
            stream
            |> Stream.chunk_every(2000, 2000, [])
            |> Stream.into(Ecto.Adapters.SQL.stream(Repo, statement))
            |> Stream.run()
          end,
          timeout: :timer.seconds(60)
        )

      :ok
    end

The chunk_every call may be vestigial, honestly; this code is super old. But the main advantage here is that, since it’s using your Ecto pool, it should work perfectly fine with tests.
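
For anyone wondering what the stream passed to load/1 might look like: as far as I know, each element has to be iodata holding one CSV row, including the trailing newline. Here is a minimal sketch of building that. The module name ReadingsCsv and the shape of the reading maps are made up for illustration, and it assumes none of the values need CSV quoting or escaping.

```elixir
defmodule ReadingsCsv do
  # Hypothetical helper, not part of the original code base.
  # Lazily turns reading maps into iodata CSV rows in the column order
  # used by the COPY statement: recorded_at, element_id, value, payload_id.
  # Assumes no field contains commas, quotes, or newlines.
  def to_lines(readings) do
    Stream.map(readings, fn reading ->
      [
        DateTime.to_iso8601(reading.recorded_at), ",",
        to_string(reading.element_id), ",",
        to_string(reading.value), ",",
        to_string(reading.payload_id), "\n"
      ]
    end)
  end
end
```

You would then pipe the result into the loader, something like readings |> ReadingsCsv.to_lines() |> load(). If your data can contain commas or quotes, a proper CSV encoder would be a safer choice than this hand-rolled version.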