Beginner OTP/GenServer design question

Hi all, I’m wondering if I can get some advice on basic OTP design:

I need to batch send API requests in batches of 5 but no request should wait for more than 10 seconds before being sent.

I have a working but flawed version: a batch_server GenServer whose state is a map from a batch reference id to a list of API tasks:

state = %{batch_id => [ task_1, task_2 ... ]}
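As a rough illustration of how that map evolves (the task atoms are placeholders, and make_ref() stands in for whatever gen_random_id() does, which is an assumption):

```elixir
# Start with no open batches.
state = %{}

# First task opens a new batch under a fresh id.
batch_id = make_ref()
state = Map.put(state, batch_id, [:task_1])

# Later tasks are prepended to the open batch.
state = Map.update!(state, batch_id, &[:task_2 | &1])

# Sending the batch removes it from the state.
{batch, state} = Map.pop(state, batch_id)
```

Because tasks are prepended, the popped list is in reverse arrival order; reverse it first if the API cares about ordering.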

The batch_id lets the handle_info callback identify which batch a 10-second timeout belongs to:

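def handle_call({:new_task, task}, _from, state) do
  # Find a batch that still has room (fewer than 5 tasks).
  current_batch = Enum.find(state, fn {_id, tasks} -> length(tasks) < 5 end)

  case current_batch do
    nil ->
      # No open batch: start one and arm its 10-second timeout.
      batch_id = gen_random_id()
      Process.send_after(self(), {:batch_timeout, batch_id}, 10 * 1000)

      {:reply, :ok, Map.put(state, batch_id, [task])}

    {batch_id, tasks} when length(tasks) == 4 ->
      # This task fills the batch.
      # SYNCHRONOUSLY send batched API request for [task | tasks]
      {:reply, :ok, Map.delete(state, batch_id)}

    {batch_id, _tasks} ->
      {:reply, :ok, Map.update!(state, batch_id, &[task | &1])}
  end
end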

def handle_info({:batch_timeout, batch_id}, state) do
  if Map.get(state, batch_id) do
    # SYNCHRONOUSLY send batched API request
    {:noreply, Map.delete(state, batch_id)}
  else
    # Current batch was already sent as it reached batch limit of 5
    {:noreply, state}
  end
end

This :point_up: works fine, but the design is flawed because the API call is synchronous and therefore blocks my GenServer.

I’m looking for a way around this. I was wondering if I should create a new BatchServer instance for each new batch, plus a parent GenServer that keeps a registry of BatchServers and can therefore allocate tasks to a running BatchServer or spin up a new one if none are open.

Any advice appreciated!

This matches what :poolboy does pretty well - it’s an Erlang library, but here’s a writeup on how to use it in Elixir

When you say SYNCHRONOUSLY, do you want to

  1. Just ensure errors are propagated
  2. Never execute two batches at the same time
  3. Return a value from executing the batch?

Ensure errors are handled (case 1)

Here we’re just using spawn_link() to run the work, so the batch processor is not blocked, but if something goes wrong with the batch, the processor is killed too (unless it traps exits). Note that multiple batches may run in parallel here.

def handle_call({:new_task, task}, _from, state) do
  current_batch = Enum.find(state, fn {_id, tasks} -> length(tasks) < 5 end)

  case current_batch do
    nil ->
      batch_id = gen_random_id()
      Process.send_after(self(), {:batch_timeout, batch_id}, 10 * 1000)

      {:reply, :ok, Map.put(state, batch_id, [task])}

    {batch_id, tasks} when length(tasks) == 4 ->
      # Batch is full: hand it off right away by reusing the timeout handler
      state = Map.update!(state, batch_id, &[task | &1])
      {:noreply, state} = handle_info({:batch_timeout, batch_id}, state)
      {:reply, :ok, state}

    {batch_id, _tasks} ->
      {:reply, :ok, Map.update!(state, batch_id, &[task | &1])}
  end
end

def handle_info({:batch_timeout, batch_id}, state) do
  case Map.pop(state, batch_id) do
    {nil, _state} -> {:noreply, state}
    {batch, state} -> 
      spawn_link(fn -> work(batch) end)
      {:noreply, state}
  end
end
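To try case 1 end to end, here is a minimal self-contained sketch. It makes a few assumptions that are not in the snippets above: the module name BatchServer, a `work` function passed in at start, make_ref() as the batch id, and a simplified state that tracks only the single open batch (at most one batch is ever below the limit, so the map is not strictly needed).

```elixir
defmodule BatchServer do
  use GenServer

  @batch_size 5
  @timeout_ms 10_000

  # `work` is a hypothetical 1-arity function that performs the API call.
  def start_link(work), do: GenServer.start_link(__MODULE__, work)

  def new_task(server, task), do: GenServer.call(server, {:new_task, task})

  @impl true
  def init(work), do: {:ok, %{work: work, batch_id: nil, tasks: []}}

  @impl true
  def handle_call({:new_task, task}, _from, %{tasks: []} = state) do
    # First task of a new batch: arm the 10-second timeout.
    batch_id = make_ref()
    Process.send_after(self(), {:batch_timeout, batch_id}, @timeout_ms)
    {:reply, :ok, %{state | batch_id: batch_id, tasks: [task]}}
  end

  def handle_call({:new_task, task}, _from, state) do
    state = %{state | tasks: [task | state.tasks]}

    if length(state.tasks) == @batch_size do
      {:reply, :ok, flush(state)}
    else
      {:reply, :ok, state}
    end
  end

  @impl true
  # The timer belongs to the batch that is still open: send it.
  def handle_info({:batch_timeout, batch_id}, %{batch_id: batch_id} = state) do
    {:noreply, flush(state)}
  end

  # Stale timer: that batch already went out when it reached the limit.
  def handle_info({:batch_timeout, _stale_id}, state), do: {:noreply, state}

  # Hand the batch to a linked process so the server is never blocked.
  defp flush(%{work: work, tasks: tasks} = state) do
    batch = Enum.reverse(tasks)
    spawn_link(fn -> work.(batch) end)
    %{state | batch_id: nil, tasks: []}
  end
end
```

Usage would look roughly like `{:ok, pid} = BatchServer.start_link(&MyApi.send_batch/1)` followed by `BatchServer.new_task(pid, task)`, where MyApi.send_batch/1 is a placeholder for the real API call.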

Never execute two batches at the same time (case 2)

For this I’m moving the start of the timer into the caller and changing from GenServer.call() to GenServer.cast(). Batches are now processed one after another inside the server, and the caller never blocks. As a result, though, the server might get overloaded when batch processing is slower than incoming batches…

def new_task(task) do
  batch_id = gen_random_id()
  # Requires the server to be registered under the name __MODULE__
  Process.send_after(__MODULE__, {:batch_timeout, batch_id}, 10 * 1000)
  GenServer.cast(__MODULE__, {:new_task, task, batch_id})
end

# handle_cast/2 takes no `from` argument
def handle_cast({:new_task, task, batch_id}, state) do
  current_batch = Enum.find(state, fn {_id, tasks} -> length(tasks) < 5 end)

  case current_batch do
    nil ->
      {:noreply, Map.put(state, batch_id, [task])}

    {batch_id, tasks} when length(tasks) == 4 ->
      state = Map.update!(state, batch_id, &[task | &1])
      handle_info({:batch_timeout, batch_id}, state)

    {batch_id, _tasks} ->
      {:noreply, Map.update!(state, batch_id, &[task | &1])}
  end
end

def handle_info({:batch_timeout, batch_id}, state) do
  case Map.pop(state, batch_id) do
    {nil, _state} -> {:noreply, state}
    {batch, state} -> 
      # SYNCHRONOUSLY send batched API request
      work(...)
      {:noreply, state}
  end
end

Return a value from the batch (case 3)

If you want to return a value from the batch processing / task processing back to whoever submitted the task, you can combine that with either case 1 or case 2. In addition, you have to use GenServer.call() together with a {:noreply, …} return and GenServer.reply() to reply after the batch submission is done. Left as an exercise for the reader :wink:
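For anyone who wants a starting point for that exercise, here is one possible sketch built on case 1. The assumptions are mine: the module name ReplyingBatchServer, a `work` function that takes a list of tasks and returns one result per task in the same order, a single open batch in the state, and a 30-second call timeout so callers can outlast the 10-second batch window.

```elixir
defmodule ReplyingBatchServer do
  use GenServer

  @batch_size 5
  @timeout_ms 10_000

  # `work` is hypothetical: list of tasks in, list of results out (same order).
  def start_link(work), do: GenServer.start_link(__MODULE__, work)

  # Blocks the caller until its batch has been processed.
  def run_task(server, task), do: GenServer.call(server, {:new_task, task}, 30_000)

  @impl true
  def init(work), do: {:ok, %{work: work, batch_id: nil, entries: []}}

  @impl true
  def handle_call({:new_task, task}, from, state) do
    # Remember who asked, and defer the reply with {:noreply, ...}.
    state = state |> ensure_timer() |> Map.update!(:entries, &[{from, task} | &1])

    if length(state.entries) == @batch_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info({:batch_timeout, batch_id}, %{batch_id: batch_id} = state) do
    {:noreply, flush(state)}
  end

  # Stale timer for a batch that was already sent.
  def handle_info({:batch_timeout, _stale_id}, state), do: {:noreply, state}

  # Arm the timeout only when the first task of a batch arrives.
  defp ensure_timer(%{entries: []} = state) do
    batch_id = make_ref()
    Process.send_after(self(), {:batch_timeout, batch_id}, @timeout_ms)
    %{state | batch_id: batch_id}
  end

  defp ensure_timer(state), do: state

  defp flush(%{work: work, entries: entries} = state) do
    {callers, tasks} = entries |> Enum.reverse() |> Enum.unzip()

    # GenServer.reply/2 may be called from any process, so the worker replies.
    spawn_link(fn ->
      results = work.(tasks)

      callers
      |> Enum.zip(results)
      |> Enum.each(fn {from, result} -> GenServer.reply(from, result) end)
    end)

    %{state | batch_id: nil, entries: []}
  end
end
```

Since each run_task/2 call blocks until its batch is done, submit tasks from separate processes (for example via Task.async/1); otherwise a lone caller always waits for the timeout.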

@al2o3cr
Great. Thanks for pointing that out. I’ve never used it directly but assumed this was a widely used pattern. I’ll take a look at the docs.


@dominicletz
This is a fantastic answer, I really appreciate that!

This is a challenge for a job I’m applying for. I’m not sure about the requirements around errors and responses, but yes, they were on my mind when I posed the question. I don’t mind requests going out at the same time.

I’ll take a closer look at your solutions shortly :pray: