Best practice for doing fanout async calls

I have a GenServer defined as a module X whose each instance can be instantiated with a different parameter. My app creates multiple instances of this GenServer. When serving an coming request, the main flow needs to send a message to each instance of X asynchronously, so each instance can process the call in parallel. The main flow waits to get all the responses before moving on.

I am not sure what’s the best way to do this. Currently this is what I am doing.

defmodule X do
  ...
  def handle_cast({:doit, caller, p1, ..}, state) do
    ..
    send(caller, ...)
    {:noreply, state}
  end
  ..
  def do_it_async(server, caller, p1, ...) do
    GenServer.cast(server, {:doit, caller, p1, ..})
  end
end

defmodule Other do
  ..
  def handleRequest(...) do
    servers_list = ..
    servers_list |> Enum.each(&X.do_it_async(&1, self(), ..)
    results = collect_results([], length(servers_list), 0)
    ..
  end

  defp collect_results(results, count, received) when count == received, do: {:ok, results}
  defp collect_results(results, count, received) do
    receive do
      {:x_result, ..} ->
        collect_results([{..} | results], count, received + 1)
    after
      5000 -> :timeout
    end
  end

This is obviously very tedious and error-prone. What is the recommended / common way to do this kind of thing in Elixir?

I am aware of Task.async_stream and I could provide a sync version of the API. However, unless I misunderstood it, I don’t think that’s the right way to do it because it would spawn so many processes just to call the API and wait for the result. Each X instance is already a process.

1 Like

Hi @tsukit,

If your intention is just parallel processing or computation along with throttling, then you can take a look at Elixir Flow.

https://hexdocs.pm/flow/Flow.html

If it is something different, then a little more elaborate would help understand the problem you’re trying to solve.

Thanks.

2 Likes

Registry also provides a nice way to do pub-sub without having to introduce GenStage or back pressure or anything like that, https://hexdocs.pm/elixir/Registry.html#module-using-as-a-pubsub

3 Likes

Thanks @pmangalakader. It’s for parallel processing. I need to do task T1, T2, … Tn in parallel. I create different instances of X for this purpose and also to prevent having to spawning so many process for each request. I’m not sure how this will be model with Flow though (I am very familiar with stream concepts like Akka Stream). Would you be kind giving an example using Flow?

Hi, you should probably go with what @entone suggested.

Simply use the Registry to register each instance of your GenServer process and you’re good to go

@tsukit, the example in the link can easily illustrate word count in parallel. I had the problem of loading GeoData into a database before starting an application. I had tried to use GenServers with poolboy, but it required a lot of manual tweaks to get it to optimized working. So I tried to search for alternatives and found Flow that had used Genstages and sufficed my needs at that time. At the moment I don’t have that code with me.

The below thread has some important needs for understanding Flow:

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

The magic here is the :min_demand and :max_demand, it uses the number of cores by default, but you can increase it or decrease it based on your needs.