Processing many tasks concurrently and aggregating results

I’m looking to process a bunch of tasks concurrently while not blocking the current process. I’d also like to aggregate the results into an object. I’d like the result to understand when an error occurred in the task, but instead of crashing when this happens, instead just add a note to the result indicating an error occurred.

For the purpose of making the problem simple, I’ve created a dummy “Calculator” which simulates a long running calculation.

defmodule Calculator do
  def divide(val) do
    # Simulating long calculation
    :timer.sleep(1000)
    100/val
  end
end

I can send in 0 to this calculator to simulate an error.

I’ve implemented this behavior using a GenServer and TaskSupervisor.async. See code below:

defmodule Processor do
  use GenServer

  def start_link() do
     GenServer.start_link( __MODULE__, [])
  end

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_cast({:compute_values, values}, state) do
    Enum.each(
      values,
      fn(val) ->
        # Streams.TaskProcessor is a Task Supervisor supervised by application
        Task.Supervisor.async_nolink(Streams.TaskSupervisor, fn() ->
          Calculator.divide(val)
        end)
      end
    )

    {:noreply, state}
  end

  # For handling errors
  def handle_info({:DOWN, _, _, _, {_error_type, _error_stacktrace}}, state) do
    {:noreply, state ++ [:error]}
  end

  # For handling normal down states, do nothing
  def handle_info({:DOWN, _, _, _, :normal}, state) do
    {:noreply, state}
  end

  # For handling successful messages sent
  def handle_info({ref, calculated_value}, state) do
    {:noreply, state ++ [calculated_value]}
  end
end

If I then call:

{:ok, p} = Processor.start_link
GenServer.cast(p, {:compute_values, [10, 0, 20, 5]})

And then after three seconds call:

GenServer.call(p, :get_state)

I get the expected result:

[10.0, 5.0, 20.0, :error]

It seems like this works, but I’m wondering:

  • Is anything that I’m doing here a bad practice? I’m new to the elixir :slight_smile:

  • Is there an easier way to achieve this? Seems like a lot of boilerplate.

    • When I saw Task.Supervisor.async_stream_nolink released in Elixir 1.4 I thought it may be helpful, but am having a hard time understanding if I could use it to achieve a similar result.

Thanks for any help!

You could use GenStage/Flow, like this:

defmodule Run do
  alias Experimental.Flow

  def run do
    # start with list or stream of values
    Flow.from_enumerable((1..20))
    # distribute into :stages processes
    |> Flow.partition(max_demand: 2, stages: 5)
    # run a function over each value
    |> Flow.map(&process/1)
    # get back the list of values
    |> Enum.to_list()
  end

  def process(n) when rem(n,5) == 0, do: {:error, "#{n} is divisible by 5!"}
  def process(n), do: {:ok, n * 2}
end

Run.run

This will produce something like

[ok: 2, error: "5 is divisible by 5!", ok: 4, ok: 6, ok: 8, ok: 12, ok: 16,
 ok: 28, ok: 14, error: "10 is divisible by 5!", ok: 26, ok: 18,
 error: "15 is divisible by 5!", ok: 34, ok: 22, ok: 24, ok: 38,
 error: "20 is divisible by 5!", ok: 32, ok: 36]

As you can see the result is in different order - items were processed in parallel by two stages.

Note the Run.run call will block until all items are processed.

Links

4 Likes

Hi teamon,

Flow looks interesting, and this piece of code is a nice example, but it doesn’t satisfy all of my requirements:

  • Instead of crashing when an exception occurs, the aggregate result should just note an error.
    • Your example works when your function returns {:error, “{x} is divisible by 5”}, but when a real non-caught exception occurs (i.e. 5/0), it seems to crash.
    • The computation example was just to make things simple, the real application will deal with API requests which can raise a variety of errors.
  • I’d like the results to be executed and aggregated while not blocking the current process which triggers the action.

Do you have additional ideas given the requirements?

Thanks for your help,
Steve

1 Like

This problem sounds the same as that solved by free monads mentioned in this thread. These blog posts about monads and error handling in Elixir might help too: 1, 2

@teamon’s example with Flow is pretty awesome! And thank you for the reading materials :003:

Thanks for all the help and resources uranther & teamon – much appreciated!

Free Monads seem helpful for the error catching. Exceptional also seems like a great package to use for error handling.

If anyone has time – I’d love any additional feedback on not blocking the current process. One idea is to use GenServer handle_cast to trigger the Flow computations and aggregate the results. I’m curious as to if there is a better or more concise way.

EDIT: It looks like the “Supervisable flows” documented in https://hexdocs.pm/gen_stage/Experimental.Flow.html may get me closer towards this. I’ll read up on this.

Thanks again folks,
Steve

1 Like

As you can read in https://semaphoreci.com/blog/2016/11/24/how-to-capture-all-errors-generated-by-a-function-call-in-elixir.html there are quite a few different kind of errors. If you want to catch 5/0 errors you need to wrap the process function inside try/rescue block and return {:error, …} instead. It all depends on what kind of errors you expect.

Flow.run is the only blocking function call here, you could do

myflow = Flow.from_enumerable(...) |> Flow.partition(...) |> ...
pid = spawn_link Flow, :run, [myflow] # run in separate process

This way it won’t block and in case or error the calling process will be notified. If the calling process is short-lived, e.g. a HTTP request handler inside Plug/Phoenix then it is better to use https://hexdocs.pm/elixir/Task.Supervisor.html#start_child/4

1 Like

It is awesome, all of expede’s libraries are awesome. :slight_smile:

1 Like

For anyone who is reading this and looking to solve a similiar problem, I also found https://github.com/eproxus/parallel - a parallel implementation for Enum. Combining this with https://github.com/expede/exceptional should solve my problem.

2 Likes