GenServer blocking vs callbacks

On our server, we have some remote resources that we want to fetch and periodically update in the background. To update these resources we need to make HTTP requests to third-party services that have rate limits. This means we want to cap the number of concurrent requests and also pause requests when the third party sends a 429 response. We’ve had a few thoughts about how to do this, but so far each has its pitfalls.

At a high level, the layout of our system looks like this…

  • TheThing - stores info in the state about the resource that we can save to the database etc
  • NetworkResource - a single-threaded http request queue that when presented with a 429 waits and retries the request

When TheThing decides to update, it needs to make multiple calls to NetworkResource to re-fetch and save the result back into the database. Depending on the local state and remote state, not every network call needs to be made. For example, if the timestamp of the first remote resource is the same as the local one, the subsequent calls can be skipped.

We’ve played around with a couple of different methods, but none feel ideal and it feels like we’re swimming against the flow a little bit. Some of it might be down to our C++ & JavaScript background where “async good, sync bad”. I’ve tried to cut the code examples down to the bare bones just so you get the gist…

Method 1: Use callbacks

In the first method, we’ve been using multiple GenServers and writing queues to manage the number of requests. When we make a request to the NetworkResource we pass a function for it to call back on completion. This has the advantage that none of the processes become locked, but makes the code quite complex and spaghetti-ish, especially as it grows and you have multiple resource calls and callbacks.

defmodule Test.TheThing do
  use GenServer

  def update(_id) do
    GenServer.call(__MODULE__, {:update})
  end

  # Runs in the Task spawned by NetworkResource, so the call below
  # is made from outside the TheThing process
  def callback_fetch_resource(response) do
    GenServer.call(__MODULE__, {:callback_fetch_resource, response})
  end

  def handle_call({:update}, _from, _state) do
    next_state = %{ action: "wait_fetch_resource" }
    Test.NetworkResource.fetch(&callback_fetch_resource/1)
    {:reply, :ok, next_state}
  end

  def handle_call({:callback_fetch_resource, _response}, _from, _state) do
    # Do some stuff, fire off the next request using the same callback pattern
    # ...
    next_state = %{ action: "wait_fetch_other_resource" }
    {:reply, :ok, next_state}
  end
end

defmodule Test.NetworkResource do
  use GenServer

  def init(_) do
    Process.send_after(self(), :do_next, 1000)
    {:ok, %{ requests: [] }}
  end

  def fetch(callback) do
    GenServer.call(__MODULE__, {:fetch, callback})
  end

  def handle_call({:fetch, callback}, _from, state) do
    # Append the request as a single tuple so the queue stays a list of requests
    next_state = %{ requests: state.requests ++ [{:fetch, callback}] }
    {:reply, :ok, next_state}
  end

  # Must match the bare :do_next atom sent by Process.send_after/3
  def handle_info(:do_next, state) do
    # Check if anything is running, if not create a new `Task` to run the actual HTTP request
    # and on completion run callback.(response) to reply to the calling GenServer
    Process.send_after(self(), :do_next, 1000)
    {:noreply, state}
  end
end

Method 2: Have GenServers block

In the second method, we’ve tried to make more use of the Task module and have the NetworkResource GenServer block when it’s busy. When we have multiple requests that we want to make all at once, we can also use Task.async_stream to send off a batch and wait for the reply. This gives the advantage of being able to write more linear-looking code, but then the NetworkResource GenServer becomes unresponsive while it’s waiting for network stuff to happen. Depending on how busy the network resource is, we may need a large timeout when making the call.

The TheThing GenServer now just manages the local state and database storage, and we’ve introduced a TheThingUpdater which manages fetching the resources and providing updates back to multiple instances of TheThing.

defmodule Test.TheThing do
  use GenServer
end

defmodule Test.TheThingUpdater do
  use GenServer

  def update(_id) do
    GenServer.call(__MODULE__, {:update})
  end

  def handle_call({:update}, _from, state) do
    task = Task.async(fn ->
      response = Test.NetworkResource.fetch()

      # Do some stuff, fire off the next request
      # ... response2 = Test.NetworkResource.fetch2()
      # ... response3 = Test.NetworkResource.fetch3()

      {:ok, response}
    end)

    # Track the task by its monitor reference
    next_state = %{state | running: state.running ++ [task.ref]}
    {:reply, :ok, next_state}
  end

  def handle_info(msg, state) do
    next_state = case msg do
      {ref, {:ok, _value}} when is_reference(ref) ->
        # The task replied; drop its monitor and flush the pending :DOWN message
        Process.demonitor(ref, [:flush])
        next_running = Enum.filter(state.running, fn running_ref -> running_ref != ref end)
        %{state | running: next_running}
      _ -> state
    end

    {:noreply, next_state}
  end
end

defmodule Test.NetworkResource do
  use GenServer

  # No callback argument here: the caller simply blocks until the reply arrives
  def fetch do
    GenServer.call(__MODULE__, {:fetch}, :infinity) # Replace infinity with something more reasonable like an hour
  end

  def handle_call({:fetch}, _from, state) do
    # Make the http request using HTTPoison etc and handle 429 errors
    # ... (this elided part is what binds http_response_body)

    {:reply, http_response_body, state}
  end
end
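
The Task.async_stream batching mentioned under Method 2 could look roughly like the sketch below. It is only an illustration: `BatchSketch`, `fetch_all/3` and the `fetch_fun` parameter are made-up names, and `fetch_fun` stands in for whatever performs the real HTTP request. The `max_concurrency` option is what caps the number of requests in flight.

```elixir
defmodule BatchSketch do
  # `fetch_fun` is a hypothetical one-argument function that performs
  # the request for a single item and returns its result.
  def fetch_all(items, fetch_fun, max_concurrency \\ 2) do
    items
    |> Task.async_stream(fetch_fun,
      max_concurrency: max_concurrency,
      timeout: 30_000,
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      # Results come back in the same order as `items`
      {:ok, result} -> result
      # A task that timed out is reported instead of crashing the caller
      {:exit, reason} -> {:error, reason}
    end)
  end
end

# Usage with a fake fetch function in place of a real HTTP call
BatchSketch.fetch_all(["a", "b", "c"], fn item -> {:ok, String.upcase(item)} end)
```

The caller still blocks until the whole batch is done, so this only helps with the concurrency cap, not with keeping a GenServer responsive.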

Obviously, our actual use case is more complex than this example: it has multiple requests and a couple of different rate-limited resources. We’re just after some guidance on the recommended patterns for doing stuff like this, and on pitfalls we might fall into, before ploughing more effort into what we’re doing. Thanks!

Have you checked this article?

It’s basically about spawning a background Task to eventually reply to the GenServer’s caller. It uses :noreply as a return value in handle_call callbacks as a mechanism to not block the message queue of a GenServer.
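
A minimal sketch of that idea, not taken from the article itself: `NonBlockingServer`, `fetch/3` and the `fetch_fun` parameter are hypothetical names, and `fetch_fun` stands in for the slow HTTP request.

```elixir
defmodule NonBlockingServer do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # The caller still blocks (up to 30 seconds here), but the server does not.
  def fetch(server, url, fetch_fun) do
    GenServer.call(server, {:fetch, url, fetch_fun}, 30_000)
  end

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:fetch, url, fetch_fun}, from, state) do
    # Hand the slow work to a separate process and remember nothing:
    # `from` is all the task needs to answer the original caller.
    Task.start(fn ->
      GenServer.reply(from, fetch_fun.(url))
    end)

    # Returning :noreply frees the server for the next message right away.
    {:noreply, state}
  end
end

{:ok, server} = NonBlockingServer.start_link()
NonBlockingServer.fetch(server, "hello", &String.upcase/1)
```

If the task crashes before replying, the caller only finds out when its call times out, so a real version would want to monitor or supervise the task.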

You do not need a GenServer for that, I guess. When the thing is updated, you may start a task under a task supervisor, which will do all the calls in order, blocking for the reply, retrying on errors, sleeping for a bit on 429s, etc. This will be simple to write and concurrent.
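
Roughly what I mean, as a sketch under assumptions: `UpdateSketch` and the shape of the step functions are invented for illustration. Each step is a zero-arity function returning `{:ok, body}`, `{:rate_limited, wait_ms}` (standing in for a 429) or `{:error, reason}`.

```elixir
defmodule UpdateSketch do
  # Retry a single request, sleeping whenever the remote side rate-limits us.
  def fetch_with_retry(request_fun, attempts \\ 5)

  def fetch_with_retry(_request_fun, 0), do: {:error, :too_many_attempts}

  def fetch_with_retry(request_fun, attempts) do
    case request_fun.() do
      {:ok, body} ->
        {:ok, body}

      {:rate_limited, wait_ms} ->
        Process.sleep(wait_ms)
        fetch_with_retry(request_fun, attempts - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Run the steps strictly in order and stop at the first failure.
  def run(steps) do
    result =
      Enum.reduce_while(steps, {:ok, []}, fn step, {:ok, acc} ->
        case fetch_with_retry(step) do
          {:ok, body} -> {:cont, {:ok, [body | acc]}}
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)

    with {:ok, bodies} <- result, do: {:ok, Enum.reverse(bodies)}
  end
end

# The whole update runs in one supervised task, so blocking here is harmless.
{:ok, sup} = Task.Supervisor.start_link()
task = Task.Supervisor.async_nolink(sup, fn -> UpdateSketch.run([fn -> {:ok, "first"} end]) end)
Task.await(task)
```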

Now if the thing is updated again while this task is running, you may want to kill it. Or you can use a queue to let it finish before starting anew.
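
The "kill it" variant might look something like this. It is a sketch with made-up names (`RestartableUpdate`, `update/2`), where a small GenServer owns the currently running task and `work_fun` is whatever zero-arity function performs the update.

```elixir
defmodule RestartableUpdate do
  use GenServer

  def start_link(task_sup), do: GenServer.start_link(__MODULE__, task_sup)

  def update(server, work_fun), do: GenServer.cast(server, {:update, work_fun})

  @impl true
  def init(task_sup), do: {:ok, %{task_sup: task_sup, task: nil}}

  @impl true
  def handle_cast({:update, work_fun}, state) do
    # Stop the previous run, if any. Task.shutdown/2 also cleans up the
    # monitor, so no stray :DOWN message arrives for the killed task.
    if state.task, do: Task.shutdown(state.task, :brutal_kill)

    task = Task.Supervisor.async_nolink(state.task_sup, work_fun)
    {:noreply, %{state | task: task}}
  end

  @impl true
  def handle_info({ref, _result}, %{task: %Task{ref: ref}} = state) do
    # The current task finished normally; drop its monitor.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | task: nil}}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{task: %Task{ref: ref}} = state) do
    # The current task crashed; forget it so the next update starts cleanly.
    {:noreply, %{state | task: nil}}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end
```

The queue variant would keep the new `work_fun` in the state instead of calling `Task.shutdown/2`, and start it from the `handle_info/2` clause that sees the current task finish.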

Thanks @dimitarvp, that article has been a bit of an epiphany type moment 🙂

Cool. Hope you managed to make this work for you. It’s not difficult, often times it’s about actually knowing that such mechanisms exist.

Tell me about it! We’d started to write a bunch of code to make this work in what felt like a really convoluted way, but this makes everything so much easier and simplifies everything a million percent! Thanks again

No worries, we’re here to help. Modern programming does not have a problem with a lack of tools; it has a problem with the discoverability of the tools.

Word of caution: the article does not go all the way, meaning that you should not indiscriminately spawn an unlimited number of tasks. You’d need a second layer of control over how many such tasks can be running at once, enforced either by libraries like :jobs or opq, or by your own DynamicSupervisor with a helper function (e.g. start_child but with extra logic, such as a counter of how many tasks are currently active).
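
For the roll-your-own route, a minimal sketch could lean on the `max_children` option that DynamicSupervisor already has, instead of a hand-written counter. `BoundedTasks` and `start_task/2` are made-up names for illustration.

```elixir
defmodule BoundedTasks do
  # Starts a supervisor that refuses to run more than `max` tasks at once.
  def start_link(max) do
    DynamicSupervisor.start_link(strategy: :one_for_one, max_children: max)
  end

  # Starts `fun` as a supervised task, or reports that the limit is reached.
  def start_task(sup, fun) do
    # :temporary means a finished or crashed task is never restarted
    spec = %{id: Task, start: {Task, :start_link, [fun]}, restart: :temporary}

    case DynamicSupervisor.start_child(sup, spec) do
      {:ok, pid} -> {:ok, pid}
      {:error, :max_children} -> {:error, :busy}
      other -> other
    end
  end
end

{:ok, sup} = BoundedTasks.start_link(2)
BoundedTasks.start_task(sup, fn -> Process.sleep(100) end)
```

Note that this rejects work once the limit is hit rather than queueing it, so the caller has to decide whether to retry later or drop the job.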


If you figure you’d use opq then the following could be a good implementation (as opposed to the indiscriminate Task.async spawning in the article):

# Somewhere in your app init (or make it part of your supervision tree).
# Give it a generous limit of workers: the BEAM VM can handle
# a lot of processes which gives you a leeway for stuff to catch up.
# Additionally, a conservative timeout is advised.
# Note that OPQ.init/1 returns an {:ok, pid} tuple; the queue is addressed by name below.
{:ok, _queue} = OPQ.init(name: :our_workers, workers: 1_000, timeout: 30_000)

and:

# In your `GenServer`:

  @impl GenServer
  def handle_call({:do_the_thing, n}, from, state) do
    OPQ.enqueue(:our_workers, fn ->
      # Do long work here (`nom_nom_nom/1` is a placeholder for it)...
      return_value = nom_nom_nom(n)

      # ...and reply to the original caller from the worker.
      GenServer.reply(from, return_value)
    end)

    {:noreply, state}
  end

This, combined with the architecture from the article, gives you:

  1. A non-blocking GenServer (while still utilizing it as a bottleneck so as only one process manages certain state or fans out the calls to 3rd party APIs)
  2. A generous limit and timeout for each background task spawned by the GenServer
  3. Protection (backpressure) in extreme circumstances e.g. the GenServer will eventually block waiting for OPQ.enqueue if there are currently 1000 busy background tasks.

IMO pretty neat. I’ve done such things multiple times in my Elixir career and they are rock-solid – unless you need each worker to be persistent and never lost in which case I’m afraid you’d be much better off either paying for Oban Pro or rolling your own persistent (as in: backed by a database) workers.

Thanks for the heads-up on processes, this is something we’d been wary of as we’re going to be doing a lot of background processing. Our plan was to roll our own queuing to batch the jobs so you can run 10-100 at any given time, but I’ll have a look at opq, which looks like it might do some of this already!

Oh I absolutely recommend opq. It’s intuitive, super easy to use, solid, does what it says, and even has an option for rate limiting.

I’ve found myself rolling my own job queues on occasion, when a more custom-tailored solution was needed, but most of the time you’d get your stuff done perfectly well with opq (or :jobs, also a very nice library; it’s written in Erlang but still freely usable from Elixir, as I am sure you are aware).