I found an example of COPY FROM STDIN in postgrex.ex on GitHub:
Postgrex.transaction(pid, fn conn ->
  query = Postgrex.prepare!(conn, "", "COPY posts FROM STDIN", [copy_data: true])
  stream = Postgrex.stream(conn, query, [])
  Enum.into(File.stream!("posts"), stream)
end)
How can I convert this example to work through Repo?
Repo.transaction fn ->
  query = "some query"
  Ecto.Adapters.SQL.query!(Repo, query, [])
  # COPY FROM STDIN, how??
end
The only way I know is a bit hacky, since it reaches directly into the process dictionary, but it's the easiest way I know. Happy to improve it if someone can suggest a better way.
def load(stream, columns) do
  statement = """
  COPY readings (#{Enum.join(columns, ", ")})
  FROM STDIN
  WITH (FORMAT csv, HEADER false)
  """

  Repo.transaction(fn ->
    conn = get_conn(Repo)
    query = Postgrex.prepare!(conn, "", statement, [copy_data: true])

    stream
    |> Stream.chunk_every(2000)
    |> Stream.into(Postgrex.stream(conn, query, []))
    |> Stream.run()
  end, timeout: 3_600_000)

  :ok
end

defp get_conn(repo) do
  {pool, _} = repo.__pool__
  Process.get({Ecto.Adapters.SQL, pool})
end
It is possible to run a COPY FROM STDIN using Ecto.Adapters.SQL.query!/4, but you can't use a collectable/stream:
Ecto.Adapters.SQL.query!(Repo, "COPY posts FROM STDIN", [data_as_(final)_parameter], [copy_data: true])
As of Ecto 2.1 the above no longer works. Instead, you must use the built-in streams:
stream = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN")
TestRepo.transaction(fn -> Enum.into(data_enum, stream) end)
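A slightly fuller sketch of this Ecto 2.1+ approach, assuming a CSV file `posts.csv` whose columns match the `posts` table (the file name, the `WITH (FORMAT csv)` clause, and the timeout are my placeholders, not from the post above):

```elixir
# The stream is a collectable: each chunk collected into it is sent
# to the server as copy data for the COPY statement.
stream = Ecto.Adapters.SQL.stream(Repo, "COPY posts FROM STDIN WITH (FORMAT csv)")

# The collectable must be consumed inside a transaction.
Repo.transaction(
  fn ->
    "posts.csv"
    |> File.stream!()
    |> Enum.into(stream)
  end,
  timeout: :infinity
)
```

Because `File.stream!/1` is lazy, the file is never loaded into memory in full; lines are forwarded to the server as they are read.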
I’m trying to use COPY to bulk insert data (insert_all is limited by the number of parameters you can bind per query, i.e. rows × columns), but the following doesn’t work:
stream = Ecto.Adapters.SQL.stream(Repo, "COPY post(title, foobar) FROM STDIN")
Repo.transaction(fn -> Enum.into(data, stream) end)
Which results in:
#ERROR 22P04 (bad_copy_file_format): missing data for column "foobar"
Am I doing something obviously wrong?
edit:
I’m an idiot, it’s supposed to be like
"done\tbas\n"
I am running similar code, and I noticed that when I set logging to :debug and don’t disable logging on the stream, I get exactly one log statement per line copied into the table (edit: it seems I actually get N + 2 log lines for N rows). Is this “normal”, or could it slow down the copy process (even when all logging is disabled)?
Here is the code I tested it with:
out_stream = Ecto.Adapters.SQL.stream(repo, "copy numbers from stdin with csv")

repo.transaction(fn ->
  repo.query!("truncate numbers")

  1..10
  |> Enum.map(fn i -> [i] end)
  |> CSV.encode()
  |> Enum.into(out_stream)
end)
</repo>
and I get this log output:
[debug] QUERY OK db=0.3ms idle=9457.8ms
begin []
[debug] QUERY OK db=0.5ms
truncate numbers []
[debug] QUERY OK db=0.1ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=0.0ms
copy numbers from stdin with csv []
[debug] QUERY OK db=40.2ms
copy numbers from stdin with csv []
[debug] QUERY OK db=4.0ms
commit []
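The N + 2 pattern suggests that every chunk collected into the stream is logged as its own query (one entry per data chunk, plus entries for opening and closing the copy). Assuming `Ecto.Adapters.SQL.stream/4` honors the same `:log` option as `query/4` (worth verifying against your Ecto version), you should be able to silence just the copy like this:

```elixir
# Pass log: false so the copy-data chunks are not logged individually;
# the surrounding begin/commit and other queries still log as usual.
out_stream =
  Ecto.Adapters.SQL.stream(repo, "copy numbers from stdin with csv", [], log: false)

repo.transaction(fn ->
  repo.query!("truncate numbers")

  1..10
  |> Enum.map(fn i -> [i] end)
  |> CSV.encode()
  |> Enum.into(out_stream)
end)
```

Even with logging off, the data is still sent in the same per-chunk messages, so batching rows into larger chunks before collecting them is the more likely lever for throughput.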