I’m using mongodb_driver v1.0.0 against AWS DocumentDB. I ran into a problem paginating over a large collection (several million rows):
{:error, %Mongo.Error{code: 2, error_labels: [], fail_command: false, host: nil, message: "Cannot open a new cursor since too many cursors are already opened", not_writable_primary_or_recovering: false, resumable: false, retryable_reads: false, retryable_writes: false}}
I guess I haven’t bumbled into this before because I haven’t had to deal with a collection of this size. I need to grab fields from every document in the collection and write them to Postgres. Hopefully someone can point out what’s wrong with my pattern here…
My first attempt (the one that blew out the number of cursors) was pretty old-school pagination:
defmodule MongoToPostgres do
  require Logger

  @doc """
  ## Examples

      iex> MongoToPostgres.copy_chunk()
  """
  def copy_chunk(next_query \\ %{}) do
    Mongo.Repo.find(
      MyApp.MongoRepo,
      "urls",
      %{
        sort: [_id: 1],
        limit: 250
      }
      |> Map.merge(next_query)
    )
    |> case do
      %Mongo.Stream{docs: []} ->
        :done

      %Mongo.Stream{docs: docs} ->
        Logger.debug("Handling #{length(docs)} Mongo docs")
        do_stuff(docs)
        %{"_id" => last_id} = List.last(docs)
        # Loop
        copy_chunk(%{_id: %{"$gt": last_id}})
    end
  end
end
This treats the docs as a list, not as a stream.
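To make the intended loop explicit, here is a minimal sketch of the same keyset pagination with the database call factored out. Everything in it is an assumption for illustration: `KeysetSketch`, `each_page/4`, and `fetch_page` are hypothetical names, and `fetch_page` is expected to return a fully materialized list of at most `limit` documents whose `_id` is greater than the given one. An in-memory list stands in for the collection, so it says nothing about how the driver manages cursors.

```elixir
defmodule KeysetSketch do
  # Walks a collection page by page using the last seen "_id" as the key.
  # `fetch_page` is a hypothetical 2-arity function:
  #   (last_id | nil, limit) -> list of docs sorted by "_id"
  def each_page(fetch_page, limit, handle_page, last_id \\ nil) do
    case fetch_page.(last_id, limit) do
      [] ->
        :done

      docs ->
        handle_page.(docs)
        %{"_id" => next_id} = List.last(docs)
        each_page(fetch_page, limit, handle_page, next_id)
    end
  end
end

# In-memory stand-in for the collection, for illustration only.
docs = for id <- 1..7, do: %{"_id" => id}

fetch_page = fn last_id, limit ->
  docs
  |> Enum.filter(fn %{"_id" => id} -> last_id == nil or id > last_id end)
  |> Enum.take(limit)
end

KeysetSketch.each_page(fetch_page, 3, fn page ->
  IO.inspect(Enum.map(page, & &1["_id"]))
end)
```

The point of the sketch is only that each page is a complete list by the time it is handled, so the loop itself holds nothing open between iterations.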
Then I tried treating the returned docs as a stream, but I couldn’t quite make sense of the Mongo documentation on batch_size… I need to go over every element in the collection.
defmodule MongoToPostgres do
  require Logger

  @doc """
  ## Examples

      iex> MongoToPostgres.copy_chunk()
  """
  def copy_chunk(next_query \\ %{}) do
    Mongo.find(
      MyApp.MongoRepo,
      "urls",
      %{
        sort: [_id: 1]
      },
      batch_size: 25000
    )
    |> case do
      %Mongo.Stream{docs: []} ->
        :done

      %Mongo.Stream{docs: docs} ->
        Logger.debug("Handling #{length(docs)} Mongo docs")

        docs
        |> Stream.chunk_every(250)
        |> Stream.each(fn chunk_o_docs ->
          do_stuff_elsewhere(chunk_o_docs)
        end)
        |> Stream.run()
    end
  end
end
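For comparison, here is a minimal sketch of the shape I think I am after: piping the whole enumerable through `Stream.chunk_every/2` instead of matching on the `docs` field. `StreamCopySketch` and `copy/3` are hypothetical names, and a plain lazy stream stands in for the cursor so the snippet runs without a database. The commented-out `Mongo.find` call at the end is an assumption: it presumes the value returned by `Mongo.find/4` can be enumerated directly, that `conn` is the topology pid or registered name, and that the call did not return `{:error, _}`.

```elixir
defmodule StreamCopySketch do
  # Consumes any enumerable lazily in chunks of `chunk_size`, calling
  # `handle_chunk` once per chunk. Returns the number of chunks handled.
  def copy(enumerable, chunk_size, handle_chunk) do
    enumerable
    |> Stream.chunk_every(chunk_size)
    |> Stream.each(handle_chunk)
    |> Enum.count()
  end
end

# Stand-in for the cursor: a lazy stream of seven fake documents.
fake_cursor = Stream.map(1..7, fn id -> %{"_id" => id} end)

StreamCopySketch.copy(fake_cursor, 3, fn chunk ->
  IO.inspect(length(chunk))
end)

# With the driver, the enumerable would presumably be the find result itself
# (assumption, not verified against DocumentDB):
#
#   conn
#   |> Mongo.find("urls", %{}, sort: %{_id: 1}, batch_size: 1_000)
#   |> StreamCopySketch.copy(250, &do_stuff_elsewhere/1)
```

Here the chunk size (250) only controls how many documents are handed to the handler at once. It is independent of `batch_size`, which as far as I can tell controls how many documents the server returns per round trip.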
Plan B is to use mongoexport to dump this to a CSV and import it that way, but I am curious how this should be structured. Thanks in advance for any pointers!