Wait for tasks to finish inside a `Task.async_stream` in case of an error

Hi there,

Sorry for the long post.
I’m currently using Task.async_stream to process a list and do external API requests. If a request is done twice, it will throw an error on the remote side (unique constraints). What each task is doing is:

  1. check if we have a local reference on the DB
  2. if not, do an API request and create it on the remote API
  3. create the row on the local DB and set the remote ID

Now, if we’re processing 10 elements at a time and one of those crashes, they all get killed because of it.
Since the API request is the longest part of the process, it’s probable that with 10 concurrent processes it will create 10 rows on the remote API, so now I have 10 items that will error the next time I process the same list.

Is there a way to let the currently running processes finish if one of those crashes instead of killing them?

I’ve tried to use a try/rescue/catch with a simple example to gracefully stop a stream while not killing the currently running processes, but as soon as one of those returns an error the stream stops processing:

defmodule Async do
  @moduledoc """
  Thin wrapper around `Task.async_stream/3` that converts raised exceptions
  and thrown values into tagged tuples and stops emitting results at the
  first element that did not succeed.
  """

  @doc """
  Lazily maps `fun` over `enum` through `Task.async_stream/3` using `options`.

  Inside each task the call to `fun` is wrapped so that it yields:

    * `{:ok, value}` when `fun` returns normally,
    * `{:raised, exception}` when `fun` raises,
    * `{:catched, value}` when `fun` throws.

  Every unwrapped task outcome is printed with `IO.inspect/1`. The resulting
  stream yields the plain values of the leading `{:ok, value}` outcomes and
  halts at the first outcome that is not `{:ok, _}`.
  """
  def map(enum, fun, options) do
    # Runs inside the task process, so failures come back as data instead of
    # crashing the task.
    guarded = fn item ->
      try do
        {:ok, fun.(item)}
      rescue
        exception -> {:raised, exception}
      catch
        thrown -> {:catched, thrown}
      end
    end

    enum
    |> Task.async_stream(guarded, options)
    # Drop the outer tag added by async_stream and print the inner outcome.
    |> Stream.map(fn outcome -> outcome |> elem(1) |> IO.inspect() end)
    |> Stream.take_while(fn outcome -> match?({:ok, _}, outcome) end)
    |> Stream.map(fn {:ok, value} -> value end)
  end
end

# Demo run: each element sleeps for that many seconds; the element 2 raises
# once its sleep is over.
sleep_then_return = fn seconds ->
  IO.puts("Sleeping for #{seconds} seconds")
  :timer.sleep(seconds * 1000)
  IO.puts("Done sleeping for #{seconds} seconds")

  if seconds == 2, do: raise(RuntimeError)

  seconds
end

[4, 3, 2, 5]
|> Async.map(sleep_then_return, max_concurrency: 2, timeout: 10_000)
|> Stream.run()

shows:

Sleeping for 4 seconds
Sleeping for 3 seconds
Done sleeping for 3 seconds
Sleeping for 2 seconds
Done sleeping for 4 seconds
{:ok, 4}
{:ok, 3}
Sleeping for 5 seconds
Done sleeping for 2 seconds
{:raised, %RuntimeError{message: "runtime error"}}

so you can see it doesn’t wait until the 5 is done sleeping.

I’ve tried another approach using spawn:

defmodule Parallel do
  @moduledoc """
  Eager, bounded-concurrency map built on linked processes.

  When a worker raises, no further elements are started, but the workers that
  are already running are allowed to finish before the exception is re-raised
  in the caller.

  Only exits whose reason has the shape `{exception, stacktrace}` (what a
  raised exception produces) are recognised as the first failure; other
  abnormal exit reasons are not matched while the map is still progressing.
  """

  @doc """
  Applies `fun` to every element of `list`, running at most
  `:max_concurrency` workers at a time (default:
  `System.schedulers_online() * 2`).

  Returns the results in the order of `list`. An empty `list` returns `[]`.

  If a worker raises, the exception is re-raised in the caller with the
  worker's stacktrace once all other running workers have reported back or
  exited.

  NOTE: this enables `:trap_exit` on the calling process and leaves it
  enabled after returning.
  """
  def map(list, fun, options \\ []) do
    if Enum.empty?(list) do
      # Nothing to spawn; waiting in `receive` would block forever.
      []
    else
      max_concurrency = Keyword.get(options, :max_concurrency, System.schedulers_online() * 2)

      monitor_pid = self()
      # Workers are linked to the caller; trapping exits turns a worker crash
      # into an `{:EXIT, pid, reason}` message instead of killing the caller.
      Process.flag(:trap_exit, true)

      running_pids =
        list
        |> Enum.take(max_concurrency)
        |> Enum.with_index()
        |> Enum.map(fn {el, index} ->
          spawn_process(monitor_pid, index, el, fun)
        end)

      # The next element to start is the number actually spawned, which is
      # smaller than `max_concurrency` when the list is shorter than the limit.
      next_index = length(running_pids)

      monitor_loop(:cont, list, [], monitor_pid, running_pids, next_index, fun)
    end
  end

  # Draining mode: a worker already failed with `error`/`stacktrace`. Keep
  # receiving until every remaining worker has reported or exited, then raise.
  defp monitor_loop(:halt, running_pids, error, stacktrace) do
    receive do
      {:result, pid, _, _} ->
        halt_or_raise(running_pids -- [pid], error, stacktrace)

      {:EXIT, _, :normal} ->
        monitor_loop(:halt, running_pids, error, stacktrace)

      {:EXIT, pid, _} ->
        IO.puts("Got error from pid #{inspect(pid)}")
        halt_or_raise(running_pids -- [pid], error, stacktrace)
    end
  end

  # Re-raises the first failure once no worker is left, otherwise keeps draining.
  defp halt_or_raise([], error, stacktrace) do
    IO.puts("No more running pids, we can safely raise exception")
    reraise(error, stacktrace)
  end

  defp halt_or_raise(running_pids, error, stacktrace) do
    IO.puts("There are running pids, waiting on them and then raise")
    monitor_loop(:halt, running_pids, error, stacktrace)
  end

  # Normal mode: collect results, start the next element whenever a worker
  # finishes, and switch to draining mode on the first raised exception.
  defp monitor_loop(:cont, list, results, monitor_pid, running_pids, next_index, fun) do
    IO.puts(
      "Got monitor_loop call list #{inspect(list)} results #{inspect(results)} " <>
        "monitor_pid #{inspect(monitor_pid)} running_pids #{inspect(running_pids)} " <>
        "next index #{next_index}"
    )

    receive do
      {:result, pid, index, result} ->
        IO.puts("Got result from index #{index}: #{inspect(result)}")
        running_pids = running_pids -- [pid]
        results = [{index, result} | results]
        exhausted? = next_index >= Enum.count(list)

        cond do
          exhausted? and running_pids == [] ->
            IO.puts("No new elements to add, reached end")

            # Results arrive in completion order; restore the input order.
            results
            |> Enum.sort_by(&Kernel.elem(&1, 0))
            |> Enum.map(&Kernel.elem(&1, 1))

          exhausted? ->
            IO.puts("Still some processes running, restarting loop")
            monitor_loop(:cont, list, results, monitor_pid, running_pids, next_index, fun)

          true ->
            IO.puts("New elements to add, spawning next")
            pid = spawn_process(monitor_pid, next_index, Enum.at(list, next_index), fun)

            monitor_loop(
              :cont,
              list,
              results,
              monitor_pid,
              [pid | running_pids],
              next_index + 1,
              fun
            )
        end

      {:EXIT, _, :normal} ->
        monitor_loop(:cont, list, results, monitor_pid, running_pids, next_index, fun)

      {:EXIT, pid, {error, stacktrace}} ->
        IO.puts("Got error from pid #{inspect(pid)}")
        halt_or_raise(running_pids -- [pid], error, stacktrace)
    end
  end

  # Starts a linked worker that sends `{:result, worker_pid, index, value}`
  # back to `monitor_pid` once `fun.(el)` returns.
  defp spawn_process(monitor_pid, index, el, fun) do
    spawn_link(fn ->
      send(monitor_pid, {:result, self(), index, fun.(el)})
    end)
  end
end

# Same demo as before, this time through Parallel.map: each element sleeps
# for that many seconds and the element 2 raises once its sleep is over.
nap_then_return = fn secs ->
  IO.puts("Sleeping for #{secs} seconds")
  :timer.sleep(secs * 1000)
  IO.puts("Done sleeping for #{secs} seconds")

  if secs == 2, do: raise(RuntimeError)

  secs
end

[4, 3, 2, 5]
|> Parallel.map(nap_then_return, max_concurrency: 2)
|> Stream.run()

which seems to work on the test file:

Sleeping for 4 seconds
Sleeping for 3 seconds
Got monitor_loop call list [4, 3, 2, 5] results [] monitor_pid #PID<0.91.0> running_pids [#PID<0.96.0>, #PID<0.97.0>] next index 2
Done sleeping for 3 seconds
Got result from index 1: 3
New elements to add, spawning next
Sleeping for 2 seconds
Got monitor_loop call list [4, 3, 2, 5] results [{1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.98.0>, #PID<0.96.0>] next index 3
Got monitor_loop call list [4, 3, 2, 5] results [{1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.98.0>, #PID<0.96.0>] next index 3
Done sleeping for 4 seconds
Got result from index 0: 4
New elements to add, spawning next
Sleeping for 5 seconds
Got monitor_loop call list [4, 3, 2, 5] results [{0, 4}, {1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.99.0>, #PID<0.98.0>] next index 4
Got monitor_loop call list [4, 3, 2, 5] results [{0, 4}, {1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.99.0>, #PID<0.98.0>] next index 4
Done sleeping for 2 seconds
Got error from pid #PID<0.98.0>
There are running pids, waiting on them and then raise

11:34:24.114 [error] Process #PID<0.98.0> raised an exception
** (RuntimeError) runtime error
    text.exs:120: anonymous fn/1 in :elixir_compiler_0.__FILE__/1
    text.exs:107: anonymous fn/4 in Parallel.spawn_process/4
Done sleeping for 5 seconds
No more running pids, we can safely raise exception
** (RuntimeError) runtime error
    text.exs:120: anonymous fn/1 in :elixir_compiler_0.__FILE__/1
    text.exs:107: anonymous fn/4 in Parallel.spawn_process/4

however when actually using it in my code (that internally uses other processes, caches etc) it seems to still crash including all the other running processes.

Ignoring my attempt, is there a way to achieve this? Hopefully with a small change on how I use Task.async_stream?

2 Likes

Maybe You can use supervised tasks this time without linking them to caller process?
If you don’t want to link the caller to the task, then you must use a supervised task with Task.Supervisor and call Task.Supervisor.async_nolink/2
https://hexdocs.pm/elixir/Task.html#async/3-linking

1 Like

Example with Task.Supervisor.async_stream_nolink and Task.Supervisor.async_nolink

code
explanation

4 Likes

Thank you both, I’ll read through that. My understanding was that using nolink would just skip the tasks that failed, rather than stop future processes from being started; I’ll read it more carefully.