Help required - avoid sending duplicate messages

I have a strange situation for which I need your thoughts.

My application sends messages to users on request from a different server. For some reason we’re yet to identify, that server sometimes bombards my application with repeated requests to send the same message to users instead of a single request.

Since the other team hasn’t figured out why that happens yet, I wanted to reduce the annoyance to customers in case this unfortunate scenario occurs.

This is what I came up with:

    # Check whether a message with the same fingerprint exists within the
    # last 5 minutes. If so, log it and notify the team instead of resending.
    fingerprint = :base64.encode("#{message}:#{to}:#{provider}:#{app}")
    msg = MyApp.Bucket.find_message(fingerprint)

    if msg == nil do
      # No fingerprint on record: new conversation, save its particulars
      MyApp.Bucket.create_message(%MyApp.Bucket{
        id: fingerprint,
        provider: provider,
        app: @utilities.get_app_value(app, "name"),
        recipients: to,
        attempts: 1,
        message: message
      })

      # Send the message
      process_message_sending(message, to, provider, app)
    else
      # A fingerprint exists: resend only if the message content differs
      # or the last attempt is more than 5 minutes old
      duplicate? =
        to_string(msg.message) == message and
          DateTime.diff(DateTime.utc_now(), DateTime.from_naive!(msg.updated_at, "Etc/UTC"), :second) < 300

      if duplicate? do
        IO.inspect("Multiple requests to send received for message id #{msg.id} with #{msg.attempts + 1} attempts so far")
      else
        process_message_sending(message, to, provider, app)
      end

      # Either way, record the extra attempt
      msg
      |> Map.replace!(:attempts, msg.attempts + 1)
      |> MyApp.Bucket.update_message()
    end

The idea is to save every outgoing message in a bucket and cross-check it against the next request to send a message to the same recipient. If a similar message has been sent within the last five minutes, the new request is ignored.

The challenge with this approach is that when I run a load test, I only stop around 70% of the duplicates. It looks like persistence into the Mnesia bucket (wrapped by Memento) is a tad slow, so concurrent requests can pass the fingerprint check before the previous write lands.

Any idea how this can be addressed?

I’ve also thought about rate limiting through nginx, but I’ve never done it before and I’m not sure how to use it successfully in conjunction with my code.
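One thing I’ve been wondering is whether the gap between the lookup and the write is what lets duplicates through, and whether wrapping both in a single Memento transaction would close it. A rough sketch of what I mean, assuming MyApp.Bucket is a Memento table (not tested, and it leaves the 5-minute check aside):

    fingerprint = :base64.encode("#{message}:#{to}:#{provider}:#{app}")

    {:ok, first_sighting?} =
      Memento.transaction(fn ->
        case Memento.Query.read(MyApp.Bucket, fingerprint) do
          nil ->
            # Write the fingerprint inside the same transaction that read it,
            # so a concurrent request cannot also observe nil
            Memento.Query.write(%MyApp.Bucket{
              id: fingerprint,
              provider: provider,
              app: app,
              recipients: to,
              attempts: 1,
              message: message
            })

            true

          msg ->
            # Already recorded: just bump the attempt counter
            Memento.Query.write(%MyApp.Bucket{msg | attempts: msg.attempts + 1})
            false
        end
      end)

    if first_sighting?, do: process_message_sending(message, to, provider, app)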

How many messages are we talking, and are you on a single node or distributed?

If it’s a single node and not toooo much data, I’d probably look at using cachex with the cache expiry used to control when you let the next message through. See https://hexdocs.pm/cachex/Cachex.html#expire/4

You can update cache values without messing with the expiry if you need to count attempts: https://hexdocs.pm/cachex/Cachex.html#get_and_update/4
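Roughly, the shape could be something like this; a sketch only, with made-up names (:message_cache, MyApp.Dedup, maybe_send/5), and the send_fun argument standing in for your existing process_message_sending/4:

    # Somewhere in your supervision tree: {Cachex, name: :message_cache}
    defmodule MyApp.Dedup do
      @window :timer.minutes(5)

      # Returns {:sent, 1} for the first request in the window,
      # {:duplicate, n} for repeats.
      def maybe_send(message, to, provider, app, send_fun) do
        key = {:crypto.hash(:sha256, message), to, provider, app}

        # Create the counter on first sight, increment it on repeats
        {_commit, attempts} =
          Cachex.get_and_update(:message_cache, key, fn
            nil -> {:commit, 1}
            n -> {:commit, n + 1}
          end)

        if attempts == 1 do
          # First sighting: arm the expiry, then actually send
          Cachex.expire(:message_cache, key, @window)
          send_fun.()
          {:sent, attempts}
        else
          {:duplicate, attempts}
        end
      end
    end

Counting attempts via get_and_update also gives you the “n attempts so far” number you’re currently logging.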

Cachex does work on distributed systems too apparently, but I don’t know how.

Are there any particular reasons you are using Mnesia / Memento?

@hakarabakara you could use a debouncer for this as well, e.g. if you want this event to be sent at most once per five minutes:

    # Create the message key to debounce
    key = {message, to, provider, app}

    # Send the message at most once per five minutes
    Debouncer.immediate2(key, fn ->
      process_message_sending(message, to, provider, app)
    end, 5 * 60_000)

This does not help you with logging an error message, but it will mute any duplicated message within 5 minutes. There are different debouncer behaviours; immediate2() will not trigger a second message within the time limit. So a duplicate after 4 minutes will not be executed at all, but a new message after 6 minutes would be delivered again.
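To make that concrete with the 5-minute window from the snippet above:

    # t = 0:00  first call      -> fn runs, message goes out
    # t = 4:00  duplicate call  -> swallowed entirely, window still open
    # t = 6:00  new call        -> window has expired, fn runs again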

One note on memory usage: the key here is the full message term {message, to, provider, app}. This will work fine, since the debouncer uses an ETS table internally, but if your message is a very big value it might consume a lot of memory. You can limit this by using a hash of the message itself:

    # Cheap hash of any Erlang term
    key = {:erlang.phash2(message), to, provider, app}

    # Or, if message is a binary, a cryptographic hash
    key = {:crypto.hash(:sha256, message), to, provider, app}

To start using the debouncer just add it to your deps and your extra_applications like this:

  def deps do
    [
      {:debouncer, "~> 0.1"}
    ]
  end

  # and the app
  def application do
    [
      mod: {Your.Application, []},
      extra_applications: [:debouncer]
    ]
  end

My application is a bot, so I use Mnesia to track conversation IDs, especially so they survive when the app is restarted after an update. We used in-memory ETS storage for this, but we had to look for something that persists to disk.

It also runs on a single node at the moment.

We’re talking about hundreds of messages, depending on how much is available on the account. Let me read about Cachex.


Thank you for this, let me review it as well.


Maybe you could filter out messages on the client side; that would keep you from adding complicated code on the backend for what is abnormal behaviour that should be corrected soon. On the client side you only have to deal with a single client, so filtering (as well as caching) is easier. But obviously this would not save bandwidth.

It appears to be an infrastructural challenge related to autoscaling. While that is being investigated, we decided to try to curtail the problem from where we have a bit of control until the other side is taken care of.

Thank you @mindok and @dominicletz for your input. I ended up using Cachex because of the ability to slide the expiry, which Debouncer didn’t seem to have, even though Debouncer looked like a nifty solution nonetheless.

I also took your input on hashing the key @dominicletz.
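For completeness, the sliding-expiry idea looks roughly like this (a sketch, not my exact code; :message_cache is an illustrative name). The Cachex.expire/4 call on each duplicate is what slides the window forward:

    key = {:crypto.hash(:sha256, message), to, provider, app}
    window = :timer.minutes(5)

    case Cachex.exists?(:message_cache, key) do
      {:ok, false} ->
        # First sighting: remember the fingerprint and send
        Cachex.put(:message_cache, key, true, ttl: window)
        process_message_sending(message, to, provider, app)

      {:ok, true} ->
        # Duplicate: slide the expiry forward instead of sending again
        Cachex.expire(:message_cache, key, window)
    end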
