# How do Flow and Task work in Elixir, and how do I run a function on all cores?

Hi, I’m pretty new to Elixir.

I am currently rewriting an algorithm from C++ to Elixir. I wrote the C++ version both sequentially and in parallel.

The sequential code looks like this in an abstract way:

```cpp
int algorithm(int p_value, int depth) {
    int result = 0;
    // some_preparation
    for (int i = 0; i < N; ++i) {
        // do_some_stuff
        if (condition1) {
            return 1;
        } else {
            result += algorithm(new_value, depth + 1);
        }
    }
    return result;
}
```

Based on the C++ code, I then rewrote the algorithm in Elixir. The sequential algorithm works.

The code looks like this in an abstract way:

```elixir
def algorithm(p_value, depth) do
  # some_preparation
  Enum.reduce_while(0..(N - 1), 0, fn x, acc ->
    # do_some_stuff
    if condition1 do
      # I want to parallelize this recursive call
      {:cont, acc + algorithm(new_value, depth + 1)}
    else
      {:halt, 1}
    end
  end)
end
```

So far, everything’s okay.

Now let’s talk about the parallel variant. In C++ I used the TBB library for this. How the library works is not important here; what matters is where I want to parallelize, and that is at the recursive call.

I would like to transfer this thought to Elixir.

First of all the C++ code in an abstract form:

```cpp
int algorithm(int p_value, int depth) {
    unsigned int results[N];
    int count = 0;  // counts the number of spawned tasks
    // some_preparation

    for (int i = 0; i < N; ++i) {
        // do_some_stuff
        if (condition1) {
            ++*this->currentResult;  // instead of returning 1
        } else {
            // create a task for the recursion and add it to my list
            list.push_back(*new (allocate_child()) algorithm(new_value, depth + 1, &results[i]));
            ++count;
        }
    }

    // when the for loop has finished: spawn the tasks, wait for them,
    // and collect their results
    if (count > 0) {
        set_ref_count(count + 1);
        spawn_and_wait_for_all(list);

        for (int i = 0; i < N; ++i) {
            *this->currentResult += results[i];
        }
    }
}
```

With the help of the Elixir Forum, I came up with two ways to parallelize this: with Flow and with Task. I have read the documentation thoroughly, and I think I know how Flow and Task work. I have applied both to my algorithm.

The code using Flow looks like this:

```elixir
def algorithm(p_value, depth) do
  # some_preparation
  #a_stream#
  |> Flow.from_enumerable()
  |> Flow.reduce(fn -> [] end, fn x, acc ->
    # do_some_stuff
    if condition1 do
      acc ++ [algorithm(new_value, depth + 1)]
    else
      [1]
    end
  end)
  |> Enum.to_list()
  |> Enum.sum()
end
```

The code using Task looks like this:

```elixir
def algorithm(p_value, depth) do
  # some_preparation
  Enum.reduce_while(0..(N - 1), [], fn x, acc ->
    # do_some_stuff
    if condition1 do
      # collect one task per recursive call in a list
      {:cont, acc ++ [Task.async(fn -> algorithm(new_value, depth + 1) end)]}
    else
      {:halt, [1]}
    end
  end)
end
```

My question or my problem is this:

The Flow variant works: the result of the algorithm is correct. However, I have the feeling that it does not run as parallel as I had imagined; the Flow variant is slower than my sequential one. Does it really run in parallel, or do I still have to specify that the function should run on all cores?

The variant with Task only works sometimes, even for small N: sometimes it finishes, and sometimes my algorithm runs endlessly and never stops.
For large N it does not work at all, because the following error message appears:

```
[error] Task #PID<0.10596.398> started from #PID<0.6853.403> terminating
** (SystemLimitError) a system limit has been reached
:erlang.spawn_link(:proc_lib, :init_p, [#PID<0.6853.403>, [#PID<0.10596.398>, #PID<0.6227.396>, #PID<0.24676.394>, #PID<0.23132.394>, #PID<0.21044.394>, #PID<0.20272.394>, #PID<0.207.0>, #PID<0.62.0>], Task.Supervised, :reply, [{:nonode@nohost, #PID<0.6853.403>, #PID<0.6853.403>}, [#PID<0.6853.403>, #PID<0.10596.398>, #PID<0.6227.396>, #PID<0.24676.394>, #PID<0.23132.394>, #PID<0.21044.394>, #PID<0.20272.394>, #PID<0.207.0>], :nomonitor, {:erlang, :apply, [#Function<8.131722887/0 in LangfordSequence.langfordSequence_task/2>, []]}]])
(pva 0.1.0) langfordSequence.ex:69: anonymous fn/4 in LangfordSequence.langfordSequence_task/2
(elixir 1.10.3) lib/enum.ex:2111: Enum."-reduce/3-lists^foldl/2-0-"/3
```

`Task` spawns at least one process per task. As you create more and more tasks from within each task, the number of processes explodes.

`Flow` will create a process for each partition in each stage of the flow. If you start new flows from within a flow, the number of processes explodes as well.

There is a maximum number of processes per BEAM instance; by default this limit is about 260,000 (262,144, to be exact). There is a chapter in the Erlang manual giving a tabular overview of system limits.
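You can inspect both the limit and the current process count from a running system. A quick sketch:

```elixir
# Inspect the BEAM's process limit and how many processes exist right now.
limit = :erlang.system_info(:process_limit)
count = :erlang.system_info(:process_count)

IO.puts("limit: #{limit}, currently running: #{count}")
```

The limit can be raised when starting the VM, e.g. `elixir --erl "+P 2000000" my_script.exs`, but for a recursive fan-out like yours that only postpones the explosion.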

Also, the random failures for small N might be caused by flaky runtimes and tasks timing out early because of that, but that is just guesswork, as you haven’t told us the error message.


There is one big problem with what you are trying to do:

# Processes in Erlang aren’t meant for parallelism

It is just a nice side effect, not the reason for their existence. Processes in Erlang are used, most importantly, for failure containment: a failure in a single process should not cause issues in unrelated processes. Just like phone calls in “Erlang The Movie”, or separate connections to your web application. Concurrency (which is completely different from parallelism) was also needed for processes to work nicely in soft real-time environments (such as the web).
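That containment is easy to see directly. In the sketch below, a crash in one spawned process leaves an unrelated process untouched:

```elixir
# Two unlinked processes: a crash in one does not affect the other.
survivor = spawn(fn -> Process.sleep(:infinity) end)
crasher  = spawn(fn -> raise "boom" end)

Process.sleep(100)  # give the crasher time to die (it will log an error)

IO.inspect(Process.alive?(crasher))   # false
IO.inspect(Process.alive?(survivor))  # true
```

With `spawn_link` instead of `spawn`, the caller would go down with the crasher; linking is how you opt into a shared fate.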


I was not aware that concurrency is not the same as parallelism. Is there a way to program parallelism in Elixir comparable to the TBB library in C++?
Or is Elixir not designed for parallelism?

In Elixir, processes are separate contexts (isolation) and you can have hundreds of thousands of processes on a single CPU (lightweight). If your computer has multiple cores, Elixir will run processes on each of them in parallel.
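Applied to the Flow variant from the question, the degree of parallelism can also be tuned explicitly. A sketch (assuming the `flow` hex package; the input and work function here are just illustrative):

```elixir
# `stages:` sets how many worker processes Flow uses; it already
# defaults to System.schedulers_online(), i.e. one per core.
# `max_demand: 1` hands work out one item at a time, which suits
# coarse-grained items better than the larger default batches.
result =
  1..100
  |> Flow.from_enumerable(stages: System.schedulers_online(), max_demand: 1)
  |> Flow.map(fn x -> x * x end)
  |> Enum.sum()

IO.inspect(result)  # 338350
```

If your per-item work is small, the inter-process hand-off can easily cost more than the work itself, which would explain Flow being slower than the sequential version.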

Erlang achieves parallelism by using multiple schedulers, but you do not have control over that mechanism. If you want more control over it, you should use NIFs.

About parallelism vs. concurrency: you can have concurrency even in a single-core, single-thread environment. For example, run Go with `GOMAXPROCS=1`, or run any OS on a single-core CPU. Concurrency is about splitting work into smaller parts and working on them independently; parallelism is about doing many things at once. Concurrency is required for working in parallel, but you can have concurrent, non-parallel work. For example, merge sort is concurrent but (in most implementations) non-parallel: you sort parts of the sequence independently from the other parts and then (still concurrently) join them into bigger sorted chunks. Parallelism would be doing several of those chunks of work at the same time.
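To make the merge sort example concrete, here is a sketch where each left half is handed to a `Task`: the independent halves are the concurrency, and the BEAM scheduling those tasks on different cores is the parallelism. (A real implementation would stop spawning below some input size; module and function names are made up.)

```elixir
defmodule PSort do
  # Concurrency: the two halves can be sorted independently.
  # Parallelism: the Task for the left half may run on another
  # core while this process sorts the right half.
  def sort([]), do: []
  def sort([x]), do: [x]

  def sort(list) do
    {left, right} = Enum.split(list, div(length(list), 2))
    task = Task.async(fn -> sort(left) end)
    sorted_right = sort(right)
    merge(Task.await(task), sorted_right)
  end

  defp merge([], right), do: right
  defp merge(left, []), do: left

  defp merge([h1 | t1] = left, [h2 | t2] = right) do
    if h1 <= h2 do
      [h1 | merge(t1, right)]
    else
      [h2 | merge(left, t2)]
    end
  end
end

IO.inspect(PSort.sort([5, 3, 8, 1, 9, 2]))  # [1, 2, 3, 5, 8, 9]
```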

So, answering your question: no, Erlang was not designed for parallelism. However, it makes parallelism easier, because it forces concurrency and “well-defined behaviours”, like no shared memory between processes.


I didn’t take the time to analyse your algorithm, but `Task.async_stream` gives you a transparent way of running the same function with different inputs, which can then be easily and automatically gathered back – using `Enum.to_list` with the output of `Task.async_stream` is the canonical example.
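A minimal sketch of that pattern (the work function is just a stand-in):

```elixir
# Task.async_stream runs the function in `max_concurrency` processes
# at once; the default is System.schedulers_online(), one per core.
# Results come back as {:ok, value} tuples, in input order.
slow_square = fn x ->
  Process.sleep(10)  # pretend this is real work
  x * x
end

results =
  1..8
  |> Task.async_stream(slow_square, max_concurrency: System.schedulers_online())
  |> Enum.to_list()

IO.inspect(results)
# [ok: 1, ok: 4, ok: 9, ok: 16, ok: 25, ok: 36, ok: 49, ok: 64]
```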

But if you need different stages then `GenStage` and `Flow` are the better tools.

That really depends on the size of your input. It’s indeed true that, depending on the hardware specs of the machine the code runs on, there is a threshold below which you shouldn’t parallelize at all.