Task.async_stream on all nodes


I use Task.async_stream to execute tasks on an extremely long stream of elements. Basically I use it as a pmap, except that I don’t care about the results, just the side effects of the tasks.
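
For reference, a minimal single-node sketch of the "pmap for side effects" usage described above. The side effect (sending a message back to the caller) and the concurrency value are placeholders chosen only so the snippet runs standalone; they are not from my real code.

```elixir
# Hypothetical side effect: notify the caller instead of, say, writing to a store.
parent = self()

1..100
|> Task.async_stream(
  fn n -> send(parent, {:processed, n}) end,
  # Limit how many tasks run at once; result order does not matter here.
  max_concurrency: 8,
  ordered: false
)
# Stream.run/1 consumes the stream and discards the results.
|> Stream.run()
```

Stream.run/1 forces the whole stream without building a result list, which matters when the input has millions of elements.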

My question is: can I spawn those tasks across the cluster, utilizing all the available nodes? Can Task.Supervisor.async_stream somehow start tasks under more than one supervisor (with one supervisor started on each node), either in a round-robin fashion or by selecting a node with a given hash function?


Do you care that the side effects succeed? If you want retries and so on you probably want something like Oban.


Yes, I’d like to know whether every task terminated normally (exit reason :normal).
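
To make "terminated normally" concrete: with a regular, local Task.Supervisor, async_stream_nolink yields {:ok, result} for tasks that finished and {:exit, reason} for tasks that crashed. The sketch below assumes a made-up work function that exits for multiples of 4; whether the same holds when the supervisor is replaced by a custom process is not verified here.

```elixir
# Normally the supervisor lives in the application's supervision tree.
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical work function: crashes for multiples of 4.
work = fn
  n when rem(n, 4) == 0 -> exit(:boom)
  n -> n * 2
end

results =
  sup
  |> Task.Supervisor.async_stream_nolink(1..8, work, max_concurrency: 4)
  |> Enum.to_list()

# Separate the tasks that finished from the ones that crashed.
{ok, failed} = Enum.split_with(results, &match?({:ok, _}, &1))

IO.inspect(length(ok), label: "finished")
IO.inspect(failed, label: "failed")
```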

Oban uses PostgreSQL. That is not an available option in my case.

Actually, I came up with a solution that is not yet perfect, but I believe this is the right direction. Imagine a cluster where all nodes have a local Task.Supervisor with a locally registered name :task_supervisor_local_name, and also a gen server like this one:

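defmodule SupDispatcher do
  use GenServer

  # How many times a failed start is retried before giving up.
  @retry_count 3

  def start_link(_ \\ nil), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  # The state is only the retry budget handed to each incoming request.
  def init(_), do: {:ok, @retry_count}

  # Matches the :start_task message that a Task.Supervisor receives when it is
  # asked to start a task (an internal detail, so it may differ between versions).
  def handle_call(
        msg = {:start_task, [_task_owner, _callers, _monitor, {_m, _f, a}], _restart, _shutdown},
        from,
        retry_count
      ) do
    # Pick a node deterministically from the task arguments.
    nodes = Node.list([:visible, :this])
    index = :erlang.phash2(a, length(nodes))
    drawn_node = Enum.at(nodes, index)

    try do
      # Forward the request unchanged to the Task.Supervisor on the chosen node.
      GenServer.call({:task_supervisor_local_name, drawn_node}, msg)
    catch
      # The call itself failed (e.g. the node went away): retry while budget remains.
      _, _ when retry_count > 0 ->
        handle_call(msg, from, retry_count - 1)

      x, y ->
        {:reply, {:error, {:exception, x, y}}, @retry_count}
    else
      {:ok, pid} ->
        {:reply, {:ok, pid}, @retry_count}

      # The remote supervisor did not start the task: retry while budget remains.
      _ when retry_count > 0 ->
        handle_call(msg, from, retry_count - 1)

      error ->
        {:reply, error, @retry_count}
    end
  end
end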

The intended usage is that when I call Task.Supervisor.async_stream_nolink on any node, I pass the above GenServer instead of a Task.Supervisor. The GenServer accepts the :start_task message and chooses a node with phash2, based on the arguments of the given call. Then it simply forwards the request to the Task.Supervisor on the selected node. The remote Task.Supervisor starts the task and returns the task’s pid to the GenServer, which returns the same pid to async_stream_nolink as if it were a supervisor. This seems to work fine: the load is distributed across the non-hidden nodes, and it can even handle nodes coming and going. The one gap is that I do not know whether a spawned task finished; the GenServer only knows that the task was started successfully.
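
The node-selection step on its own can be sketched as a small pure function. NodePicker is a hypothetical name, and sorting the node list is an extra assumption added here so that the mapping does not depend on the order in which the nodes happen to be listed.

```elixir
defmodule NodePicker do
  # Hypothetical helper: maps a key (e.g. the task arguments) to one of the nodes.
  def pick(nodes, key) when is_list(nodes) and nodes != [] do
    # Sort first so that every caller sees the nodes in the same order.
    sorted = Enum.sort(nodes)
    # :erlang.phash2/2 returns an integer in 0..length(sorted) - 1.
    Enum.at(sorted, :erlang.phash2(key, length(sorted)))
  end
end

# On a non-distributed node this list contains only the local node.
IO.inspect(NodePicker.pick(Node.list([:visible, :this]), [1, "payload"]))
```

Note that the same key always maps to the same node as long as the node list is unchanged, so retrying with identical arguments lands on the same node again.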

Is there any other problem with this approach?

Is it possible to fix this and restart failed tasks?

See Distributed tasks and tags - The Elixir programming language

Thanks, that’s nice, but the problem I’m trying to solve is somewhat different from that.

My problem: there’s a huge stream of items (millions of things) that have to be processed one by one. I want to use something like Task.async_stream with the :max_concurrency option set to something reasonable, so that it spawns a task for every element of the stream while limiting concurrency. This is a well-known pattern called pmap. The only problem with this approach is that all tasks run on the same node.

Switching from Task.async_stream to Task.Supervisor.async_stream is a bit better, since it takes a supervisor argument, so I can start the tasks on any node that runs a Task.Supervisor. But it starts all tasks under that one (possibly remote) supervisor, so the work is still not spread across the whole cluster.

This is where the GenServer above comes into the picture. It is basically a fake supervisor. Task.Supervisor.async_stream can talk to it and ask it to start tasks. The fake supervisor then starts the tasks under real Task.Supervisors on different nodes and reports the pid back to async_stream, just like a normal supervisor would.
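
A minimal sketch of the single (possibly remote) supervisor variant, before any fake supervisor is involved. Worker.process/2 is a hypothetical job, and the target node is the local node only so that the snippet runs standalone; in a cluster it would be any node that runs a supervisor under that name. A module/function/args triple is used instead of an anonymous function because that is the form recommended for distributed tasks.

```elixir
defmodule Worker do
  # Hypothetical job; the stream element is passed as the first argument.
  def process(item, tag), do: {tag, item * 10}
end

{:ok, _sup} = Task.Supervisor.start_link(name: :task_supervisor_local_name)

# Assumption: in a real cluster this would be the name of another node.
target = node()

results =
  {:task_supervisor_local_name, target}
  |> Task.Supervisor.async_stream(1..5, Worker, :process, [:done], max_concurrency: 2)
  |> Enum.to_list()

IO.inspect(results)
```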

One little problem with this is that when a task is started, but fails for some reason, there’s no way to know about the failure. All I can guarantee is that the task was successfully started, but who knows if it finished normally or not. Maybe using :transient restart strategy would help in some cases, but it will definitely not help when a whole node goes down with unfinished tasks on it.

Maybe there are some other drawbacks of this solution I am not aware of. I don’t know.

I wanted to ask the community whether this is the right direction. Is it possible to ask async_stream to restart failing tasks somehow? Maybe my fake supervisor should keep track of the tasks by monitoring them and, when one fails, restart it on another node? Maybe I should use GenFlow instead of this hacky solution?
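
One possible shape for the "restart failing tasks" idea, done on the caller side rather than inside the supervisor. This is only a sketch under assumptions: RetryingPmap is a hypothetical module, it relies on the :zip_input_on_exit option (Elixir 1.14 or later) to learn which input failed, it keeps the failed inputs of each pass in memory, and it is shown against a plain local Task.Supervisor.

```elixir
defmodule RetryingPmap do
  # Hypothetical helper, not part of Elixir.
  # Runs `fun` for every item and re-runs the failed items up to `retries` times.
  def run(sup, items, fun, retries \\ 3, opts \\ []) do
    opts =
      [max_concurrency: 4, ordered: false]
      |> Keyword.merge(opts)
      # Makes crashed tasks show up as {:exit, {input, reason}}.
      |> Keyword.put(:zip_input_on_exit, true)

    failed =
      sup
      |> Task.Supervisor.async_stream_nolink(items, fun, opts)
      |> Enum.flat_map(fn
        {:ok, _result} -> []
        {:exit, {item, reason}} -> [{item, reason}]
      end)

    cond do
      failed == [] -> :ok
      retries == 0 -> {:error, failed}
      true -> run(sup, Enum.map(failed, &elem(&1, 0)), fun, retries - 1, opts)
    end
  end
end

{:ok, sup} = Task.Supervisor.start_link()
{:ok, seen} = Agent.start_link(fn -> MapSet.new() end)

# Hypothetical flaky job: odd items crash the first time they are seen.
flaky = fn n ->
  first_time? =
    Agent.get_and_update(seen, fn s -> {not MapSet.member?(s, n), MapSet.put(s, n)} end)

  if first_time? and rem(n, 2) == 1, do: exit(:flaky), else: :ok
end

result = RetryingPmap.run(sup, 1..10, flaky)
always_fail = RetryingPmap.run(sup, [1], fn _ -> exit(:nope) end, 1)

IO.inspect(result)
IO.inspect(always_fail)
```

This does not pick a different node for the retry; combining it with a node-selecting dispatcher would need a key that changes between attempts.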

Do you need cross-node orchestration, or could all nodes just pull from the same queue? For the latter I’d go for a central queue (e.g. SQS) and then run Broadway on all your nodes. Broadway does handle error detection/reporting.