OTP Batch processing

I am currently attempting to design a set of modules which uses batching under the hood while using a single ID function definition. In concrete terms, I would have a function definition like:

batcher = SomeBatcher.new()
{:ok, res} = SomeBatcher.load(batcher, "1")

Repeated instances of this call would batch them in groups and run some arbitrary code against said group. It’s some what similar to dataloaders in that respect. So far, I’ve go this working:

defmodule Batcher do
  @moduledoc false

  use GenServer

  defmodule State do
    defstruct [:batch_fn, :max_batch_count, :timeout, :timer, pending: %{}]
  end

  def start_link([batch_fn, max_batch_count]) do
    start_link(batch_fn, max_batch_count)
  end

  def start_link([batch_fn, max_batch_count, timeout]) do
    start_link(batch_fn, max_batch_count, timeout)
  end

  def start_link(batch_fn, max_batch_count, timeout \\ 1_000) do
    GenServer.start_link(
      __MODULE__,
      {:ok, %State{batch_fn: batch_fn, max_batch_count: max_batch_count, timeout: timeout}}
    )
  end

  def init({:ok, state}) do
    {:ok, state}
  end

  def load(batcher_pid, id) do
    GenServer.call(batcher_pid, {:add, id})
  end

  def handle_call({:add, id}, from, %State{pending: pending} = state) do
    pending = Map.put(pending, id, from)
    state = %{state | pending: pending}

    IO.inspect("adding #{id} into pending stack")
    {:noreply, schedule_timeout(state), {:continue, :flush_on_capacity}}
  end

  defp schedule_timeout(%State{timer: timer_ref} = state) do
    case timer_ref do
      nil ->
        %{state | timer: create_timer(state)}

      existing_timer ->
        Process.cancel_timer(existing_timer)
        %{state | timer: create_timer(state)}
    end
  end

  def handle_continue(
        :flush_on_capacity,
        %State{pending: pending, max_batch_count: max_size} = state
      )
      when map_size(pending) >= max_size do
    IO.inspect("flushing from max capacity")
    {:noreply, flush(state)}
  end

  def handle_continue(:flush_on_capacity, state), do: {:noreply, state}

  defp create_timer(%State{timeout: timeout}), do: Process.send_after(self(), :timeout, timeout)

  def handle_info(:timeout, state) do
    IO.inspect("flushing from timeout")
    {:noreply, flush(state)}
  end

  defp flush(%State{batch_fn: batch_fn, pending: pending, timer: timer_ref} = state) do
    IO.inspect("flushing jobs: #{inspect(pending)}")

    res =
      pending
      |> Map.keys()
      |> batch_fn.()
      |> Enum.into(%{})

    for {id, from} <- pending do
      GenServer.reply(from, {:ok, res[id]})
    end

    Process.cancel_timer(timer_ref)
    %{state | pending: %{}}
  end
end

I haven’t used GenServer.reply/2 before, but reading the documentation seems to make it clear enough. The actual question I have is:

  • Is it possible due to how I am wiping the pending state at flush/1 that I could remove pending jobs that have not been actioned?

Let’s say I have the following configuration on the batcher:

  • 5 maximum capacity
  • Timeout of 1 second

With this configured batcher, I send 7 load/2 calls, where the first argument is the batcher process and the second some string. I would assume that the first 5 that run are queued. Once the threshold is met, it will execute the batch function on said 5. What I don’t understand is the possibility of the sixth or seventh load/2 call being executed at the same, which may be lost when the pending map is reset. My tests don’t seem to indicate that this happens (thankfully), which leads me to believe that regardless of handle_continue and the like that the Erlang process is running each task one by one, which I think it does.

Does this assumption line up within other people’s expectations? I think it might based off my understanding of the concurrency model, but if there are more nuances I am overlooking, I would welcome any feedback.