On our server, we have some remote resources that we want to fetch and periodically update in the background. To update these resources we need to make HTTP requests to third-party services that have rate limits. This means we want to cap the number of concurrent requests and also pause requests when a third party sends a 429 response. We’ve had a few thoughts about how to do this, but so far each has its pitfalls.
At a high level, the layout of our system looks like this…

TheThing
- stores info in its state about the resource, which we can save to the database etc.

NetworkResource
- a single-threaded HTTP request queue that, when presented with a 429, waits and retries the request
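The wait-and-retry behaviour boils down to something like this (a bare-bones sketch; `request_fun` and its return shape are stand-ins for the real HTTP call, not our actual code):

```elixir
defmodule RateLimitedFetch do
  # Sketch of "wait and retry on 429". request_fun is a hypothetical stand-in
  # returning {:ok, body} or {:error, {:rate_limited, retry_after_ms}}.
  def fetch(request_fun, retries \\ 3) do
    case request_fun.() do
      {:ok, body} ->
        {:ok, body}

      {:error, {:rate_limited, retry_after_ms}} when retries > 0 ->
        # Honour the server's Retry-After hint before trying again.
        Process.sleep(retry_after_ms)
        fetch(request_fun, retries - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```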
When TheThing decides to update, it needs to make multiple calls to NetworkResource to re-fetch and save the result back into the database. Depending on the local and remote state, not all of the network calls may need to be fired; for example, if the timestamp of the first remote resource is the same as the local one, the subsequent calls may not need to be made.
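That skip-if-unchanged decision is essentially this (a toy sketch; `fetch_metadata` and `fetch_full` are hypothetical stand-ins for the real NetworkResource calls):

```elixir
defmodule UpdateFlow do
  # Compare the remote timestamp against the local one and only do the
  # remaining (expensive, rate-limited) fetches when something changed.
  def maybe_update(local, fetch_metadata, fetch_full) do
    remote = fetch_metadata.(local.id)

    if remote.updated_at == local.updated_at do
      # Remote unchanged: skip the subsequent network calls entirely.
      {:unchanged, local}
    else
      {:updated, fetch_full.(local.id)}
    end
  end
end
```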
We’ve played around with a couple of different methods, but none feels ideal, and it feels like we’re swimming against the current a little. Some of it might be down to our C++ & JavaScript background, where “async good, sync bad”. I’ve tried to cut the code examples down to the bare bones just so you get the gist…
Method 1: Use callbacks
In the first method, we’ve been using multiple GenServers and writing queues to manage the number of requests. When we make a request to the NetworkResource, we pass a function to call back on completion. This has the advantage that none of the processes becomes blocked, but it makes the code quite complex and spaghetti-ish, especially as it grows and you have multiple resource calls and callbacks.
defmodule Test.TheThing do
  use GenServer

  def update(id) do
    GenServer.call(__MODULE__, {:update, id})
  end

  def callback_fetch_resource(response) do
    GenServer.call(__MODULE__, {:callback_fetch_resource, response})
  end

  def handle_call({:update, _id}, _from, _state) do
    next_state = %{action: "wait_fetch_resource"}
    Test.NetworkResource.fetch(&callback_fetch_resource/1)
    {:reply, :ok, next_state}
  end

  def handle_call({:callback_fetch_resource, response}, _from, _state) do
    # Do some stuff with response, fire off the next request using the same callback pattern
    # ...
    next_state = %{action: "wait_fetch_other_resource"}
    {:reply, :ok, next_state}
  end
end
defmodule Test.NetworkResource do
  use GenServer

  def init(_) do
    Process.send_after(self(), :do_next, 1000)
    {:ok, %{requests: []}}
  end

  def fetch(callback) do
    GenServer.call(__MODULE__, {:fetch, callback})
  end

  def handle_call({:fetch, callback}, _from, state) do
    next_state = %{state | requests: state.requests ++ [{:fetch, callback}]}
    {:reply, :ok, next_state}
  end

  def handle_info(:do_next, state) do
    # Check if anything is running; if not, create a new `Task` to run the actual HTTP request
    # and on completion run callback.(response) to reply to the calling GenServer
    Process.send_after(self(), :do_next, 1000)
    {:noreply, state}
  end
end
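To make the elided `:do_next` tick concrete, here’s roughly how we’d fill it in — a sketch only, draining one queued request per tick. The module is renamed so it stands alone, and the HTTP call is injected as a function (in real code it would be the HTTPoison request):

```elixir
defmodule Test.NetworkResourceSketch do
  use GenServer

  # http_get is a hypothetical zero-arity stand-in for the real HTTP request.
  def start_link(http_get) do
    GenServer.start_link(__MODULE__, http_get, name: __MODULE__)
  end

  def fetch(callback), do: GenServer.call(__MODULE__, {:fetch, callback})

  @impl true
  def init(http_get) do
    Process.send_after(self(), :do_next, 1000)
    {:ok, %{requests: [], http_get: http_get}}
  end

  @impl true
  def handle_call({:fetch, callback}, _from, state) do
    {:reply, :ok, %{state | requests: state.requests ++ [callback]}}
  end

  @impl true
  def handle_info(:do_next, state) do
    next_state =
      case state.requests do
        [] ->
          state

        [callback | rest] ->
          # Run the request off the GenServer so the queue stays responsive,
          # then hand the response straight to the caller's callback.
          http_get = state.http_get
          Task.start(fn -> callback.(http_get.()) end)
          %{state | requests: rest}
      end

    # One request per tick gives a crude rate limit of one per second.
    Process.send_after(self(), :do_next, 1000)
    {:noreply, next_state}
  end
end
```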
Method 2: Have GenServers block
In the second method, we’ve tried to make more use of the Task module and have the NetworkResource GenServer block while it’s busy. When we have multiple requests that we want to make all at once, we can also use Task.async_stream to send off a batch and wait for the replies. This gives the advantage of being able to write more linear-looking code, but then the NetworkResource GenServer becomes unresponsive while it’s waiting for network stuff to happen. Depending on how busy the network resource is, we may need a large timeout when making the call.
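The Task.async_stream batching looks roughly like this (a sketch; `fetch_fun` stands in for the real per-resource call, and `max_concurrency` caps how many requests run at once):

```elixir
defmodule BatchFetch do
  # Fetch a batch of resources concurrently, at most max_concurrency at a
  # time, and wait for all the replies. Results come back in input order.
  def fetch_all(ids, fetch_fun, max_concurrency \\ 2) do
    ids
    |> Task.async_stream(fn id -> {id, fetch_fun.(id)} end,
      max_concurrency: max_concurrency,
      timeout: 30_000
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```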
The TheThing GenServer now just manages the local state and database storage, and we’ve introduced a TheThingUpdater which manages fetching the resources and providing updates back to multiple instances of TheThing.
defmodule Test.TheThing do
  use GenServer
end

defmodule Test.TheThingUpdater do
  use GenServer

  def update(id) do
    GenServer.call(__MODULE__, {:update, id})
  end

  def handle_call({:update, _id}, _from, state) do
    task =
      Task.async(fn ->
        response = Test.NetworkResource.fetch()
        # Do some stuff with response, fire off the next request
        # ... response2 = Test.NetworkResource.fetch2()
        # ... response3 = Test.NetworkResource.fetch3()
        {:ok, :ok}
      end)

    next_state = %{state | running: state.running ++ [task.ref]}
    {:reply, :ok, next_state}
  end

  def handle_info(msg, state) do
    next_state =
      case msg do
        {ref, {:ok, _value}} when is_reference(ref) ->
          # A Task has finished; drop its ref from the running list.
          next_running = Enum.filter(state.running, fn running_ref -> running_ref != ref end)
          %{state | running: next_running}

        _ ->
          state
      end

    {:noreply, next_state}
  end
end
defmodule Test.NetworkResource do
  use GenServer

  def fetch() do
    GenServer.call(__MODULE__, :fetch, :infinity) # Replace :infinity with something more reasonable, like an hour
  end

  def handle_call(:fetch, _from, state) do
    # Make the HTTP request using HTTPoison etc. and handle 429 errors
    # ...
    {:reply, http_response_body, state}
  end
end
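One variant we haven’t fully explored: not replying from handle_call at all, parking `from`, running the request in a Task, and answering later with GenServer.reply/2. That would keep the NetworkResource GenServer itself responsive while the network work happens. A sketch, with the HTTP call injected as a hypothetical `http_get` function so it stands alone:

```elixir
defmodule Test.NetworkResourceAsyncReply do
  use GenServer

  # http_get is a hypothetical zero-arity stand-in for the real HTTP request.
  def start_link(http_get) do
    GenServer.start_link(__MODULE__, http_get, name: __MODULE__)
  end

  # The caller still blocks here, but only this one caller -- the server
  # process itself stays free to accept further calls.
  def fetch, do: GenServer.call(__MODULE__, :fetch, 60_000)

  @impl true
  def init(http_get), do: {:ok, %{http_get: http_get}}

  @impl true
  def handle_call(:fetch, from, state) do
    http_get = state.http_get

    Task.start(fn ->
      # Reply to the original caller from the Task once the request is done;
      # the GenServer never blocked on the network.
      GenServer.reply(from, http_get.())
    end)

    # Returning :noreply parks the caller until GenServer.reply/2 fires.
    {:noreply, state}
  end
end
```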
Obviously, our actual use case is more complex than this example: it has multiple requests and a couple of different rate-limited resources. We’re just after some guidance on recommended patterns for doing stuff like this, and some pitfalls we might fall into, before ploughing more effort into what we’re doing. Thanks!