How do Flow and Task work in Elixir, and how do I run a function on all cores?

Hi, I’m pretty new to Elixir.

I am currently rewriting an algorithm from C++ to Elixir. I originally wrote this algorithm in C++ in both a sequential and a parallel version.

The sequential code looks like this in an abstract way:

int algorithm(p_value, depth) {
  // some_preparation
  int result = 0;
  for (int i = 0; i < N; ++i) {
    // do_some_stuff
    if (condition1) {
      return 1;
    } else {
      result += algorithm(new_value, depth + 1);
    }
  }
  return result;
}
 

Based on the C++ code, I then rewrote the algorithm in Elixir. The sequential algorithm works.

The code looks like this in an abstract way:

def algorithm(p_value, depth) do
  # some_preparation
  Enum.reduce_while(0..(N - 1), 0, fn x, acc ->
    # do_some_stuff
    if condition1 do
      # I want to parallelize this recursive call
      {:cont, acc + algorithm(new_value, depth + 1)}
    else
      {:halt, 1}
    end
  end)
end

So far, everything’s okay.

Now let’s talk about the parallel variant. In C++ I used the TBB library for this. How the library works is unimportant here. It is only important to understand at which position I want to parallelize, and that is at the recursive call of the function.

I would like to transfer this thought to Elixir.

First of all the C++ code in an abstract form:

int algorithm(p_value, depth) {
  task_list list;
  unsigned int results[N];
  int count = 0; // to count the number of tasks
  // some_preparation

  for (int i = 0; i < N; ++i) {
    // do_some_stuff
    if (condition1) {
      ++*this->currentResult; // instead of returning 1
    } else {
      // create a task for the recursion and add it to my list
      list.push_back(*new(allocate_child()) algorithm(new_value, depth + 1, &results[i]));
      ++count;
    }
  }

  // when the for-loop has finished: spawn the tasks, wait for them, and collect their results
  if (count > 0) {
    set_ref_count(count + 1);
    spawn_and_wait_for_all(list);

    for (int i = 0; i < N; ++i) {
      *this->currentResult += results[i];
    }
  }
}

With the help of the Elixir forum I came up with two ways to parallelize this: with Flow and with Task. I have read the documentation thoroughly and I think I understand how Flow and Task work. I have applied both to my algorithm.

The code using Flow looks like this:

def algorithm(p_value, depth) do
  # some_preparation
  #a_stream#
  |> Flow.from_enumerable()
  |> Flow.reduce(fn -> [] end, fn x, acc ->
    # do_some_stuff
    if condition1 do
      acc ++ [algorithm(new_value, depth + 1)]
    else
      [1]
    end
  end)
  |> Enum.to_list()
  |> Enum.sum()
end

The code using Task looks like this:

def algorithm(p_value, depth) do
  # some_preparation
  Enum.reduce_while(0..(N - 1), [], fn x, acc ->
    # do_some_stuff
    if condition1 do
      # concatenate everything into a list
      acc ++ [Task.async(fn -> algorithm(new_value, depth + 1) end)]
    else
      [1]
    end
  end)
  |> Enum.map(fn task ->
    if Kernel.is_number(task) do
      task
    else
      Task.await(task)
    end
  end)
end

My question or my problem is this:

The Flow variant works; the result of the algorithm is correct. However, I have the feeling that it does not run as parallel as I had imagined: the Flow variant is slower than my sequential one. Does it really run in parallel, or do I still have to specify that the function should run on all cores?

The variant with Task only runs sometimes, even for a small N! Sometimes it works, and sometimes my algorithm runs endlessly and never stops.
For large N it does not work at all, because the following error message appears:


[error] Task #PID<0.10596.398> started from #PID<0.6853.403> terminating
** (SystemLimitError) a system limit has been reached
    :erlang.spawn_link(:proc_lib, :init_p, [#PID<0.6853.403>, [#PID<0.10596.398>, #PID<0.6227.396>, #PID<0.24676.394>, #PID<0.23132.394>, #PID<0.21044.394>, #PID<0.20272.394>, #PID<0.207.0>, #PID<0.62.0>], Task.Supervised, :reply, [{:nonode@nohost, #PID<0.6853.403>, #PID<0.6853.403>}, [#PID<0.6853.403>, #PID<0.10596.398>, #PID<0.6227.396>, #PID<0.24676.394>, #PID<0.23132.394>, #PID<0.21044.394>, #PID<0.20272.394>, #PID<0.207.0>], :nomonitor, {:erlang, :apply, [#Function<8.131722887/0 in LangfordSequence.langfordSequence_task/2>, []]}]])
    (stdlib 3.8) proc_lib.erl:102: :proc_lib.spawn_link/3
    (elixir 1.10.3) lib/task/supervised.ex:14: Task.Supervised.start_link/4
    (elixir 1.10.3) lib/task.ex:422: Task.async/3
    (pva 0.1.0) langfordSequence.ex:69: anonymous fn/4 in LangfordSequence.langfordSequence_task/2
    (elixir 1.10.3) lib/enum.ex:2111: Enum."-reduce/3-lists^foldl/2-0-"/3
    (pva 0.1.0) langfordSequence.ex:65: LangfordSequence.langfordSequence_task/2
    (elixir 1.10.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
Function: #Function<8.131722887/0 in LangfordSequence.langfordSequence_task/2>

Task spawns at least one process per task. As you create more and more tasks from within each task, the number of processes explodes.

Flow will create a process for each partition in each stage of the flow. You start new flows from within a flow, so again the number of processes explodes.
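One way to keep the process count bounded (a sketch only; the module name, the stand-in `algorithm_seq/2`, the fan-out of 4, and the cutoff value are all illustrative, not from the original post) is to spawn tasks only near the top of the recursion tree and fall back to plain sequential recursion below a depth cutoff:

```elixir
defmodule BoundedTasks do
  @max_parallel_depth 2  # assumed cutoff; tune for your N and core count

  # Placeholder for the poster's working sequential recursion.
  def algorithm_seq(value, _depth), do: value

  # Near the root of the recursion tree, spawn one task per branch...
  def algorithm(value, depth) when depth < @max_parallel_depth do
    0..3
    |> Enum.map(fn i -> Task.async(fn -> algorithm(value + i, depth + 1) end) end)
    |> Enum.map(&Task.await/1)
    |> Enum.sum()
  end

  # ...and below the cutoff, recurse sequentially in the current process.
  def algorithm(value, depth), do: algorithm_seq(value, depth)
end
```

With a cutoff of 2 and fan-out 4, at most 4 + 16 tasks are ever alive at once, no matter how deep the sequential recursion goes underneath.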

There is a maximum number of processes per instance of the BEAM. By default this limit is 262144 (~260,000). There is a chapter in the Erlang manual giving a tabular overview of system limits.
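The limit the error message runs into can be inspected at runtime; a minimal check (the `+P` invocation in the comment is an illustrative example, not from the original post):

```elixir
# The BEAM caps the total number of processes; read the current limit.
# The default is 262144 (2^18). It can be raised at startup with the +P
# emulator flag, e.g. `elixir --erl "+P 2000000" my_script.exs`.
IO.inspect(:erlang.system_info(:process_limit))
```

Raising the limit only delays the failure, though; the real fix is to stop the number of tasks growing with the recursion tree.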

Also, the random failures for small N might be due to flaky runtimes and early tasks timing out because of that, but that is just guesswork, as you haven’t told us the error message.

1 Like

There is one big problem with what you are trying to do:

Processes in Erlang aren’t meant for parallelism

It is just a nice side effect, not the reason for their existence. Processes in Erlang are most importantly used for failure containment, so that a failure in a single process does not cause issues in unrelated processes, just like phone calls in “Erlang The Movie” or separate connections to your web application. Concurrency (which is completely different from parallelism) was also needed for processes to work nicely in soft real-time environments (such as the web).

9 Likes

I was not aware that concurrency is not the same as parallelism. Is there a way to program parallelism in Elixir comparable to the TBB library in C++?
Or is Elixir not designed for parallelism?

I had also assumed that parallelism is possible, since I have read this article :confused:
In this article, P. Brown wrote (https://culttt.com/2016/07/27/understanding-concurrency-parallelism-elixir/#:~:text=Concurrency%20is%20when%20two%20or,example%20on%20a%20multicore%20processor.&text=If%20your%20computer%20has%20multiple,each%20of%20them%20in%20parallel.):

In Elixir, processes are separate contexts (isolation) and you can have hundreds of thousands of processes on a single CPU (lightweight). If your computer has multiple cores, Elixir will run processes on each of them in parallel.

Erlang utilises parallelism by using multiple schedulers, but you do not have control over that mechanism. If you want more control over it, you should use NIFs.
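For reference, the scheduler count mentioned above can be read from Elixir; by default the BEAM starts one scheduler thread per logical core, which is why processes spread across all cores without any extra configuration:

```elixir
# Number of schedulers currently online (usually one per logical core).
IO.inspect(System.schedulers_online())
```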

About parallelism vs. concurrency: you can have concurrency even in a single-core, single-thread environment. For example, run Go with GOPROCS=1, or run any OS on a single-core CPU. Concurrency is about splitting work into smaller parts and working on them independently; parallelism is about doing many things at once. Concurrency is required for working in parallel, but you can have concurrent, non-parallel work. For example, merge sort is (in most cases) concurrent, non-parallel work, as you sort parts of the sequence independently from other parts and then (still concurrently) join them into bigger sorted chunks. Parallelism is doing several chunks of that work at the same time.

So, answering your question: no, Erlang was not designed for parallelism. However, it makes parallelism easier, because it forces concurrency and “well defined behaviours”, like no shared memory between processes.

4 Likes

I didn’t take the time to analyse your algorithm, but Task.async_stream gives you a transparent way of running the same function with different inputs, whose results can then be easily and automatically gathered back – using Enum.to_list with the output of Task.async_stream is the canonical example.
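A minimal sketch of that pattern, assuming a sequential helper `solve/1` that stands in for the poster's working sequential algorithm (both names are illustrative):

```elixir
defmodule AsyncStreamSketch do
  # Placeholder for the real sequential work.
  def solve(n), do: n * n

  def run(inputs) do
    inputs
    # Run solve/1 once per input, at most one task per scheduler at a time.
    |> Task.async_stream(&solve/1, max_concurrency: System.schedulers_online())
    # Gather the {:ok, result} tuples back in input order...
    |> Enum.to_list()
    # ...and combine them, here by summing.
    |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
  end
end
```

By default `Task.async_stream/3` already caps concurrency at `System.schedulers_online()`, so the option is shown only to make the bound explicit; unlike spawning a task per recursive call, this keeps the process count fixed.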

But if you need different stages then GenStage and Flow are the better tools.

That really depends on the size of your input. It is indeed true that, depending on the hardware specs of the machine the code is running on, there is a threshold below which you shouldn’t parallelize at all.


Sorry for the generic answer, but I felt your other concerns have already been addressed.