I have a GenServer that needs to spawn multiple tasks (in this case, API calls to several different services), wait for all of them to return, and then process all of the responses together.
I was originally going to use Task.await_many/2 to, well, wait for all of the tasks to return, but this comment in the documentation gave me pause: “It is not recommended to await long-running tasks inside an OTP behaviour such as GenServer.” The recommendation is instead to match on the message coming from a task inside your GenServer.handle_info/2.
The docs have a really nice example for monitoring a single task, but how would you write that handle_info
function to wait for multiple tasks to return (especially things like handling timeouts for the individual tasks)?
I would assume Task.Supervisor.async_stream_nolink is the starting point. Interesting question, and I hope someone can provide a concrete example for you.
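For what that could look like concretely, here is a minimal sketch (not from the thread; MyApp.TaskSupervisor and fetch_url are assumed names, and the fetch function is a stand-in for the real API call):

```elixir
# A supervised Task.Supervisor would normally live in your application's
# supervision tree; started inline here so the sketch is self-contained.
{:ok, _} = Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

fetch_url = fn url -> {url, :fake_response} end # stand-in for the real call
urls = ["url1", "url2", "url3"]

results =
  Task.Supervisor.async_stream_nolink(MyApp.TaskSupervisor, urls, fetch_url,
    timeout: 5_000,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# results is a list of {:ok, value} | {:exit, reason} tuples in input order;
# a timed-out task shows up as {:exit, :timeout} instead of crashing the caller.
```

Note that the stream is lazy: nothing runs until you enumerate it (here via Enum.to_list/1), and `on_timeout: :kill_task` keeps one slow call from taking the whole batch down.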
Oh interesting. I have to admit that I find the concept of a concurrent stream to be kind of mind-bending.
What’s responsible for deciding to make those requests? If it’s a request from outside doing something like GenServer.call(pid, :make_all_the_requests), what should happen if another :make_all_the_requests comes in while the first one is still processing?
One straightforward alternative would be to have the GenServer launch a single Task, which then launches the others and does a Task.await_many.
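A rough sketch of that shape (names like MyApp.TaskSupervisor, fetch_url/1, and the :batch_ref state key are hypothetical): the GenServer starts one supervised task, that task runs the sub-calls and awaits them all, and the combined result arrives back as a single message.

```elixir
def handle_call(:make_all_the_requests, _from, state) do
  urls = state.urls

  task =
    Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
      # The inner tasks link to this wrapper task, not to the GenServer,
      # so a crash here surfaces as a single :DOWN message.
      urls
      |> Enum.map(&Task.async(fn -> fetch_url(&1) end))
      |> Task.await_many(5_000)
    end)

  # One ref to track instead of one per sub-request
  {:reply, :ok, %{state | batch_ref: task.ref}}
end

def handle_info({ref, results}, %{batch_ref: ref} = state) do
  Process.demonitor(ref, [:flush])
  # results is the full list, in input order; process it jointly here
  {:noreply, %{state | batch_ref: nil}}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, %{batch_ref: ref} = state) do
  # The whole batch crashed or timed out; log and maybe retry
  {:noreply, %{state | batch_ref: nil}}
end
```

The GenServer stays responsive the whole time, and the "process all responses together" step happens in exactly one handle_info clause.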
Good questions!
The thing responsible for deciding to make the requests is indeed external. (A message comes in via an API → that message is forwarded to a bunch of external & internal processes that analyze it in parallel → the results are all analyzed jointly when they come back).
“what should happen if another :make_all_the_requests comes in while the first one is still processing?”
That’s a great question! I’d love to know how you’d handle both the scenario where you process them in series, and the one where you abort the pending request. Kinda new to OTP, so still trying to figure this all out!
The way you describe it sounds like a good fit for Oban with a uniqueness constraint (i.e. if request 1234 is currently in progress and 2/5 API requests are complete but the others are still running, don’t schedule another copy of the job).
If you don’t want to use a library, you can have something that uses a DynamicSupervisor. Through a combination of start_child, which_children, and waiting for messages in a designated monitor/controlling process, you can achieve deduplication and parallelism fairly easily.
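One possible shape for the hand-rolled version (all names here are hypothetical): register each in-flight worker in a unique-keyed Registry under its URL, so starting a duplicate fails naturally instead of needing manual bookkeeping.

```elixir
defmodule Fetcher do
  # :temporary so a crashed fetch isn't restarted automatically
  use GenServer, restart: :temporary

  def start_link(url) do
    # Registering under the URL means a second start for the same URL
    # returns {:error, {:already_started, pid}} while one is in flight.
    GenServer.start_link(__MODULE__, url, name: {:via, Registry, {FetchRegistry, url}})
  end

  @impl true
  def init(url), do: {:ok, url, {:continue, :fetch}}

  @impl true
  def handle_continue(:fetch, url) do
    # The real request and result reporting (e.g. send/2 back to a
    # controlling process) would go here; then the worker stops.
    {:stop, :normal, url}
  end
end

# In the controlling process / supervision tree:
# {:ok, _} = Registry.start_link(keys: :unique, name: FetchRegistry)
# {:ok, _} = DynamicSupervisor.start_link(name: FetchSup, strategy: :one_for_one)
# DynamicSupervisor.start_child(FetchSup, {Fetcher, url})
```

This trades the single-message convenience of await_many for per-request isolation: each URL is its own supervised process.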
Well, they are just that – recommendations. The authors can’t know all your application requirements. If I weren’t worried about reliability and fault-tolerance I’d just use Task.await_many, indeed. It seems like an almost perfect fit for your scenario.
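If blocking the caller for the duration is acceptable, the direct version really is tiny. A minimal sketch (fetch_url here is a stand-in for the real API call):

```elixir
fetch_url = fn url -> {url, :ok} end # stand-in for the real call

# Blocks the calling process until every task replies or the timeout hits;
# if a task crashes or times out, await_many exits the caller too, which
# is exactly the fault-tolerance caveat discussed above.
responses =
  ["url1", "url2", "url3"]
  |> Enum.map(fn url -> Task.async(fn -> fetch_url.(url) end) end)
  |> Task.await_many(5_000)
```

Inside a GenServer this blocks the whole server (and any GenServer.call to it) until the slowest request finishes, which is the trade-off the docs warn about.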
I think I was overthinking this a bit.
```elixir
def handle_call(:do_all_the_things, _from, state) do
  urls = [url1, url2, url3 ...]

  state =
    Enum.reduce(urls, state, fn url, acc ->
      task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
      # After we start the task, we store its reference and the url it is fetching
      put_in(acc.tasks[task.ref], url)
    end)

  {:reply, :ok, state}
end
```
Since the refs live in state.tasks (a map of ref => url), the handle_info clauses match on that map rather than on a single ref:

```elixir
# A task completed successfully
def handle_info({ref, answer}, state) when is_map_key(state.tasks, ref) do
  # We don't care about the DOWN message now, so let's demonitor and flush it
  Process.demonitor(ref, [:flush])
  # Pull this task's url out of the state; do something with {url, answer} here
  {url, tasks} = Map.pop(state.tasks, ref)
  {:noreply, %{state | tasks: tasks}}
end

# A task failed
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) when is_map_key(state.tasks, ref) do
  # Log and possibly restart the task...
  {_url, tasks} = Map.pop(state.tasks, ref)
  {:noreply, %{state | tasks: tasks}}
end
```
The new concern about what to do if a second :do_all_the_things comes in is more interesting. You’d probably have to track the pending work in the state, e.g. for the simple URL-fetch example something like state = %{ all_the_things_being_done: [ url1: task1_ref ... ]}. If that value is not empty, only start a new task for URLs that are not already pending, and add each url/task-reference pair to the state.
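That dedup bookkeeping could look roughly like this (a sketch, reusing the assumed tasks map of ref => url from the snippet above; MyApp.TaskSupervisor and fetch_url/1 are assumed names):

```elixir
def handle_call(:do_all_the_things, _from, state) do
  urls = ["url1", "url2", "url3"]
  # URLs whose tasks are already in flight
  pending = Map.values(state.tasks)

  state =
    urls
    |> Enum.reject(&(&1 in pending)) # skip anything already being fetched
    |> Enum.reduce(state, fn url, acc ->
      task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
      put_in(acc.tasks[task.ref], url)
    end)

  {:reply, :ok, state}
end
```

A repeated call then becomes a no-op for in-flight URLs instead of doubling the work.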
Thanks for the code!
One question about it: I may be misunderstanding it, but it seems like the handle_info callback for “task completed successfully” will get called for every sub-task. Where in the code do you process all of the responses (i.e. the equivalent of an await_many)?
I’m excited to figure out the “multiple do_all_the_things” scenario, since it will make me dive a bit deeper into understanding how to use the mailbox.
You are correct about how that example handles the sub-tasks. I think if you want to handle them all as a group that acts as a single unit, you need another layer of Task.Supervisor as suggested here: Waiting for multiple tasks in a genserver - #4 by al2o3cr
Within that supervisor you then handle the subtasks and “bubble up” the response to the original GenServer.
To build upon the proposed solution, I’d say you can batch :do_all_the_things in one separate process. Ignoring the task supervisor, here is some rough code.
```elixir
def handle_call(:do_all, _from, state) do
  urls = […]
  gen_server_pid = self()

  batch_pid =
    spawn(fn ->
      tasks = Enum.map(urls, START_TASK_FOR_ONE_URL)
      result = Task.await_many(tasks)
      # You can include a kind of ID here too to distinguish batches
      send(gen_server_pid, {:batch_result, result})
    end)

  # Keep batch_pid if you need it
  {:reply, :ok, update_state(state, batch_pid)}
end

def handle_info({:batch_result, result}, state) do
  # Process results
  {:noreply, state}
end
```
You get the idea: you start one separate process that spawns a process per mini-task, gathers all the results, and sends them to the GenServer in a single message.