Getting result of Task.async from a process that didn't initiate it?

So I have a scenario where a given request will require about 5-6 different stages to happen, which are returned to the user as they are completed. But there’s a catch that the two tasks that constrain the total time of the request, are sequential. Here’s an example.

  • REQ: Image comes in.
    1. Initiate ML model that does conversion of image
    1. Various smaller tasks that provide some analysis of the initial photo that are sent back to the user and displayed as they wait
    1. Initial conversion done. Start reverse conversion task(input is the result of conversion 1)
    1. Send initial converted image to user.
    1. Again various tasks of analysis of conversion 1
    1. Second conversion is done. Send to user
    1. Analysis of conversion 2, send to user

Currently the process that orchestrates all this starts an async task for the ML, but there are scenarios where due to the stuff in step 2, the start of the second conversion happens some amount of time after the completion of the first conversion.

I have tried changing it to a setup where the task that does the first conversion, also starts a task for the second conversion, and returns the results from the first, and the task for the second. That way conversion 2 starts at the earliest opportunity, and is not constrained by the main orchestrating task to always have completed those subtasks already.

While this seemed like a good idea in theory, the issue is the main process can’t await the task that was started by initial conversion Task.async call.

** (ArgumentError) 
task %Task{mfa: {:erlang, :apply, 2}, owner: #PID<0.1410.0>, pid: #PID<0.1423.0>, ref: #Reference<0.497699279.469565443.66976>} 
must be queried from the owner but was queried from #PID<0.1407.0>

I could setup a GenServer that is spun up to manage starting and awaiting the completion of both conversions, but that the main process could still query when it’s ready for the first conversion’s completion, but this sounds like it increases the complexity quite a bit from the current Task.async implementation.

I’m curious if there’s a way for my first idea to work, maybe sort of like the first Task.async that is starting the second task as if on the behalf of the main task. I’ve looked that the “$ancestors” and “$callers” and can see the difference there, but get the feeling that any manipulation of those won’t actually solve my issue, as that task parent PID is probably set at creation.

Any input? Is this possible with Task, or is a custom GenServer implementation the only route forward?

FYI I’m currently using Task.Supervisor.async, rather than Task.async if that makes a difference.

Hey @pejrich the general idea here is that you pass in a reference from the “listening” process, and then have the task send that listening process a message tagged with that reference eg:

# in the process that's listening
ref = make_ref()

TaskRunnerThing.kick_off_tasks(ref)

# or do in handle_info if you're a genserver
receive do
  {:result, ^ref, result} -> # stuff
end


# in task runner

def kick_off_tasks(ref) do
  listener_pid = self()

  Task.Supervisor.start_child(super, fn ->
     result = compute_result(stuff)
     send(listener_pid, {:result, ref, result})
  end)
end
3 Likes

I’d suggest looking at Registry to coordinate larger sets of processes using well known names, which is more flexible than the default local registry. Also at a certain point I’d evaluate if you didn’t happen to outgrow the abstraction of Task.

1 Like

Thanks, Ben! This is great. I so rarely use send/2 directly that I didn’t think of it, but it makes so much sense to use it here from inside the task.

The only change I made was calling Task.Supervisor.async instead of start_child as start_child was working fine, but giving me these warnings:

unhandled info {#Reference<...>, {:ml_response, #Reference<...>, %{data: ...}}
unhandled info {:DOWN, #Reference<...>, :process, #PID<0.1934.0>, :normal}

I assume because I was handling the message in the explicit receive call, rather than in the Channel’s handle_info, but from a code clarity perspective(at in fitting with the rest of steps) explicitly awaiting, sending the data to the client, then moving to the next piece is cleaner than trying to handle that part in handle_info for just these subset of the steps, but switching the call to async removed those warnings.

It’s noticeably completing the stages faster now that the second ML task is always starting at the earliest moment. Thanks again!

I would definitely consider using start_child with an explicit Process.monitor from the caller, because you need a way for the caller to find out that the task has crashed. Right now you’re waiting for a result, but if the task crashes for some reason you’ll never get a result, so you need to watch for a non normal :DOWN message.