Waiting for multiple tasks in a genserver

I have a GenServer that wants to spawn multiple tasks (in this case, multiple API calls to different services), wait for all of them to return, and then process all of the responses together.

I was originally going to use Task.await_many/2 to, well, wait for all of the tasks to return, but this comment in the documentation gave me pause:

It is not recommended to await long-running tasks inside an OTP behaviour such as GenServer

The recommendation is to instead match on the message coming from a task inside your GenServer.handle_info/2 callback.

The docs have a really nice example for monitoring a single task, but how would you write that handle_info function to wait for multiple tasks to return (especially things like handling timeouts for the individual tasks)?


I would assume Task.Supervisor.async_stream_nolink is the starting point. Interesting question and I hope someone can provide a concrete example for you.
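
The bare call shape would be something like this (only a sketch; urls, fetch_url/1, and MyApp.TaskSupervisor are placeholder names):

results =
  MyApp.TaskSupervisor
  |> Task.Supervisor.async_stream_nolink(urls, &fetch_url/1, timeout: 5_000, on_timeout: :kill_task)
  |> Enum.to_list()

# Each element is {:ok, value} or {:exit, reason}. Note that enumerating
# the stream still blocks the calling process until all the tasks finish.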


Oh interesting. I have to admit that I find the concept of a concurrent stream to be kind of mind-bending.

What’s responsible for deciding to make those requests? If it’s a request from outside doing something like GenServer.call(pid, :make_all_the_requests), what should happen if another make_all_the_requests comes in while the first one is still processing?

One straightforward alternative would be to have the GenServer launch a single Task which then launches the others and calls Task.await_many on them.
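
Roughly like this, as a sketch (MyApp.TaskSupervisor, fetch_url/1, and the state fields are assumed names):

def handle_call(:make_all_the_requests, _from, state) do
  top =
    Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
      # Only this wrapper task blocks; it owns and awaits the sub-tasks
      state.urls
      |> Enum.map(fn url -> Task.async(fn -> fetch_url(url) end) end)
      |> Task.await_many(15_000)
    end)

  # The combined results arrive later as a single {top.ref, results} message
  {:reply, :ok, %{state | top_ref: top.ref}}
end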


Good questions!

The thing responsible for deciding to make the requests is indeed external. (A message comes in via an API → that message is forwarded to a bunch of external & internal processes that analyze it in parallel → the results are all analyzed jointly when they come back).

what should happen if another make_all_the_requests comes in while the first one is still processing?

That’s a great question! I’d love to know how you’d handle both the scenario where you process them in series and the one where you abort the pending run. Kinda new to OTP, so still trying to figure this all out!

The way you describe it, Oban with a good uniqueness constraint (i.e. if request 1234 is currently in progress and 2 of its 5 API requests are complete but the others are still running, don’t schedule another copy) will serve you well.
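
For illustration, a unique worker might look something like this (the queue, module, and args names are made up; the unique options are per Oban’s docs):

defmodule MyApp.AnalyzeMessageWorker do
  use Oban.Worker,
    queue: :analysis,
    # Don't insert a duplicate job while one with the same args is still
    # waiting to run or running
    unique: [states: [:available, :scheduled, :executing]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"request_id" => request_id}}) do
    # Fan out the API calls for this request and analyze the results jointly
    {:ok, request_id}
  end
end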

If you don’t want to use a library, you can build something on a DynamicSupervisor. Through a combination of start_child, which_children, and waiting for messages in a designated monitoring/controlling process, you can achieve both deduplication and parallelism.
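
One sketch of that idea, here deduplicating via a Registry name instead of scanning which_children (all module names are hypothetical; MyApp.BatchRegistry and MyApp.BatchSup would be started under your supervision tree):

# Start (or reuse) the controlling process for one request; a second
# call with the same request_id just returns the already-running process.
def start_batch(request_id) do
  name = {:via, Registry, {MyApp.BatchRegistry, request_id}}

  case DynamicSupervisor.start_child(MyApp.BatchSup, {MyApp.BatchRunner, name: name}) do
    {:ok, pid} -> {:ok, pid}
    {:error, {:already_started, pid}} -> {:ok, pid}
  end
end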

Well, they are just that – recommendations. The authors can’t know all your application requirements. If I wasn’t worried about reliability and fault-tolerance I’d just use Task.await_many indeed. Seems like an almost perfect fit for your scenario.
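
That is, something as simple as (a sketch; fetch_url/1 stands in for the real API call):

tasks = Enum.map(urls, fn url -> Task.async(fn -> fetch_url(url) end) end)
# Blocks until every task replies or the overall timeout expires
results = Task.await_many(tasks, 10_000)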


I think I was overthinking this a bit.

def handle_call(:do_all_the_things, _from, state) do
  urls = [url1, url2, url3 ...]

  state =
    Enum.reduce(urls, state, fn url, acc ->
      task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
      # After we start the task, store its reference and the url it is fetching
      # (assumes state.tasks was initialized to %{} in init/1)
      put_in(acc.tasks[task.ref], url)
    end)

  {:reply, :ok, state}
end

# A task completed successfully
def handle_info({ref, answer}, %{tasks: tasks} = state) when is_map_key(tasks, ref) do
  # We don't care about the DOWN message now, so let's demonitor and flush it
  Process.demonitor(ref, [:flush])
  # Do something with the answer, then forget the finished task
  {_url, tasks} = Map.pop(tasks, ref)
  {:noreply, %{state | tasks: tasks}}
end

# A task failed
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{tasks: tasks} = state)
    when is_map_key(tasks, ref) do
  # Log and possibly restart the task...
  {:noreply, %{state | tasks: Map.delete(tasks, ref)}}
end

The new concern about what to do if a second do_all_the_things comes in is more interesting. You probably have to track the in-flight work in the state as a map; for the simple URL fetch example, something like state = %{tasks: %{task_ref => url, ...}}. If that map is not empty, only start tasks for urls that are not already pending and add the new ref/url pairs to the state.
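
A sketch of that on top of the handle_call above (same assumed names, with state.tasks mapping task refs to urls):

def handle_call(:do_all_the_things, _from, state) do
  urls = [url1, url2, url3 ...]
  pending_urls = Map.values(state.tasks)

  state =
    urls
    # Skip urls that already have a task in flight
    |> Enum.reject(&(&1 in pending_urls))
    |> Enum.reduce(state, fn url, acc ->
      task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
      put_in(acc.tasks[task.ref], url)
    end)

  {:reply, :ok, state}
end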


Thanks for the code!

One question about it: I may be misunderstanding it, but it seems like the handle_info callback for “task completed successfully” will get called once for every sub-task; where in the code do you process all of the responses together (i.e., the equivalent of an await_many)?

I’m excited to figure out the “multiple do_all_the_things scenario” since it will make me dive a bit deeper into understanding how to use the mailbox.

You are correct about how that example handles the sub-tasks. I think if you want to handle them all as a group that acts as a single unit, you need another intermediate layer of Task, as suggested here: Waiting for multiple tasks in a genserver - #4 by al2o3cr

Within that layer you then handle the subtasks and “bubble up” the combined response to the original GenServer.

To build upon the proposed solution, I’d say you can batch :do_all_the_things in one separate process. Ignoring the task supervisor, here is some rough code.

def handle_call(:do_all, _from, state) do
  urls = […]

  gen_server_pid = self()

  batch_pid =
    spawn(fn ->
      # Start one task per url; this spawned process owns and awaits them
      tasks = Enum.map(urls, fn url -> Task.async(fn -> fetch_url(url) end) end)
      result = Task.await_many(tasks)
      # You can include a kind of ID here too to distinguish batches
      send(gen_server_pid, {:batch_result, result})
    end)

  # Keep batch_pid in the state if you need it
  {:reply, :ok, update_state(state, batch_pid)}
end

def handle_info({:batch_result, result}, state) do
  # Process all the results together
  {:noreply, state}
end

You get the idea: you start a separate process that spawns a task for each mini-job, gathers all the results, and sends them to the GenServer in one message.
