Calling many GenServers in parallel

I want to call many GenServers at once (specific use-case is collecting data for metrics, but I think the idea might be widely applicable).

Of course I can do it sequentially:

Enum.map(targets, fn {server, request} -> GenServer.call(server, request) end)

But that might become too slow.

The requests are fully independent of each other, so it makes sense to parallelize them. I haven’t found a built-in way to do it, so I cooked up the following abomination, which uses the low-level messages that GenServer.call actually sends. My implementation is very bare-bones and probably doesn’t handle many corner cases.

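  # Sends every request using the message format GenServer.call/3 uses
  # internally, then collects the replies in the order of `targets`.
  # Only pids and locally registered names are supported as servers.
  def parallel_call(targets, timeout \\ 5000) do
    targets =
      Enum.map(targets, fn {server, request} ->
        ref = Process.monitor(server)
        {server, request, ref}
      end)

    # References we accept messages for; anything else stays in the mailbox.
    monitors = Map.new(targets, fn {server, _, ref} -> {ref, server} end)

    Enum.each(targets, fn {server, request, ref} ->
      try do
        send(server, {:"$gen_call", {self(), {:ref, ref}}, request})
      rescue
        # send/2 raises for an unregistered name; the monitor reports :noproc.
        ArgumentError -> :ok
      end
    end)

    # Monotonic time is not affected by wall-clock adjustments.
    deadline = System.monotonic_time(:millisecond) + timeout
    collect_responses(targets, monitors, %{}, deadline)
  end

  defp collect_responses(targets, monitors, responses, deadline) do
    if map_size(responses) == length(targets) do
      Enum.map(targets, fn {_, _, ref} -> responses[ref] end)
    else
      remaining_timeout = max(0, deadline - System.monotonic_time(:millisecond))

      receive do
        {{:ref, ref}, reply} when is_map_key(monitors, ref) ->
          # :flush also drops a :DOWN message that is already in the mailbox.
          Process.demonitor(ref, [:flush])
          responses = Map.put(responses, ref, {:ok, reply})
          collect_responses(targets, monitors, responses, deadline)

        {:DOWN, ref, _, _, reason} when is_map_key(monitors, ref) ->
          responses = Map.put(responses, ref, {:error, reason})
          collect_responses(targets, monitors, responses, deadline)
      after
        remaining_timeout ->
          # Clean up the monitors; late replies may still reach the mailbox.
          Enum.each(targets, fn {_, _, ref} -> Process.demonitor(ref, [:flush]) end)
          exit(:timeout)
      end
    end
  end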

What do you think? Are there flaws with this implementation? Is there a better way to do it?


Wouldn’t Task.await_many/2 suffice? Something like:

timeout = 5_000
tasks =
  for {server, request} <- targets do
    Task.async(fn -> GenServer.call(server, request, timeout) end)
  end

Task.await_many(tasks)

You probably should not do this, but you can call a GenServer without GenServer.call/3 by using plain send/2 and getting the result back with receive/1.

Under the hood, GenServer.call/3 just sends a message with a specific format to a process (does not have to be a GenServer process), and expects that process to send back a message with another specific format within timeout milliseconds.

All you need to do is figure out the formats of the request and reply messages, and forge such messages yourself.
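To illustrate the point that the target does not have to be a GenServer, here is a minimal sketch of a bare process that answers GenServer.call/3. The module name PlainResponder is made up for this example, and the request tuple shape is the one already used in the first post. It replies through GenServer.reply/2 rather than a hand-built message, so it does not depend on the exact reply format.

```elixir
defmodule PlainResponder do
  # Hypothetical module: a bare spawned process, not a GenServer.
  def start do
    spawn(fn -> loop() end)
  end

  defp loop do
    receive do
      # Shape of the message that GenServer.call/3 sends to its target.
      {:"$gen_call", from, request} ->
        # GenServer.reply/2 builds the reply message the caller is waiting for.
        GenServer.reply(from, {:echo, request})
        loop()
    end
  end
end

pid = PlainResponder.start()
reply = GenServer.call(pid, :ping)
# reply is {:echo, :ping}
```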

There are APIs for this in Erlang’s gen_server module (gen_server, stdlib v6.0.1).


Thank you for that information. I didn’t know there are already such APIs in Erlang.

Great talk about GenServer.call by Isaac Yonemoto:

I like this approach the most, thank you! This uses only the standard functionality, without low-level trickery, so probably the safest option.

It’s a bit scary to create so many extra processes, but I hear processes are cheap on the BEAM :)

That’s kind of what I did, see the second code snippet.

Thanks for the references! Didn’t notice these functions when I was browsing the docs.

However, I don’t think receive_response/wait_response/check_response will be able to handle the case of receiving many responses at once, since each of them depends on one exact request reference.
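For what it’s worth, one way around this might be to send every request first and then wait on each request reference in turn under a shared deadline; because all requests are already in flight, the total wait is roughly that of the slowest server. The sketch below assumes OTP 24 or later (for :gen_server.receive_response/2), and the module name RequestFanOut and the {:ok, _} / {:error, _} result shape are choices made for this example, not part of the Erlang API.

```elixir
defmodule RequestFanOut do
  # Hypothetical helper built on :gen_server.send_request/2 and
  # :gen_server.receive_response/2.
  def call_all(targets, timeout \\ 5000) do
    deadline = System.monotonic_time(:millisecond) + timeout

    # Send every request before waiting for any reply.
    request_ids =
      Enum.map(targets, fn {server, request} ->
        :gen_server.send_request(server, request)
      end)

    # Wait for the replies one by one, in the order of `targets`.
    Enum.map(request_ids, fn request_id ->
      remaining = max(0, deadline - System.monotonic_time(:millisecond))

      case :gen_server.receive_response(request_id, remaining) do
        {:reply, reply} -> {:ok, reply}
        {:error, {reason, _server}} -> {:error, reason}
        :timeout -> {:error, :timeout}
      end
    end)
  end
end
```

Unlike the hand-rolled version, a timeout here becomes an {:error, :timeout} entry for that target instead of an exit of the caller.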


If the total number is less than 100K processes, you should be fine.
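If you want to check how much headroom a particular node has, the VM reports both its configured process limit and the current process count. A small sketch (the variable names are arbitrary):

```elixir
# Maximum number of simultaneously existing processes on this node.
limit = :erlang.system_info(:process_limit)

# Number of processes that currently exist on this node.
count = :erlang.system_info(:process_count)

IO.puts("#{count} of #{limit} processes in use")
```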


Nitpick: the Task.async version has very different behavior in the face of crashing / missing targets.

The original will include an {:error, :noproc} or {:error, exit_reason} in the result for targets that crash or aren’t running.

The Task version will exit the calling process instead.
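If the per-target error behavior matters, a possible middle ground is to catch the exit inside each task. This is only a sketch: the module name SafeParallelCall is hypothetical, and the extra second added to the await timeout is an arbitrary margin so that the call timeout fires before the await does.

```elixir
defmodule SafeParallelCall do
  # Hypothetical helper: each task catches the exit raised by
  # GenServer.call/3 and returns it as an {:error, reason} tuple.
  def call_all(targets, timeout \\ 5000) do
    targets
    |> Enum.map(fn {server, request} ->
      Task.async(fn ->
        try do
          {:ok, GenServer.call(server, request, timeout)}
        catch
          :exit, reason -> {:error, reason}
        end
      end)
    end)
    # Wait slightly longer than the calls themselves are allowed to take.
    |> Task.await_many(timeout + 1000)
  end
end
```

Note that the error reasons differ from the original: GenServer.call/3 exits with a tuple such as {:noproc, {GenServer, :call, [...]}}, so that whole tuple ends up in the {:error, _} entry rather than a bare :noproc.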
