Code Review: Batching GenServer

Hello!

If anyone has spare time, I’d be interested in any comments on some Elixir code I wrote. I’m a newbie to Elixir, but not to systems programming. I’d love feedback on what’s idiomatic in Elixir, use of OTP, naming conventions, etc.

I’m calling an API over the network that takes either (1) a single input or (2) an array of inputs.

For the sake of demonstration let’s say the API is called using a function api/1 and it just turns integers into strings. So (1) looks like this:

"1" = api(1)
"2" = api(2)
"3" = api(3)

… and (2) looks like this:

["1", "2", "3"] = api([1, 2, 3])

I can and have implemented batching manually before, but I thought I’d try to use Elixir to automatically batch my API calls. The annoying thing about batching is that it always requires significantly restructuring my code, e.g. a pipeline with Enum.map(&api/1) now becomes a lot more complex.

I wrote this GenServer:

defmodule BatchingServer do
  use GenServer

  # Should return `{outputs, leftovers}`, where `outputs` is `%{from => output}` for each processed item,
  # and `leftovers` is a list of the leftover items to be processed in the *next* batch.
  @type batch_processor() ::
          ([{GenServer.from(), any()}] ->
             {%{GenServer.from() => any()}, [{GenServer.from(), any()}]})

  @type data() :: %{
          interval_ms: non_neg_integer(),
          # Given a batch, returns true when the batch should be immediately processed.
          # Otherwise the batch will be processed when no calls have been received for `interval_ms`.
          should_process?: ([any()] -> boolean()),
          process_batch: batch_processor(),
          batch: [{GenServer.from(), any()}]
        }

  def init(data) do
    data = Map.put(data, :batch, [])
    {:ok, data}
  end

  def handle_info(:timeout, data) do
    {replies, data} = process_batch(data)

    Enum.each(replies, fn {from, output} ->
      :ok = GenServer.reply(from, output)
    end)

    {:noreply, data}
  end

  def process(server, data) do
    GenServer.call(server, {:process, data})
  end

  def handle_call({:process, input}, from, data) do
    %{
      interval_ms: interval_ms,
      should_process?: should_process?
    } = data

    # Add the new call first so the check below sees the batch that would actually be processed.
    data = %{data | batch: [{from, input} | data.batch]}

    if !should_process?.(data.batch) do
      {:noreply, data, interval_ms}
    else
      {replies, data} = process_batch(data)

      Enum.each(replies, fn {reply_from, output} ->
        if reply_from != from do
          :ok = GenServer.reply(reply_from, output)
        end
      end)

      reply = Map.get(replies, from)
      {:reply, reply, data}
    end
  end

  @spec process_batch(data()) :: {%{GenServer.from() => any()}, data()}
  def process_batch(data) do
    %{
      process_batch: process_batch,
      batch: batch
    } = data

    {replies, leftovers} = process_batch.(batch)
    {replies, %{data | batch: leftovers}}
  end
end

When using it, the following:

# Pretend api/1 takes time to run...
inputs = [1, 2, 3]
inputs
|> Enum.map(fn input -> Task.async(fn -> api(input) end) end)
|> Task.await_many()

… becomes:

{:ok, server} =
  GenServer.start_link(BatchingServer, %{
    interval_ms: 1000,
    should_process?: fn batch ->
      length(batch) > 5
    end,
    process_batch: fn batch ->
      outputs =
        batch
        |> Enum.map(fn {_from, input} -> input end)
        |> api()
        |> Enum.zip(batch)
        # Enum.zip/2 yields 2-tuples, so destructure a single tuple argument.
        |> Enum.map(fn {output, {from, _input}} -> {from, output} end)
        |> Enum.into(%{})

      {outputs, []}
    end
  })

inputs
|> Enum.map(fn input -> Task.async(fn -> BatchingServer.process(server, input) end) end)
|> Task.await_many()

I know it’s a bit of code and the example is super contrived. I’d love feedback!

Just read through the code quickly so apologies if this is not what you’re looking for, but it seems like this is something you might be able to handle with core library functions before reaching for GenServer, Enum.chunk_every and Task.async_stream maybe? Anyway, that would be my main question if I was reviewing this, nothing in the code itself leaps out to me as particularly non-idiomatic.

EDIT: I guess you might want to consider moving the process logic itself into a handle_cast so it’s not blocking…
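
For reference, here is a minimal sketch of the Enum.chunk_every plus Task.async_stream approach for the case where all inputs are already available as one list. The api/1 stand-in, the module name ChunkedDemo, and the chunk size of 2 are assumptions made for illustration; they are not taken from the code above.

```elixir
defmodule ChunkedDemo do
  # Stand-in for the real network API (an assumption for this sketch):
  # takes a list of integers and returns a list of strings.
  def api(inputs) when is_list(inputs), do: Enum.map(inputs, &Integer.to_string/1)

  # Splits the inputs into chunks, sends each chunk as one batched call
  # in its own task, and flattens the results back into a single list.
  # Task.async_stream/3 preserves input order by default.
  def run(inputs, chunk_size \\ 2) do
    inputs
    |> Enum.chunk_every(chunk_size)
    |> Task.async_stream(&api/1)
    |> Enum.flat_map(fn {:ok, outputs} -> outputs end)
  end
end
```

This only covers inputs that arrive together as a list; it does not help when the calls are made one at a time from different places in the code.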

Thanks for replying!! Yea no worries, it’s a bit of code, thanks for looking it over! Sorry if I am misunderstanding your suggestions in my reply below!

it seems like this is something you might be able to handle with core library functions before reaching for GenServer, Enum.chunk_every and Task.async_stream maybe

Yea I was wondering about this! I don’t think so unfortunately, because while Enum.chunk_every would help me split up a list of items into chunks, it wouldn’t help me batch a sequence of individual calls. I want to make a bunch of individual calls like so:

out1 = api(in1)
out2 = api(in2)
...
outN = api(inN)

and behind the scenes batch them into one API call. I actually already use chunk_every and Task.async_stream in my manual batching implementation, but it’s annoying because I have to keep track of keys for each input. In particular I can’t just write

inputs
|> Enum.map(fn input -> Task.async(fn -> api(input) end) end)
|> Task.await_many()

… anymore, instead I have to write something like

inputs
|> Enum.map(fn input -> {key_for_input(input), input} end)
|> Enum.chunk_every(50)
|> Task.async_stream(fn batch ->
  batch
  # the api only cares about the values...
  |> Enum.map(&elem(&1, 1))
  |> api()
  # but we need to reassociate the output values with their keys afterwards
  |> Enum.zip(Enum.map(batch, &elem(&1, 0)))
end)
|> Enum.to_list()
|> Enum.flat_map(...)

And I have to write this for every new API call…

I think I can’t use handle_cast because I do care about the results, e.g. I want to still be able to write code like this:

out1 = api(in1)
out2 = api(in2)
...
outN = api(inN)

I just want it to be batched behind the scenes.

I saw this library for Erlang, rabbitmq/gen-batch-server (a generic batching server for Erlang and Elixir), but when I tried to understand what was going on I found it very lightly documented.

Thought of some issues with my implementation (I’m sure there are more):

  • I could probably make the process_batch interface simpler & less error-prone by just inferring leftovers from outputs instead of requiring that the user compute it.
  • If process_batch returns an empty outputs map, i.e. nothing actually got processed, I should probably raise an error because that means that the server is now stuck.
  • Bug: If leftovers is not empty, I need to set a new timeout; otherwise, if there are no further calls to process, the server could potentially never run process_batch on the leftovers.
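
The three points above could be sketched roughly as follows. BatchHelpers, leftovers!/2 and noreply/1 are hypothetical names introduced only for this illustration; the sketch assumes the same state shape as the BatchingServer (a map with :batch and :interval_ms) and that outputs is a map keyed by from.

```elixir
defmodule BatchHelpers do
  # Hypothetical helper, not part of the BatchingServer above.
  # `batch` is a list of `{from, input}`; `outputs` is `%{from => output}`.
  # Infers the leftovers as every entry that did not get an output, and
  # raises when a non-empty batch produced no outputs at all, since the
  # server would otherwise be stuck retrying the same batch.
  def leftovers!(batch, outputs) do
    if batch != [] and map_size(outputs) == 0 do
      raise "process_batch made no progress on #{inspect(batch)}"
    end

    Enum.reject(batch, fn {from, _input} -> Map.has_key?(outputs, from) end)
  end

  # Chooses the GenServer return value after a batch has run: re-arm the
  # idle timeout while leftovers remain, otherwise just wait for calls.
  def noreply(%{batch: []} = data), do: {:noreply, data}
  def noreply(%{batch: _, interval_ms: interval_ms} = data), do: {:noreply, data, interval_ms}
end
```

With something like this, the user-supplied function would only need to return the outputs map, and handle_info(:timeout, data) could finish with BatchHelpers.noreply(data) instead of a bare {:noreply, data}.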

I can’t claim I am an authority but here’s what I am using as an idiom when I need the following:

  1. A GenServer accumulating items to process inside its state;
  2. A GenServer that processes the currently accumulated items on a fixed schedule, regardless of when you last queued an item. For example, if you start it with 10 initial items and a periodic flush of 5 seconds, and you don’t queue anything else, it will process those 10 items after 5 seconds. If you queue another 100 items before the 5 seconds expire, it will process all 110 items. The point is that it attempts to process its outstanding items every 5 seconds sharp, non-negotiable and independent of when you enqueue items.

So here’s the code (76 lines):

defmodule YYY.BatchingWorker do
  use GenServer, restart: :permanent

  @wait_before_processing_millis 5_000

  def start_link(inputs) do
    GenServer.start_link(
      __MODULE__,
      inputs,
      name: __MODULE__
    )
  end

  def child_spec(inputs) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [inputs]},
      restart: :permanent
    }
  end

  @impl GenServer
  def init(inputs) do
    {:ok, %{timer_ref: make_timer(self()), inputs: inputs}}
  end

  @impl GenServer
  def handle_call({:enqueue, input}, _from, %{inputs: inputs} = state) do
    new_state = Map.put(state, :inputs, inputs ++ [input])
    {:reply, :ok, new_state}
  end

  @impl GenServer
  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  @impl GenServer
  def handle_info(:process, %{timer_ref: timer_ref, inputs: inputs}) do
    if [] != inputs do
      process_batch(inputs)
    end

    cancel_timer(timer_ref)
    new_timer_ref = make_timer(self())

    {:noreply, %{timer_ref: new_timer_ref, inputs: []}}
  end

  def enqueue(input) do
    GenServer.call(__MODULE__, {:enqueue, input})
  end

  def get_state() do
    GenServer.call(__MODULE__, :get_state)
  end

  def process() do
    send(__MODULE__, :process)
  end

  def stop() do
    GenServer.stop(__MODULE__)
  end

  def process_batch(inputs) do
    IO.puts("Processing #{inspect(inputs)}")
  end

  defp make_timer(pid) when is_pid(pid) do
    Process.send_after(pid, :process, @wait_before_processing_millis)
  end

  defp cancel_timer(nil), do: false
  defp cancel_timer(ref) when is_reference(ref), do: Process.cancel_timer(ref)
end

Notice the following:

  • You can just put YYY.BatchingWorker inside your supervision tree without options, or you can start it up manually whenever you wish (but if you call GenServer.start_link directly you must pass name: YYY.BatchingWorker, i.e. the module’s own name! Otherwise the convenience functions listed below will crash, because they rely on the GenServer being registered under the name of the module they live in).
  • Accumulating items to process does not have to be synchronous (it’s currently handled by handle_call); you can migrate it to handle_cast if desired so queuing items will never wait on anything.
  • The message to process stuff is NOT a GenServer message; it’s a lower-level process message and that’s why it’s handled by handle_info – this is needed because we want to be able to both (1) manually trigger processing and (2) have a timer trigger processing for us.
  • I have added utility / convenience functions:
    • enqueue (enqueue item or items)
    • get_state (for debugging, feel free to remove)
    • process (manually start processing before the deadline expires)
    • stop (to kill the GenServer: again for debugging purposes and again feel free to remove).
  • You can of course change the name of the module, I’ve used __MODULE__ everywhere so you just have to change the name once in the defmodule definition.
  • process_batch is obviously not set in stone, you might supply a function in the GenServer’s initial state even, it does not have to live inside the GenServer module after all.
  • You can modify the handle_info(:process) handler to behave differently when the current batch is empty. I left that out because to me that’s not an error; I’d presume it’s possible that no items were enqueued within the deadline (if the deadline is short enough). But if you want to differentiate between both cases (no items vs. there are items), it’s doable, exercise for the reader.
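
As a rough illustration of two of the points above (enqueueing through handle_cast, and supplying process_batch in the initial state), here is a condensed sketch. CastBatcher is a hypothetical module written for this example, not a drop-in replacement for YYY.BatchingWorker; it takes a pid instead of registering a name, and the option names :interval_ms and :process_batch are assumptions.

```elixir
defmodule CastBatcher do
  use GenServer

  # `opts` is a keyword list with :interval_ms and :process_batch.
  def start_link(opts), do: GenServer.start_link(__MODULE__, Map.new(opts))

  # Fire-and-forget: returns :ok immediately, the caller never waits.
  def enqueue(server, input), do: GenServer.cast(server, {:enqueue, input})

  @impl GenServer
  def init(%{interval_ms: interval_ms, process_batch: process_batch}) do
    state = %{interval_ms: interval_ms, process_batch: process_batch, inputs: []}
    {:ok, Map.put(state, :timer_ref, schedule(interval_ms))}
  end

  @impl GenServer
  def handle_cast({:enqueue, input}, state) do
    # Prepending is O(1); the list is reversed once when it is flushed.
    {:noreply, %{state | inputs: [input | state.inputs]}}
  end

  @impl GenServer
  def handle_info(:process, state) do
    # Runs on the timer tick or when someone sends :process manually.
    if state.inputs != [], do: state.process_batch.(Enum.reverse(state.inputs))
    Process.cancel_timer(state.timer_ref)
    {:noreply, %{state | inputs: [], timer_ref: schedule(state.interval_ms)}}
  end

  defp schedule(interval_ms), do: Process.send_after(self(), :process, interval_ms)
end
```

Because enqueue/2 is a cast, the caller gets no confirmation that the item was accepted, and no result either; that is the trade-off against the handle_call version.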

I’ve used the above code in more than one contract and personal projects and it works pretty well. You can just put it in a toy project and play with it:

{:ok, _} = GenServer.start_link(YYY.BatchingWorker, [1, 2], name: YYY.BatchingWorker)
YYY.BatchingWorker.process()
YYY.BatchingWorker.enqueue([1,2,3,4,5])
:timer.sleep(5000)
# etc.

Hope that’s helpful. I wouldn’t get paralyzed on what’s idiomatic per se, I’d more look into the proper OTP building blocks to model my problem.

Thanks so much @dimitarvp! Your usecase sounds very similar to mine with the exception of my desire to have the caller block. Will take a look at your code to learn from it! Really appreciate you sharing this!!
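
One way the two ideas might be combined, blocking callers plus a fixed-interval flush, is sketched below. This is only an illustration under stated assumptions, not the final solution discussed in this thread: BlockingBatcher, the :flush message and the :api option are hypothetical names, and the batched function is assumed to return its outputs in the same order as its inputs.

```elixir
defmodule BlockingBatcher do
  use GenServer

  # `opts` is a keyword list with :interval_ms and :api (a function that
  # takes a list of inputs and returns a list of outputs in the same order).
  def start_link(opts), do: GenServer.start_link(__MODULE__, Map.new(opts))

  # Blocks until the batch containing `input` has been processed.
  def process(server, input), do: GenServer.call(server, {:process, input})

  @impl GenServer
  def init(%{interval_ms: interval_ms, api: api}) do
    Process.send_after(self(), :flush, interval_ms)
    {:ok, %{interval_ms: interval_ms, api: api, batch: []}}
  end

  @impl GenServer
  def handle_call({:process, input}, from, state) do
    # No reply yet: the caller stays blocked until GenServer.reply/2 runs.
    {:noreply, %{state | batch: [{from, input} | state.batch]}}
  end

  @impl GenServer
  def handle_info(:flush, state) do
    {froms, inputs} = state.batch |> Enum.reverse() |> Enum.unzip()

    if inputs != [] do
      # One batched call for everything queued since the last tick.
      froms
      |> Enum.zip(state.api.(inputs))
      |> Enum.each(fn {from, output} -> GenServer.reply(from, output) end)
    end

    Process.send_after(self(), :flush, state.interval_ms)
    {:noreply, %{state | batch: []}}
  end
end
```

Note that GenServer.call/2 has a default timeout of 5000 ms, so the interval plus the time the batched call takes has to stay below that, or process/2 needs to pass a longer timeout.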

Don’t mention it. Your post intrigued me and you’re very polite and have made an effort. It was a pleasure.

I’d be interested to see what your final solution looks like, if you would be so kind as to share it once you’ve written it.

Will do! It might take a while because I’m sure that after I incorporate what I’ve learned from your code + deploy it into the BatchingServer I posted I’ll hit some unexpected cases. For example, yesterday I realized that I could use inspect(foo) in my own error messages to make it a lot easier to debug crashes. It’s really nice that things have nice printable representations in Elixir by default, coming from C++/Java-land. I am happy to mark your reply as the solution. Thanks again!!
