Processing many tasks concurrently and aggregating results

I’m looking to process a bunch of tasks concurrently without blocking the current process, and to aggregate the results into a single data structure. If an error occurs in a task, I don’t want anything to crash; the aggregated result should just contain a note that an error occurred for that task.

To keep the problem simple, I’ve created a dummy “Calculator” that simulates a long-running calculation.

defmodule Calculator do
  def divide(val) do
    # Simulating long calculation (assumed 1 s delay); divide(0) raises ArithmeticError
    Process.sleep(1_000)
    100 / val
  end
end

I can pass 0 to this calculator to simulate an error (division by zero raises an ArithmeticError).

I’ve implemented this behavior using a GenServer and Task.Supervisor.async_nolink. See the code below:

defmodule Processor do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  # Start with an empty list of results
  def init(state), do: {:ok, state}

  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  def handle_cast({:compute_values, values}, state) do
    Enum.each(values, fn(val) ->
      # Streams.TaskSupervisor is a Task Supervisor supervised by the application
      Task.Supervisor.async_nolink(Streams.TaskSupervisor, fn() ->
        Calculator.divide(val)
      end)
    end)

    {:noreply, state}
  end

  # For handling errors: a crashed task's DOWN reason is {exception, stacktrace}
  def handle_info({:DOWN, _, _, _, {_error_type, _error_stacktrace}}, state) do
    {:noreply, state ++ [:error]}
  end

  # For handling normal down states, do nothing
  def handle_info({:DOWN, _, _, _, :normal}, state) do
    {:noreply, state}
  end

  # For handling successful results sent back by the tasks
  def handle_info({_ref, calculated_value}, state) do
    {:noreply, state ++ [calculated_value]}
  end
end

If I then call:

{:ok, p} = Processor.start_link
GenServer.cast(p, {:compute_values, [10, 0, 20, 5]})

And then, after three seconds, call:

GenServer.call(p, :get_state)

I get the expected result:

[10.0, 5.0, 20.0, :error]

It seems like this works, but I’m wondering:

  • Is anything I’m doing here bad practice? I’m new to Elixir 🙂

  • Is there an easier way to achieve this? It seems like a lot of boilerplate.

    • When I saw that Task.Supervisor.async_stream_nolink was released in Elixir 1.4, I thought it might be helpful, but I’m having a hard time understanding whether I could use it to achieve a similar result.
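For reference, here is a minimal sketch of how Task.Supervisor.async_stream_nolink/4 could produce a similar aggregate. It assumes a reasonably recent Elixir (for the `on_timeout: :kill_task` option), starts its own Task.Supervisor as a stand-in for Streams.TaskSupervisor, inlines `100 / val` in place of Calculator.divide/1, and uses a hypothetical module name, StreamProcessor:

```elixir
defmodule StreamProcessor do
  # Hypothetical module. Runs `fun` over `values` concurrently under
  # `supervisor`; the tasks are not linked to the caller, so a crashing task
  # shows up as {:exit, reason} in the stream instead of crashing the caller.
  def compute(supervisor, values, fun) do
    supervisor
    |> Task.Supervisor.async_stream_nolink(values, fun,
      max_concurrency: 4,
      timeout: 5_000,
      # Kill slow tasks (yielding {:exit, :timeout}) instead of exiting the caller
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, _reason} -> :error
    end)
  end
end

# Stand-in for Streams.TaskSupervisor from the question
{:ok, sup} = Task.Supervisor.start_link()

# 100 / 0 raises ArithmeticError inside its task; the other values succeed
results = StreamProcessor.compute(sup, [10, 0, 20, 5], fn val -> 100 / val end)
# results == [10.0, :error, 5.0, 20.0] (results keep input order by default)
```

Unlike the GenServer version, the Enum.map/2 here blocks the caller until every task finishes or is killed, so on its own this covers concurrency and error capture but not the non-blocking requirement. The supervisor will also typically log an error report for the crashed task.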

Thanks for any help!

You could use GenStage/Flow, like this:

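defmodule Run do
  # Early Flow releases named the module Experimental.Flow;
  # remove this alias on current releases, where it is plain `Flow`.
  alias Experimental.Flow

  def run do
    # start with list or stream of values
    1..20
    |> Flow.from_enumerable()
    # distribute into :stages processes
    |> Flow.partition(max_demand: 2, stages: 5)
    # run a function over each value
    |> Flow.map(&process/1)
    # get back the list of values
    |> Enum.to_list()
  end

  def process(n) when rem(n, 5) == 0, do: {:error, "#{n} is divisible by 5!"}
  def process(n), do: {:ok, n * 2}
end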

This will produce something like:

[ok: 2, error: "5 is divisible by 5!", ok: 4, ok: 6, ok: 8, ok: 12, ok: 16,
 ok: 28, ok: 14, error: "10 is divisible by 5!", ok: 26, ok: 18,
 error: "15 is divisible by 5!", ok: 34, ok: 22, ok: 24, ok: 38,
 error: "20 is divisible by 5!", ok: 32, ok: 36]

As you can see, the result is in a different order: items were processed in parallel by multiple stages.

Note that the call blocks until all items are processed.



Hi teamon,

Flow looks interesting, and this piece of code is a nice example, but it doesn’t satisfy all of my requirements:

  • Instead of crashing when an exception occurs, the aggregate result should just note an error.
    • Your example works when your function returns {:error, "#{n} is divisible by 5!"}, but when a real uncaught exception occurs (e.g. 5/0), it seems to crash.
    • The computation example was just to keep things simple; the real application will make API requests, which can raise a variety of errors.
  • I’d like the tasks to run and their results to be aggregated without blocking the process that triggers the action.

Do you have additional ideas given the requirements?

Thanks for your help,


This sounds like the same problem that the free monads mentioned in this thread solve. These blog posts about monads and error handling in Elixir might help too: 1, 2

@teamon’s example with Flow is pretty awesome! And thank you for the reading materials.

Thanks for all the help and resources uranther & teamon – much appreciated!

Free Monads seem helpful for the error catching. Exceptional also seems like a great package to use for error handling.

If anyone has time, I’d love additional feedback on not blocking the current process. One idea is to use a GenServer handle_cast to trigger the Flow computation and aggregate the results. I’m curious whether there is a better or more concise way.

EDIT: It looks like the “Supervisable flows” documented in may get me closer towards this. I’ll read up on this.

Thanks again folks,


As you can read in there are quite a few different kinds of errors. If you want to catch 5/0 errors, you need to wrap the process function in a try/rescue block and return {:error, …} instead. It all depends on what kinds of errors you expect. is the only blocking function call here, so you could do:
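A minimal sketch of the try/rescue suggestion, reusing the shape of the process/1 function from the Flow example. The module name SafeRun and the `100 / n` computation are hypothetical stand-ins; note that `rescue` only handles raised exceptions, so throws and exits would need a separate `catch` clause:

```elixir
defmodule SafeRun do
  # Hypothetical example: any exception raised while processing `n`
  # (here 100 / 0 raises ArithmeticError) becomes an {:error, message}
  # tuple instead of crashing the calling process or Flow stage.
  def process(n) do
    {:ok, 100 / n}
  rescue
    e -> {:error, Exception.message(e)}
  end
end

results = Enum.map([10, 0, 20, 5], &SafeRun.process/1)
# the 0 entry becomes {:error, _}; the others are {:ok, float}
```

In a Flow pipeline the same function would be plugged in with Flow.map(&SafeRun.process/1).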

myflow = Flow.from_enumerable(...) |> Flow.partition(...) |> ...
pid = spawn_link Flow, :run, [myflow] # run in separate process

This way it won’t block, and in case of an error the calling process will be notified through the link (an exit signal, which also takes the caller down unless it traps exits). If the calling process is short-lived, e.g. an HTTP request handler inside Plug/Phoenix, then it is better to use
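For comparison, here is a minimal sketch of a non-blocking pattern that needs only the standard library (not necessarily what the reply above recommends). The Batch module and its function names are hypothetical, and the work is the same `100 / v` stand-in. The batch runs inside a Task.Supervisor.async_nolink task, so the caller keeps going and is not linked to the work; Task.yield/2 collects the result later:

```elixir
defmodule Batch do
  # Hypothetical helper: starts the whole batch in a supervised task that is
  # not linked to the caller, and returns the Task struct immediately.
  def start(sup, values) do
    Task.Supervisor.async_nolink(sup, fn ->
      Enum.map(values, fn v ->
        try do
          {:ok, 100 / v}
        rescue
          e -> {:error, Exception.message(e)}
        end
      end)
    end)
  end

  # Waits up to `timeout` ms; if the task has not finished, shuts it down.
  # A crash of the task itself comes back as {:error, reason}.
  def collect(task, timeout \\ 5_000) do
    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, results} -> {:ok, results}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  end
end

{:ok, sup} = Task.Supervisor.start_link()
task = Batch.start(sup, [10, 0, 20, 5])
# ... the caller is free to do other work here ...
{:ok, results} = Batch.collect(task)
```

Task.yield/2 must be called from the process that started the task. Inside a GenServer you would instead receive the `{ref, result}` and `:DOWN` messages in handle_info/2, as the original Processor does.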


It is awesome; all of expede’s libraries are awesome. 🙂


For anyone who is reading this and looking to solve a similar problem, I also found - a parallel implementation for Enum. Combining this with should solve my problem.