COPY FROM STDIN through Repo

I’m found example of COPY FROM STDIN in postgrex.ex on github:

Postgrex.transaction(pid, fn(conn) ->
  query = Postgrex.prepare!(conn, "", "COPY posts FROM STDIN",[copy_data:true])
  stream = Postgrex.stream(conn, query, [])
  Enum.into(File.stream!("posts"), stream)
end)

How I can convert these example to work with through Repo?

Repo.transaction fn ->
  query = "some query"
  Ecto.Adapters.SQL.query!(Repo, query, [])
  #copy from stdin, how??
end

The only way I know is a bit ghetto on account of its direct access of the pdict, but that’s the easiest way I know. Happy to improve it if someone can suggest a better way.

  def load(stream, columns) do
    statement = """
    COPY readings (#{Enum.join(columns, ", ")})
    FROM STDIN
    WITH (FORMAT csv, HEADER false)
    """
    Repo.transaction(fn ->
      conn = get_conn(Repo)
      query = Postgrex.prepare!(conn, "", statement, [copy_data: true])

      stream
      |> Stream.chunk(2000, 2000, [])
      |> Stream.into(Postgrex.stream(conn, query, []))
      |> Stream.run
    end, timeout: 3_600_000)

    :ok
  end

  defp get_conn(repo) do
    {pool, _} = repo.__pool__
    Process.get({Ecto.Adapters.SQL, pool})
  end
1 Like

It is possible to run a COPY FROM STDIN using Ecto.Adapters.SQL.query!/4 but can’t use a collectable/stream:

Ecto.Adapters.SQL.query!(Repo, "COPY posts FROM STDIN", [data_as_(final)_parameter], [copy_data: true])

From Ecto 2.1 the above no longer works. Instead must use built in streams:

    stream = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN")
    TestRepo.transaction(fn -> Enum.into(data_enum, stream) end)
8 Likes

I’m trying to use COPY to bulk insert data (insert_all has a limit of the amount of columns you can insert), the following doesn’t work:

stream = Ecto.Adapters.SQL.stream(Repo, "COPY post(title, foobar) FROM STDIN")
Repo.transaction(fn -> Enum.into(data, stream) end)

Which results in:
#ERROR 22P04 (bad_copy_file_format): missing data for column "foobar"

Am I doing something obviously wrong?

edit:
I’m an idiot, it’s suppose to be like
"done\tbas\n"

I am running similar code and I noticed that when I set logging to :debug and not disable the logging on the stream that I do get exactly one log statement per line (edit: it seems I actually do get N + 2 lines per row) I copy into the table. Is this “normal” or may that be something that could slow down copying process (even when all logging is disabled)?

Here is the code I tested it with:

    out_stream = Ecto.Adapters.SQL.stream(repo, "copy numbers from stdin with csv")

    repo.transaction(fn ->
      repo.query!("truncate numbers")

      1..10
      |> Enum.map(fn i -> [i] end)
      |> CSV.encode()
      |> Enum.into(out_stream)
    end)

and I get this log output:

[debug] QUERY OK db=0.3ms idle=9457.8ms
begin []
[debug] QUERY OK db=0.5ms
truncate numbers []
[debug] QUERY OK db=0.1ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=40.2ms
copy numbers from stdin with csv []
[debug] QUERY OK db=4.0ms
commit []