Create request bottleneck with Plug

I have kind of a weird case where, for certain types of requests, I want to process them one by one if they come from the same user account. We have an api that a mobile app uses to sync data to the server. Under certain conditions, if the server processes two api requests from the same account concurrently a client can end up with out-of-date data. We don’t have high throughput for any one user account. I’ve had a couple of attempts at a “bottleneck” plug the idea is that when a request comes in from a user account it should block until the web app has completed processing all previous requests from that user account. We have a small cluster of servers that are clustered using Erlang distribution. I’ve tried approaches using Postgres advisory locks and also Erlang’s global locks. Neither seems to have any effect. They both let multiple requests through concurrently.

...
def call(conn = %Conn{assigns: %{current_user: %User{id: user_id}}}, _opts) when is_integer(user_id) do
    Statstrack.Repo.query("SELECT pg_advisory_lock_shared(?);", [user_id])
    Plug.Conn.register_before_send(conn, fn conn ->
      Statstrack.Repo.query("SELECT pg_advisory_unlock_shared(?);", [user_id])
      conn
    end)
  end

  def call(conn, _opts) do
    conn
  end
...

And here is my attempt using Erlang’s :global:

  def call(conn = %Conn{assigns: %{current_user: %User{id: user_id}}}, _opts)
      when is_integer(user_id) do
    :global.set_lock({User, {:id, user_id}})

    Plug.Conn.register_before_send(conn, fn conn ->
      :global.del_lock({User, {:id, user_id}})
      conn
    end)
  end

  def call(conn, _opts) do
    conn
  end

Really appreciate any guidance on this one. Thanks!

Serialize the requests through a process. When one of that particular type of request comes in, if there is already a process started for that user, add the request to that user’s process’ queue. If not, create a process for that user and send the request to the process. Let the process’ mailbox handle the serialization of the incoming requests.

2 Likes

The ExHammer lib https://github.com/ExHammer/hammer with redis backend could be a possible solution. I think you may find more relevant search results looking for ‘distributed rate limiting’ instead of ‘bottleneck’

1 Like

This would work great, but OP mentioned he is running distributed erlang in which case, this technique would only be rate limiting per node without using something like https://github.com/bitwalker/swarm

Postgres advisory locks will work the way you want, but you cannot use them in shared mode, as that would mean multiple processes can get the same lock at the same time. SELECT pg_advisory_lock(?) should be what you need.

2 Likes

Thanks guys!

@jeremyjh if I can make advisory locks work that will be the simplest solution I think. I tried without shared mode and that solved my original problem. Thanks!

It turns out though there is another issue. Maybe advisory locks aren’t well suited or maybe I’m misusing. The issue is that advisory locks belong to a connection. They can only be released from the same connection they were acquired from. Furthermore they can be acquired multiple times from within the same connection. So two issues using them in the manner I’m attempting.

  1. If the second request uses the same db connection as the first then pg_advisory_lock won’t block but instead will issue a second instance of the lock.

  2. If the query to release the lock uses a different db connection to the connection used to acquire the lock it will fail because a lock can only be released from the same connection that acquired it.

Unless I’m missing something I have to conclude that advisory locks aren’t quite right for what I’m trying to accomplish.

I may have to serialize through a process as suggested by @easco. If it’s a globally registered process it should work in a distributed environment.

@sneako thanks for the suggestion re rate limiting. Most rate limiters will reject requests beyond a limit rather than blocking them, also they are more intended for limiting the volume within a time window, regardless of request processing time, rather than forcing requests to go one-by-one.

1 Like

I think you are right. I’ve only used advisory locks to coordinate between nodes, not processes sharing the same Repo and connection pool. I didn’t think of the issues you raised but I agree its not the right fit. I think global should work here, but I haven’t used it myself. Looking at its documentation, you need to supply a retries value to set_lock. Otherwise it doesn’t block and just immediately returns false if it couldn’t acquire a lock. You should check that its return value is true before continuing with your process.