Mongo: too many cursors are already opened

I’m using mongodb_driver v1.0.0 against AWS DocumentDB. I ran into a problem
paginating over a large collection (several million documents):

{:error, %Mongo.Error{code: 2, error_labels: [], fail_command: false, host: nil, message: "Cannot open a new cursor since too many cursors are already opened", not_writable_primary_or_recovering: false, resumable: false, retryable_reads: false, retryable_writes: false}}

I guess I haven’t bumbled into this before because I haven’t had to deal with a collection of this size. I need to grab fields from every document in the collection and write them to Postgres. Hopefully someone can point out what’s wrong with my pattern here…

My first attempt (the one that blew out the number of cursors) was pretty old-school pagination:

defmodule MongoToPostgres do
  require Logger

  @doc """
  ## Examples

      iex> MongoToPostgres.copy_chunk()
  """
  def copy_chunk(next_query \\ %{}) do
    Mongo.Repo.find(
      MyApp.MongoRepo,
      "urls",
      %{
        sort: [_id: 1],
        limit: 250
      }
      |> Map.merge(next_query)
    )
    |> case do
      %Mongo.Stream{docs: []} ->
        :done

      %Mongo.Stream{docs: docs} ->
        Logger.debug("Handling #{length(docs)} Mongo docs")
        do_stuff(docs)

        %{"_id" => last_id} = List.last(docs)

        # Loop
        copy_chunk(%{_id: %{"$gt": last_id}})
    end
  end
end

This treats the docs as a list, not as a stream.

Then I tried doing this treating the returned docs as a stream, but I couldn’t quite make sense of the Mongo documentation with the batch_size… I need to go over every element in the collection.

defmodule MongoToPostgres do
  require Logger

  @doc """
  ## Examples

      iex> MongoToPostgres.copy_chunk()
  """
  def copy_chunk(next_query \\ %{}) do
    Mongo.find(MyApp.MongoRepo, "urls", %{
        sort: [_id: 1]
      }, batch_size: 25000)
    |> case do
      %Mongo.Stream{docs: []} ->
        :done

      %Mongo.Stream{docs: docs} ->
        Logger.debug("Handling #{length(docs)} Mongo docs")

        docs
        |> Stream.chunk_every(250)
        |> Stream.each(fn chunk_o_docs ->
          do_stuff_elsewhere(chunk_o_docs)
        end)
        |> Stream.run()
    end
  end
end

Plan B is to use mongoexport to dump this to a CSV and import it that way, but I am curious how this should be structured. Thanks in advance for any pointers!

The find function returns a stream, so you can use something like this:

top
|> Repo.find("urls", query, opts)
|> Stream.chunk_very(250)
|> Stream.map(fn chunk -> ... end)
|> Stream.run()

Thanks – I think there’s a typo there → Stream.chunk_very should be Stream.chunk_every

So the %Mongo.Stream{} struct itself implements Enumerable and I don’t need to peel the docs value out of it. That makes sense.
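
To illustrate the point about Enumerable, here is a minimal sketch that runs without a database. FakeCursor is a hypothetical stand-in for a cursor-backed stream (it is not part of the driver); it hands out documents in batches, roughly the way batch_size controls a real cursor. The pipeline is only a description until something enumerates it, and batches are pulled on demand.

```elixir
defmodule FakeCursor do
  @moduledoc "Hypothetical stand-in for a cursor-backed stream; not part of the driver."

  # Emits the integers 1..total in batches of batch_size and sends a
  # message to the calling process each time a batch is "fetched".
  def stream(total, batch_size) do
    parent = self()

    Stream.resource(
      # Start state: the next id to hand out.
      fn -> 1 end,
      fn
        next when next > total ->
          {:halt, next}

        next ->
          last = min(next + batch_size - 1, total)
          send(parent, {:fetched, next, last})
          {Enum.to_list(next..last), last + 1}
      end,
      # Cleanup: nothing to release in this fake.
      fn _next -> :ok end
    )
  end
end

# Building the pipeline fetches nothing yet.
pipeline =
  FakeCursor.stream(10, 4)
  |> Stream.chunk_every(3)
  |> Stream.map(&Enum.sum/1)

# Enumerating pulls the batches and yields one result per chunk of 3.
sums = Enum.to_list(pipeline)
IO.inspect(sums)
```

Swapping FakeCursor.stream/2 for the stream returned by find should leave the rest of the pipeline unchanged, assuming the driver's stream fetches further batches lazily in the same way.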

We are getting timeouts, but at least that’s a new error!

{:error, %Mongo.Error{code: nil, error_labels: nil, fail_command: nil, host: "my-host:27017", message: "my-host.com:27017 tcp recv: unknown POSIX error - :timeout", not_writable_primary_or_recovering: nil, resumable: true, retryable_reads: nil, retryable_writes: nil}}

Yes, that was a typo. You can specify a timeout value in case your query takes too long:

Repo.find(top, "urls", query, timeout: 120_000)

See also: zookzook/elixir-mongodb-driver on GitHub (MongoDB driver for Elixir)

Thanks for the input! We did try adding the timeout and now the query completes… but it doesn’t seem to actually be streaming over the contents of that collection. I have to do more digging I think…
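
For comparison with the first attempt, here is a minimal sketch of the same _id-based pagination with the page fetch injected as a function, so it runs without a database. KeysetPager, fetch_page and handle_page are hypothetical names. One detail worth checking against the driver docs: as far as I can tell, find takes sort and limit as options (the last argument) rather than as keys inside the filter map, so the sketch passes them that way. With the real driver, fetch_page would presumably wrap find and Enum.to_list/1 so that each page is fully consumed before the next query is issued. Whether that keeps DocumentDB under its cursor limit is an assumption that has not been tested here.

```elixir
defmodule KeysetPager do
  @moduledoc "Hypothetical helper, not part of the driver."

  # fetch_page: (filter, opts) -> list of docs, sorted by _id ascending.
  # handle_page: called once per non-empty page.
  def each_page(fetch_page, handle_page, page_size \\ 250, filter \\ %{}) do
    case fetch_page.(filter, sort: [_id: 1], limit: page_size) do
      [] ->
        :done

      docs ->
        handle_page.(docs)
        %{"_id" => last_id} = List.last(docs)

        # Next page: everything strictly after the last _id seen.
        each_page(fetch_page, handle_page, page_size, %{_id: %{"$gt": last_id}})
    end
  end
end

# In-memory stand-in for the collection.
collection = for id <- 1..7, do: %{"_id" => id}

fetch_page = fn filter, opts ->
  min_id = get_in(filter, [:_id, :"$gt"]) || 0

  collection
  |> Enum.filter(fn %{"_id" => id} -> id > min_id end)
  |> Enum.take(Keyword.fetch!(opts, :limit))
end

handle_page = fn docs ->
  IO.inspect(Enum.map(docs, & &1["_id"]), label: "page")
end

result = KeysetPager.each_page(fetch_page, handle_page, 3)
```

The recursion stops when a page comes back empty, so a collection whose size is an exact multiple of the page size costs one extra query.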