Mongo execution too fast?

I have run into something interesting that I do not understand, and I am hoping someone can explain what I am seeing. I will try to keep my explanation simple.

  1. There is a Phoenix PubSub topic for IDs.
  2. There is a GenStage producer module with a handle_info callback that listens for these messages.
  3. Next, there is a GenStage producer-consumer with a handle_events callback that receives the messages (the IDs) and checks the Mongo database to see if we already have a record for each ID. If we do, we filter that message out, like Enum.reject (there is a rough sketch of this stage below).
  4. Next, there is another GenStage producer-consumer that looks up the ID against an external API. After this stage, the message holds the full response data from the API request.
  5. Finally, there is a GenStage consumer with a handle_events callback that receives the data and saves the API response in the Mongo database.

I hope I have explained this data pipeline clearly!
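To show what I mean, here is a rough sketch of stage 3 (the dedup check). The module, collection, and process names are only examples, not my real code, and I assume the Elixir Mongo client with Mongo.find_one:

defmodule MyPipeline.DedupStage do
  # Rough sketch: MyPipeline.IdProducer, "records", and the :mongo
  # connection name are placeholders for my real names.
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    {:producer_consumer, :no_state, subscribe_to: [MyPipeline.IdProducer]}
  end

  def handle_events(ids, _from, state) do
    # keep only the IDs that do not have a record in the database yet
    new_ids =
      Enum.reject(ids, fn id ->
        Mongo.find_one(:mongo, "records", %{_id: id}) != nil
      end)

    {:noreply, new_ids, state}
  end
end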

The interesting problem I am seeing can happen when 2 messages contain the same ID and arrive in quick succession, something like this:

iex> (
...> Phoenix.PubSub.broadcast(:my_pubsub, "ids", "ID123")
...> Phoenix.PubSub.broadcast(:my_pubsub, "ids", "ID123")
...> )

What I expect is that when the second message checks the database, it sees that there is already a record for this ID. But what I actually see is that both the first and second messages check the database and neither of them sees the record! So instead of 1 API lookup I have 2, and the record is written to the database 2 times.

Why is this? Is everything happening in parallel? Is there a good solution to this problem? Thank you for any explanations!

I feel foolish, but I think I can answer my own question.

The reason the messages do not see a record in the database is that the API lookup is slow, so if the 2nd message arrives before the 1st has finished the whole pipeline, the database check does not catch the duplicate.

That is likely to be the answer to your question. If I understand the wider problem, you are receiving stuff, filtering duplicates, grabbing additional information and inserting the collated result. If you are getting duplicates at the insert end, then a reasonable solution might be to not insert and instead use an update with upsert, so that each message processed is idempotent and any duplicates have no real effect. You still have an extra API call being made, so it depends on the implications of that. The MongoDB API I'm thinking of is db.collection.update() (see the MongoDB Manual), though I have no experience with it in the Elixir-specific clients.

Edit: you could maybe solve the API call problem with tailable cursors, using them to augment the inserted records with the additional information?

Forgive me if I do not understand everything in your response! This is maybe like a caching problem (even though it uses a database), similar I think to a "cache stampede" or "dog-pile". The cache entry (the DB record in my case) does not exist until after the slow API request is complete, so duplicate requests get through and cause multiple lookups.

I can think of maybe 2 solutions:

  1. serialize requests in the pipeline so they happen 1 at a time, but this is a bad solution here because the system exists to be parallel and many things must run at the same time.
  2. use a kind of "lock". I do not know what this is called, but I am thinking of something like a lock on a file for writing, so the app knows that an API request has begun and does not try the same request again until it is complete. The locking must be fast, maybe ETS (a rough sketch of my idea is below).
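For the lock idea, something like this is what I imagine (not tested, and the names are only examples):

defmodule MyPipeline.IdLock do
  # Rough idea, not tested: a named ETS table as a fast "lock" so only
  # the first message for an ID continues down the pipeline.

  # create the table once, for example at application start
  def init do
    :ets.new(:id_locks, [:set, :public, :named_table])
  end

  # returns true only for the first caller with this ID;
  # :ets.insert_new/2 returns false if the key already exists
  def acquire?(id) do
    :ets.insert_new(:id_locks, {id})
  end

  # release after the record is saved (or if the API call fails)
  def release(id) do
    :ets.delete(:id_locks, id)
  end
end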

So the problem is that we cannot detect duplicates until the end of this pipeline, when we need to detect them at the start of the pipeline. I hope I have explained this clearly. Thank you for all input!

No worries, they were really just a couple of thoughts that went through my mind at the time as possible options for avoiding either of those two solutions, but they are not always feasible. I will try to explain a little better what I meant, though they might not be strategies that work for your use case.

So the first suggestion: doing updates with upsert turned on. Basically this means you always issue updates to the DB instead of inserts; if upsert is turned on for your update commands, the update will insert when the record doesn't already exist. Since your PubSub messages contain IDs, if you use those as the IDs for your database records, then when you receive duplicates you don't insert new records, you just update the existing one with the information it already had. It is similar to a PUT request: the first one has an effect, but if you send the same request multiple times the others don't have any side effects. Since this is a strategy for gracefully handling duplicates rather than preventing them, it would be an option if the API you call returns the same information for the same request each time and the call doesn't have a particularly large cost associated with it.
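With one of the Elixir clients I would expect it to look roughly like this. This is an untested sketch: the :mongo connection and "records" collection names are placeholders, and I'm assuming a Mongo.update_one/5 that accepts an upsert: true option:

def save_result(id, api_data) do
  Mongo.update_one(
    :mongo,
    "records",
    %{_id: id},            # match on the message ID
    %{"$set" => api_data}, # write the fields from the API response
    upsert: true           # insert the record if it doesn't exist yet
  )
end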

The second thought was tailable cursors. A tailable cursor is an ongoing, streamed query for newly added records, and it could be used for quite a different approach. Here, instead of filtering the duplicates by doing a lookup and comparing (with the race conditions that brings), you filter the duplicates by trying to insert every time and getting DB conflicts on the IDs when they already exist. So you insert a placeholder record for the ID as soon as you get the PubSub message, which acts a bit like a lock because now another one can't be inserted. I was originally thinking that you could then use the stream of newly added records from tailing the cursor to drive your API requests, knowing that there will only be one record per ID and therefore only one entry in the stream.

However, as I typed this whole thing out, it seemed like in your situation you might not need to bother with the cursor at all. You could instead (rough sketch after the list):

  1. receive a message with an ID
  2. insert a placeholder record using that ID to act as a lock
  3. if you get a DB conflict back, bail out, else
  4. continue the rest of your process to make your API call
  5. update the placeholder record with the full information when your API call completes.
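Something along these lines, again an untested sketch with placeholder names and assuming an Elixir Mongo client with Mongo.insert_one:

# Use the message ID as Mongo's _id so the built-in unique index on _id
# rejects duplicates, and treat the resulting error as "already claimed".
def claim(id) do
  case Mongo.insert_one(:mongo, "records", %{_id: id, status: "pending"}) do
    {:ok, _result} ->
      # first message for this ID: go on to make the API call, then
      # update this record with the full response
      :claimed

    {:error, _error} ->
      # most likely a duplicate key error (Mongo error code 11000); the
      # exact error struct depends on the driver, so inspect it in your
      # own code. Either way, another message got here first.
      :duplicate
  end
end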

If you are not keen on having a partial record that is updated later, another option would be to have a separate collection of lock records. Insert one using the message ID as above so you get conflicts to detect duplicates, and use a time-to-live on the lock records so that they get cleaned up after a safe period. The key part is only ever doing a write and detecting the error on conflict, which avoids the read/write timing issue: if you get a conflict, you know you have a duplicate.
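For the time-to-live part, MongoDB can expire documents via a TTL index on a date field. Roughly something like this, again untested, with placeholder names, and assuming the Elixir client exposes a generic Mongo.command:

# a TTL index on inserted_at removes lock documents automatically
# roughly 5 minutes after that timestamp ("id_locks" is a placeholder)
Mongo.command(:mongo, [
  createIndexes: "id_locks",
  indexes: [
    %{key: %{inserted_at: 1}, name: "id_locks_ttl", expireAfterSeconds: 300}
  ]
])

# each lock document then just needs a date in that field
Mongo.insert_one(:mongo, "id_locks", %{
  _id: "ID123",
  inserted_at: DateTime.utc_now()
})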