Task async finish before await causes crash?

I have a process which collects data in four batches. After each batch the data is analyzed.

Something like this

tasks = []
Enum.each(0..4, fn(_x) ->
  get_data()
  tasks = [tasks | Task.async(analyze_data())]
end)

Task.await_many(tasks)
do_other_stuff()

However, this results in handle_info errors if the task finishes before coming to await_many.

What is the proper way to do this?

What handle_info errors? If your process is not doing anything else in between starting the tasks and calling Task.await_many, I don’t see how it could error as it doesn’t even attempt to receive other messages. Can you share the errors and your whole code?

Since @Nicd didn’t point it out, this code is not even valid.
tasks will always be [], you can’t assign to it from inside the each.
You need to Enum.map and assign the result of that to tasks.

(You are also mixing up the order when prepending, you are prepending the full list before the result of Task.async on each iteration)
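For reference, a minimal sketch of the Enum.map version. The bodies of get_data and analyze_data here are made-up stand-ins, since the real ones were not posted. Note that Task.async needs to be given a function; writing Task.async(analyze_data()) would run the analysis in the calling process first and then pass its result to Task.async.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-ins for the get_data/analyze_data from the question.
  def get_data(batch), do: Enum.to_list(1..batch)
  def analyze_data(data), do: Enum.sum(data)

  def run do
    # Enum.map returns the list of tasks, so nothing has to be
    # reassigned from inside the anonymous function.
    tasks =
      Enum.map(1..4, fn batch ->
        data = get_data(batch)
        # The analysis runs in a separate process while the next
        # batch is being acquired.
        Task.async(fn -> analyze_data(data) end)
      end)

    # Results come back in the same order as the tasks.
    Task.await_many(tasks)
  end
end
```

This only works as long as the process that started the tasks does not return to a receive loop (for example a GenServer callback returning) before awaiting.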

Ok, I was in a hurry and tried to simplify, which resulted in invalid code :slight_smile:

Let me try again.

  • I have a process Controller which waits for a signal and starts acquiring the data.
  • After the data has been acquired, a task to analyze the data is fired and the process continues acquiring another batch of data.
  • The Controller has a state which keeps track of all the fired tasks as a list (which is prepending the task, not the list :slight_smile: )
  • After four batches of data are acquired, the system has to get all the analysis data and do something with it. Therefore, it has the await_many() in the end.

Since the analysis task is not a very long task, it will be done during acquisition of next batch of data…long before the await_many() comes into play.

The crashes I have mentioned come from unhandled infos carrying the following messages:

  1. {#Reference<0.960932078.1211105281.139068>, :ok}
  2. {:DOWN, #Reference<0.960932078.1211105281.139068>, :process, #PID<0.1184.0>, :normal}

The second one shows up once I implement handle_info for the first one. It seemed a bit strange that I would have to add handle_info at all, so I wanted to check what the correct way to do this is.

Right, I think I understand what’s going on here. When you say: "process Controller", I can only assume you mean a GenServer with its own managed state that handles data acquisition and delegates the process of analysing this data off to separate processes (i.e. Task).

Task is very powerful, but it’s important to note that Task.async/1 and its variants link the task’s process to the caller and also monitor it. When the task finishes, its result is sent back to the caller as a message, and in a GenServer any such message that is not consumed by Task.await ends up in handle_info/2 in your Controller module.

Actually, if we check the docs on Task.await/1, there’s a code snippet that looks close to your requirements: Task — Elixir v1.12.3

In the GenServerTaskExample, we can see how it stores the task reference into the GenServer state here. With an extra bit of effort, I think you should be able to modify it to fit with your requirements.

When a task finishes its work it sends two messages to the caller, one with the task reference and the result of the task, the second is a :DOWN message from Process.monitor. Task.await and friends abstract this away and handle both messages for you, returning the result. Even if you don’t call Task.await the messages are still in your inbox. In a gen_server if you do not call Task.await before exiting the callback and returning to the loop then those messages will arrive in your inbox and be forwarded to handle_info. So, if you are calling Task.async from a gen_server and not awaiting it before returning to the loop, you must handle those messages or your process will crash. You only have to handle the first one and then demonitor the task and flush the :DOWN message from your inbox.

def handle_info({ref, result}, state) when is_reference(ref) do
  Process.demonitor(ref, [:flush])
  handle_result(result)
  {:noreply, state}
end

Thank you @beepbeepbopbop and @msimonborg, those are very helpful insights! One thing I am still not sure about: should I still have the await after the last batch of data is acquired?

I need to make sure that all the analyses are done before proceeding.

The messages will be arriving whether you are ready to move forward with them or not, so you will have to handle the messages as they come and store the data in the process state or an ETS table. In that case you probably would not use Task.await at all, and would handle the batching logic as you receive the results in your handle_info. I don’t know how you can tell when your processing is complete, but a naive implementation could be something like this

def handle_info({ref, result}, state) when is_reference(ref) do
  Process.demonitor(ref, [:flush])
  data = [result | state.data]

  new_state =
    case length(data) do
      5 ->
        # batch is full, move to next step and clear the cache
        handle_data(data)
        %{state | data: []}

      _ ->
        # save the cache until more work is done
        %{state | data: data}
    end

  {:noreply, new_state}
end

As was said above by @beepbeepbopbop , you may consider storing the task references in the process state and clearing them as you receive the results. And if the work is critical you can consider storing the data in an ETS table owned by another process in case your gen_server crashes and wipes the state clean :slightly_smiling_face:
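Storing the references could look roughly like the sketch below. Everything in it (the ControllerSketch module, the analyze and results functions, the expected count, and Enum.sum standing in for the real analysis) is made up for illustration, so treat it as one possible shape rather than the way to do it.

```elixir
defmodule ControllerSketch do
  use GenServer

  # Hypothetical API: `expected` is how many analyses make up one round.
  def start_link(expected), do: GenServer.start_link(__MODULE__, expected)
  def analyze(pid, data), do: GenServer.cast(pid, {:analyze, data})
  def results(pid), do: GenServer.call(pid, :results)

  @impl true
  def init(expected) do
    {:ok, %{expected: expected, pending: MapSet.new(), results: [], done: nil}}
  end

  @impl true
  def handle_cast({:analyze, data}, state) do
    # Enum.sum/1 stands in for the real analysis.
    task = Task.async(fn -> Enum.sum(data) end)
    # Remember the reference so the reply can be recognised later.
    {:noreply, %{state | pending: MapSet.put(state.pending, task.ref)}}
  end

  @impl true
  def handle_call(:results, _from, state), do: {:reply, state.done, state}

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    if MapSet.member?(state.pending, ref) do
      # Drop the monitor and flush the :DOWN message that would follow.
      Process.demonitor(ref, [:flush])
      pending = MapSet.delete(state.pending, ref)
      results = [result | state.results]

      if length(results) == state.expected do
        # All analyses of this round are in; publish them and reset.
        {:noreply, %{state | pending: pending, results: [], done: results}}
      else
        {:noreply, %{state | pending: pending, results: results}}
      end
    else
      # Not one of our tasks; ignore it.
      {:noreply, state}
    end
  end

  # Ignore any other message instead of crashing on it.
  def handle_info(_msg, state), do: {:noreply, state}
end
```

Because results arrive in completion order, the list in `done` is not guaranteed to match the order in which the batches were acquired.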

On the last point, it’s worth noting that even if your gen_server is performing very simple work as the above with low likelihood of failure, a crash could come from outside the process as you are linking to your tasks with Task.async. If there’s a chance your task could crash then your gen_server goes with it and you lose all the work you’ve done. So you may need to either:

  • Trap exits with Process.flag(:trap_exit, true) and match on the exit signals if a task crashes. See Process.exit
  • Supervise your tasks and use Task.Supervisor.async_nolink
  • Don’t store anything in the process state that you can’t afford to lose
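To illustrate the second option, here is a small sketch using Task.Supervisor.async_nolink outside of a GenServer. The SafeAnalysisSketch module and its run function are made up for the example; inside a GenServer the same two messages would be matched in handle_info instead of a receive block.

```elixir
defmodule SafeAnalysisSketch do
  # Runs `fun` under the given Task.Supervisor without linking it to the
  # caller, so a crashing task does not take the caller down with it.
  def run(supervisor, fun) do
    %Task{ref: ref} = Task.Supervisor.async_nolink(supervisor, fun)

    receive do
      {^ref, result} ->
        # Success: stop monitoring and flush the pending :DOWN message.
        Process.demonitor(ref, [:flush])
        {:ok, result}

      {:DOWN, ^ref, :process, _pid, reason} ->
        # The task died before replying; the caller is still alive.
        {:error, reason}
    end
  end
end

{:ok, sup} = Task.Supervisor.start_link()
SafeAnalysisSketch.run(sup, fn -> 1 + 1 end)
SafeAnalysisSketch.run(sup, fn -> raise "boom" end)
```

The failing call returns an error tuple to the caller instead of crashing it, which is the point of not linking.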