The above code works fine until I run the test, which gives me this error:
(EXIT from #PID<0.544.0>) an exception was raised:
** (DBConnection.ConnectionError) connection not available and request was dropped from queue after 939ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information
I’m using Phoenix v1.4 and Ecto v3.1; the database is Postgres.
Any help?
What exactly does using Task.async_stream in a repo transaction achieve? I think Repo.transaction checks out a single connection and all messages are sent over it (i.e. “single thread”) anyway.
That is, it’s not obvious to me what your approach has over a plain single-connection transaction.
Well, I actually wrote code similar to yours in my first implementation (let’s call it the “single thread” version):
Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Enum.reduce(0, fn batch, total ->
    # Insert each batch of 5,000 rows and accumulate the total count.
    {n, _} = Repo.insert_all(Assignment, batch)
    total + n
  end)
end)
I learned the “parallel” version (the code I pasted in my original topic) from the book Programming Ecto, Chapter 17, Section “Fetching Large Datasets with Streams”. Here’s the example from the book:
stream =
  Artist
  |> Repo.stream()
  |> Task.async_stream(fn artist ->
    save_artist_record(artist)
  end)

Repo.transaction(fn ->
  Stream.run(stream)
end)
I benchmarked these two versions by inserting a large amount of data, more than 20 times each. The “parallel” version is nearly 40% faster and uses all of my CPU cores, while the “single thread” version only uses one CPU core at a time.
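A rough way to reproduce that kind of timing is with :timer.tc (a sketch; insert_single_thread/0 and insert_parallel/0 are hypothetical wrappers around the two versions above):

# Hypothetical wrappers around the two snippets above;
# :timer.tc/1 returns {microseconds, result}.
{micros, _result} = :timer.tc(&insert_single_thread/0)
IO.puts("single thread: #{div(micros, 1_000)} ms")

{micros, _result} = :timer.tc(&insert_parallel/0)
IO.puts("parallel: #{div(micros, 1_000)} ms")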
Yeah, the stream was built from Repo.stream. Guess I didn’t give enough details. The code “stream = build_attrs_stream()” does something like this:
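Roughly (a minimal sketch with placeholder names: SourceRecord and the attribute keys stand in for the real schema):

defp build_attrs_stream do
  # Stream rows from the database (Repo.stream/1 must be enumerated
  # inside a transaction) and map each row to a plain attrs map that
  # Repo.insert_all/2 accepts.
  SourceRecord
  |> Repo.stream()
  |> Stream.map(fn record ->
    %{source_id: record.id, status: "pending"}
  end)
end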
Hey @ug0, this is because of how the Ecto sandbox works during tests. Each test process is provided its own database connection. The Task.async_stream call spawns new processes, and Ecto doesn’t know which connection those processes should use. You can solve this by giving Ecto a little more information:
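Roughly along these lines (a sketch; build_attrs_stream/0 and the batching follow the code above, and Ecto.Adapters.SQL.Sandbox.allow/3 comes from the ecto_sql sandbox API):

parent = self()

stream =
  build_attrs_stream()
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    # Each task runs in its own process, so explicitly allow it to use
    # the sandboxed connection owned by the test process (`parent`).
    Ecto.Adapters.SQL.Sandbox.allow(Repo, parent, self())
    Repo.insert_all(Assignment, batch)
  end)

Repo.transaction(fn ->
  Stream.run(stream)
end)

Alternatively, you can put the sandbox into shared mode in your test setup with Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()}), which lets every spawned process use the test’s connection; the trade-off is that such tests can no longer run with async: true.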