I want to call many GenServers at once (specific use-case is collecting data for metrics, but I think the idea might be widely applicable).
Of course I can do it sequentially:
Enum.map(targets, fn {server, request} -> GenServer.call(server, request) end)
But that might become too slow, since the total latency is the sum of all the individual calls.
The requests are fully independent of each other, so it makes sense to parallelize them. I haven’t found any built-in way to do it, so I cooked up the following abomination, which uses the low-level messages that GenServer.call actually sends. Of course, my implementation is very bare-bones and probably doesn’t handle many corner cases.
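def parallel_call(targets, timeout \\ 5000) do
  # Monitor every server first so a crash shows up as :DOWN instead of a hang.
  targets =
    Enum.map(targets, fn {server, request} ->
      ref = Process.monitor(server)
      {server, request, ref}
    end)

  monitors = Map.new(targets, fn {server, _, ref} -> {ref, server} end)

  # Send the raw call message; the monitor ref doubles as the reply tag.
  Enum.each(targets, fn {server, request, ref} ->
    send(server, {:"$gen_call", {self(), {:ref, ref}}, request})
  end)

  # Monotonic time is not affected by wall-clock adjustments.
  deadline = System.monotonic_time(:millisecond) + timeout
  collect_responses(targets, monitors, %{}, deadline)
end

defp collect_responses(targets, monitors, responses, deadline) do
  if map_size(responses) == map_size(monitors) do
    Enum.map(targets, fn {_, _, ref} -> responses[ref] end)
  else
    remaining_timeout = max(0, deadline - System.monotonic_time(:millisecond))

    receive do
      # Only accept refs from this batch; unrelated messages stay in the mailbox.
      {{:ref, ref}, reply} when is_map_key(monitors, ref) ->
        # :flush also drops a :DOWN that may already be queued for this ref.
        Process.demonitor(ref, [:flush])
        responses = Map.put(responses, ref, {:ok, reply})
        collect_responses(targets, monitors, responses, deadline)

      {:DOWN, ref, _, _, reason} when is_map_key(monitors, ref) ->
        responses = Map.put(responses, ref, {:error, reason})
        collect_responses(targets, monitors, responses, deadline)
    after
      remaining_timeout ->
        # Release the outstanding monitors before giving up.
        Enum.each(Map.keys(monitors), &Process.demonitor(&1, [:flush]))
        exit(:timeout)
    end
  end
end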
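For comparison, a task-based variant that stays on the public API might look like the sketch below. It is only a sketch under a few assumptions: `ParallelCallSketch` is a hypothetical module name, `targets` is a list, and spawning one short-lived process per target is acceptable. Each task wraps `GenServer.call/3` and catches exits, so a dead server or a per-call timeout becomes an `{:error, reason}` entry instead of crashing the caller.

```elixir
defmodule ParallelCallSketch do
  # Hypothetical helper, not part of GenServer: one task per target.
  def parallel_call(targets, timeout \\ 5000) do
    targets
    |> Task.async_stream(
      fn {server, request} ->
        # GenServer.call/3 exits on a dead server or on timeout;
        # turn that exit into a value so the other calls are unaffected.
        try do
          {:ok, GenServer.call(server, request, timeout)}
        catch
          :exit, reason -> {:error, reason}
        end
      end,
      # Run all calls at once; max_concurrency must be a positive integer.
      max_concurrency: max(length(targets), 1),
      # The per-call timeout above already bounds each task.
      timeout: :infinity,
      # Keep results in the same order as targets.
      ordered: true
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Unlike the raw-message version, the timeout here applies to each call separately rather than to the whole batch, and the extra processes cost a little more than plain messages.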
What do you think? Are there flaws with this implementation? Is there a better way to do it?