Is it possible to stream CSV data TO Postgres with Ecto?

Here is an example from our code base using Ecto:

  def load(stream) do
    statement = """
    COPY readings (recorded_at, element_id, value, payload_id)
    FROM STDIN
    WITH (FORMAT csv, HEADER false)
    """

    {:ok, :ok} =
      Repo.transaction(
        fn ->

          stream
          |> Stream.chunk_every(2000, 2000, [])
          |> Stream.into(Ecto.Adapters.SQL.stream(Repo, statement))
          |> Stream.run()
        end,
        timeout: :timer.seconds(60)
      )

    :ok
  end
end

The chunk_every may be vestigial honestly, this code is super old. But the main advantage here is that since it’s using your Ecto pool it should work perfectly fine with tests.

10 Likes