A pmap implementation: any feedback?

Hello,
I have been playing with a pmap (parallel map) implementation for a task that seems easy: run a series of functions in parallel and gather their results. The "snag" is that I want to handle errors and timeouts within a given deadline, with each failure still producing a result rather than aborting the whole computation or - worse - killing the parent.

I ended up with something like:

def parallel_map_ordered(enum, myFn, errorFn, timeout) do
  myWrappedFn = fn v ->
    try do
      myFn.(v)
    rescue
      err ->
        with Logger.error(
               "Crashed pmap: #{Tools.ii(err)} on #{Tools.ii(__STACKTRACE__)} for input #{Tools.ii(v)}"
             ) do
          errorFn.(v, err)
        end
    end
  end

  tasks = Enum.map(enum, &Task.async(fn -> myWrappedFn.(&1) end))

  Task.yield_many(tasks, timeout)
  |> Enum.zip(enum)
  |> Enum.map(fn {{%Task{}, res}, orgval} ->
    case res do
      {:ok, v} ->
        v

      nil ->
        with Logger.error("Timed out pmap: for input #{Tools.ii(orgval)}") do
          errorFn.(orgval, :timeout)
        end
    end
  end)
end

You use it by passing two functions:

  • one is the main function; it gets called for each element of the Enum and is supposed to return an element
  • one is the error handler; it gets called with the value that was supposed to be processed and either the error or the atom :timeout, and it returns a value to be inserted into the resulting collection

There is a maximum timeout, after which all computations are aborted.

You use it like this:

test "simplex" do
  fnOk = fn v ->
    Process.sleep(v * 100)
    v * 100
  end

  fnErr = fn v, e ->
    {:error, v, e}
  end

  assert [100, 200, 300, 100, 200, 100, 200, 500, 100] =
           PsTools.parallel_map_ordered(
             [1, 2, 3, 1, 2, 1, 2, 5, 1],
             fnOk,
             fnErr,
             5000
           )
end

This works: if you set one of the values to (say) 500, the computation will time out and terminate after 5 seconds, but all the values that completed in time will be preserved.

The implementation is naive, meaning there is no batching and everything runs in parallel, so if your function is (say) a file reader and you run one million in parallel, you will exhaust file descriptors. If you batch, every batch is limited by its slowest operation, so again it is kind of meh. But it's a start.
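(As an aside, a quick sketch of the middle ground, using plain `Task.async_stream` from the standard library: its `:max_concurrency` option caps how many tasks are in flight, and a new task starts as soon as a slot frees up, so there is no lock-step batching.)

```elixir
# Sketch, not part of the implementation above: at most two tasks run at
# once, yet results still come back in order.
results =
  1..6
  |> Task.async_stream(fn n -> n * n end, max_concurrency: 2, ordered: true)
  |> Enum.map(fn {:ok, v} -> v end)

# results == [1, 4, 9, 16, 25, 36]
```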

Now for my questions:

  • is there something that I overlooked in the handling? Should I also catch :exit in the rescue clause?
  • was there a simple way to do that using the Tasks module?

TIA

Is there a reason why Task.async_stream would not work? It seems to support all the things you’ve mentioned.

Including returning errors, yep.

Basically you can get away with just this:

defmodule Parallel do
  defp map_executor(func, item) when is_function(func, 1) do
    try do
      # No need to put `:ok` in values that ran successfully
      # because `Task.async_stream` already does it.
      func.(item)
    rescue
      err ->
        # TODO: Also do logging or APM reporting here.
        # Wrap the exception in an `:error` tuple.
        {:error, err}
    end
  end

  defp ensure_ok_or_error_tuple({:exit, :timeout}), do: {:error, :timeout}
  # `Task.async_stream` wraps every task return in `{:ok, ...}`, so a rescued
  # error arrives here as `{:ok, {:error, err}}` and needs unwrapping first.
  defp ensure_ok_or_error_tuple({:ok, {:error, _} = err}), do: err
  defp ensure_ok_or_error_tuple({:ok, _} = ok), do: ok
  defp ensure_ok_or_error_tuple({:error, _} = err), do: err

  def map(list, func, timeout)
    when is_list(list) and is_function(func, 1)
    and is_integer(timeout) and timeout > 0 do
    Task.async_stream(
      list,
      fn(item) -> map_executor(func, item) end,
      timeout: timeout, ordered: true, on_timeout: :kill_task
    )
    |> Enum.map(&ensure_ok_or_error_tuple/1)
  end
end

So more or less: have an executor function where you can neatly take care of exceptions or any special cases before handing off to ensure_ok_or_error_tuple/1. (NOTE: I have not added a catch-all clause there, and this is intentional. IMO you want the code to crash if you receive an unexpected result, so that you add an extra clause to take care of it rather than silently ignoring it or reshaping it into an uninformative error tuple. Basically: don't mask a potential bug.)

Also depending on your scenario you might want these tasks supervised but the docs cover these cases well so I will not repeat them.
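A minimal sketch of what the supervised flavor could look like, assuming an ad-hoc supervisor started with `Task.Supervisor.start_link/0` (a real app would instead put a named `Task.Supervisor` in its supervision tree):

```elixir
# Sketch: same normalization as above, but the tasks run under a
# Task.Supervisor and are not linked to the caller (async_stream_nolink).
{:ok, sup} = Task.Supervisor.start_link()

results =
  Task.Supervisor.async_stream_nolink(
    sup,
    [10, 5_000],
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 100,
    ordered: true,
    on_timeout: :kill_task
  )
  |> Enum.map(fn
    {:ok, v} -> {:ok, v}
    {:exit, :timeout} -> {:error, :timeout}
  end)

# results == [{:ok, 10}, {:error, :timeout}]
```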


I tried that before my solution, but I had not seen the on_timeout: :kill_task option, so it would abort the whole computation on timeout. Thank you!

Happens to the best of us, sometimes you just need a second pair of eyes for the final touches. And you seem to have done almost all of this already, so good job!


I wrote a small helper today which I eventually named async_map, a constrained version of Task.Supervisor.async_stream, if you will.

  # Helper function to run supervised tasks concurrently mapping results to the
  # desired output.
  #
  # - `async_fun` runs in a task process, takes a single enumerable item as
  # argument and must return {:ok, result} or {:error, reason}.
  # - `post_fun` runs in the calling process, takes a tuple of the result of
  # async_fun and the corresponding enumerable item as argument, and returns the
  # final result.
  defp async_map(enumerable, async_fun, post_fun, opts) do
    Task.Supervisor.async_stream(
      MyApp.TaskSupervisor,
      enumerable,
      async_fun,
      Keyword.put(opts, :on_timeout, :kill_task)
    )
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, :timeout} -> {:error, :timeout}
    end)
    |> Enum.zip(enumerable)
    |> Enum.map(post_fun)
  end

It converts timeouts into errors so that I have a single place to handle errors in post_fun. It also gives post_fun the original input, be it for logging or for generating a fallback value.

I know about the :zip_input_on_exit option to Task.Supervisor.async_stream, but the advantage of doing it like this is that I have access to the input in every case of success/error/timeout.
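For comparison, a sketch of what that option does (assuming Elixir 1.14+, where `:zip_input_on_exit` was added): it pairs the input with the exit reason, but only on the exit path, so successful `{:ok, value}` results still arrive without their input.

```elixir
# The timed-out entry carries its input as {:exit, {input, :timeout}};
# the successful one is a plain {:ok, value} with no input attached.
results =
  [10, 500]
  |> Task.async_stream(fn ms ->
    Process.sleep(ms)
    ms
  end, timeout: 100, on_timeout: :kill_task, zip_input_on_exit: true)
  |> Enum.to_list()

# results == [{:ok, 10}, {:exit, {500, :timeout}}]
```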

Example usage:

1..10
|> async_map(
  fn n ->
    cond do
      n == 5 -> {:error, :bad_input}
      n == 7 -> {:ok, Process.sleep(10_000)}
      true -> {:ok, n*n}
    end
  end,
  fn
    {{:ok, sq}, n} -> "#{n}*#{n} = #{sq}"
    {{:error, reason}, n} -> "Error on #{n}: #{inspect(reason)}"
  end,
  max_concurrency: 5,
  timeout: 500
)

# returns:
# ["1*1 = 1", "2*2 = 4", "3*3 = 9", "4*4 = 16", "Error on 5: :bad_input",
#  "6*6 = 36", "Error on 7: :timeout", "8*8 = 64", "9*9 = 81", "10*10 = 100"]