Hi, I’m pretty new to Elixir.
I am currently porting an algorithm from C++ to Elixir. I wrote the C++ version in both a sequential and a parallel form.
In abstract form, the sequential code looks like this:
int algorithm(p_value, depth) {
    int result = 0;
    // some_preparation
    for (int i = 0; i < N; ++i) {
        // do_some_stuff
        if (condition1) {
            return 1;
        } else {
            result += algorithm(new_value, depth + 1);
        }
    }
    return result;
}
Based on the C++ code, I then rewrote the algorithm in Elixir. The sequential algorithm works.
In abstract form, the code looks like this:
def algorithm(p_value, depth) do
  # some_preparation
  Enum.reduce_while(0..N-1, 0, fn x, acc ->
    # do_some_stuff
    if condition1 do
      {:cont, acc + algorithm(new_value, depth + 1)} # I want to parallelize the recursion
    else
      {:halt, 1}
    end
  end)
end
So far, everything’s okay.
Now to the parallel variant. In C++ I used the TBB library for this. How the library works is not important here; what matters is where I parallelize, namely at the recursive call.
I would like to transfer this thought to Elixir.
First of all the C++ code in an abstract form:
int algorithm(p_value, depth) {
    task_list list;
    unsigned int results[N];
    int count = 0; // counts the number of tasks
    // some_preparation
    for (int i = 0; i < N; ++i) {
        // do_some_stuff
        if (condition1) {
            ++*this->currentResult; // instead of returning 1
        } else {
            // create a task for the recursion and add it to the list
            list.push_back(*new(allocate_child()) algorithm(new_value, depth + 1, &results[i]));
            ++count;
        }
    }
    // after the for loop: spawn all tasks, wait for them, then collect their results
    if (count > 0) {
        set_ref_count(count + 1);
        spawn_and_wait_for_all(list);
        for (int i = 0; i < N; ++i) {
            *this->currentResult += results[i];
        }
    }
}
With the help of the Elixir forum I came up with two ways to parallelize this: Flow and Task. I have read the documentation thoroughly and I think I understand how both work.
I have applied both to my algorithm.
The code using Flow looks like this:
def algorithm(p_value, depth) do
  # some_preparation
  #a_stream#
  |> Flow.from_enumerable()
  |> Flow.reduce(fn -> [] end, fn x, acc ->
    # do_some_stuff
    if condition1 do
      acc ++ [algorithm(new_value, depth + 1)]
    else
      [1]
    end
  end)
  |> Enum.to_list()
  |> Enum.sum()
end
The code using Task looks like this:
def algorithm(p_value, depth) do
  # some_preparation
  Enum.reduce_while(0..N-1, [], fn x, acc ->
    # do_some_stuff
    if condition1 do
      acc ++ [Task.async(fn -> algorithm(new_value, depth + 1) end)] # concatenate everything into a list
    else
      [1]
    end
  end)
  |> Enum.map(fn task ->
    if Kernel.is_number(task) do
      task
    else
      Task.await(task)
    end
  end)
end
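For comparison, here is a minimal, self-contained sketch of the fan-out/fan-in shape that the Task variant is aiming for, using only the standard library. The module `FanOutSketch` and the function `slow_square/1` are hypothetical stand-ins and not part of my real algorithm. The sketch only illustrates three things: every branch contributes an element to the list, tasks and plain numbers are told apart by pattern matching, and the awaited values are summed at the end.

```elixir
defmodule FanOutSketch do
  # Hypothetical stand-in for an expensive recursive call.
  def slow_square(x) do
    Process.sleep(10)
    x * x
  end

  # Even inputs are answered directly with 1 (no task is started);
  # odd inputs are computed in a separate task.
  def run(inputs) do
    inputs
    |> Enum.map(fn x ->
      if rem(x, 2) == 0 do
        1
      else
        Task.async(fn -> slow_square(x) end)
      end
    end)
    |> Enum.map(fn
      # A task: wait for its result without the default 5000 ms timeout.
      %Task{} = task -> Task.await(task, :infinity)
      # A plain number: pass it through unchanged.
      value when is_integer(value) -> value
    end)
    |> Enum.sum()
  end
end

IO.inspect(FanOutSketch.run(1..6))
```

All tasks are started before the first `Task.await/2` is called, so the odd inputs are processed concurrently while the list is still being built.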
My questions are these:
The Flow variant works and returns the correct result. However, it does not seem to run as parallel as I expected: it is slower than my sequential variant. Does it really run in parallel, or do I still have to specify that the function should run on all cores?
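One way to check whether work is actually spread across cores is to time the same workload sequentially and in parallel. The following is a rough sketch under stated assumptions: it deliberately avoids Flow so that it runs without dependencies, and the module `ParallelCheck`, the naive `fib/1` workload, and the input list are hypothetical and unrelated to my real algorithm. It uses `Task.async_stream/3` with `max_concurrency` set to `System.schedulers_online/0`.

```elixir
defmodule ParallelCheck do
  # Hypothetical CPU-bound work item: a naive recursive Fibonacci.
  def fib(n) when n < 2, do: n
  def fib(n), do: fib(n - 1) + fib(n - 2)

  # Processes all inputs one after another in the calling process.
  def sequential(inputs), do: inputs |> Enum.map(&fib/1) |> Enum.sum()

  # Runs at most one task per online scheduler at any time.
  def parallel(inputs) do
    inputs
    |> Task.async_stream(&fib/1,
      max_concurrency: System.schedulers_online(),
      timeout: :infinity
    )
    |> Enum.map(fn {:ok, value} -> value end)
    |> Enum.sum()
  end
end

inputs = List.duplicate(25, 16)
IO.puts("schedulers online: #{System.schedulers_online()}")

# :timer.tc/1 returns {elapsed_microseconds, result}.
{seq_us, seq_result} = :timer.tc(fn -> ParallelCheck.sequential(inputs) end)
{par_us, par_result} = :timer.tc(fn -> ParallelCheck.parallel(inputs) end)

IO.puts("sequential: #{seq_us} us, result #{seq_result}")
IO.puts("parallel:   #{par_us} us, result #{par_result}")
```

If the parallel time is not noticeably lower on a multi-core machine, the individual work items are probably too small compared with the cost of starting processes and passing messages.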
The Task variant only works sometimes for small N. Sometimes it finishes, and sometimes it runs endlessly and never stops, even for a small N!
For large N it does not work at all, because the following error message appears:
[error] Task #PID<0.10596.398> started from #PID<0.6853.403> terminating
** (SystemLimitError) a system limit has been reached
:erlang.spawn_link(:proc_lib, :init_p, [#PID<0.6853.403>, [#PID<0.10596.398>, #PID<0.6227.396>, #PID<0.24676.394>, #PID<0.23132.394>, #PID<0.21044.394>, #PID<0.20272.394>, #PID<0.207.0>, #PID<0.62.0>], Task.Supervised, :reply, [{:nonode@nohost, #PID<0.6853.403>, #PID<0.6853.403>}, [#PID<0.6853.403>, #PID<0.10596.398>, #PID<0.6227.396>, #PID<0.24676.394>, #PID<0.23132.394>, #PID<0.21044.394>, #PID<0.20272.394>, #PID<0.207.0>], :nomonitor, {:erlang, :apply, [#Function<8.131722887/0 in LangfordSequence.langfordSequence_task/2>, []]}]])
(stdlib 3.8) proc_lib.erl:102: :proc_lib.spawn_link/3
(elixir 1.10.3) lib/task/supervised.ex:14: Task.Supervised.start_link/4
(elixir 1.10.3) lib/task.ex:422: Task.async/3
(pva 0.1.0) langfordSequence.ex:69: anonymous fn/4 in LangfordSequence.langfordSequence_task/2
(elixir 1.10.3) lib/enum.ex:2111: Enum."-reduce/3-lists^foldl/2-0-"/3
(pva 0.1.0) langfordSequence.ex:65: LangfordSequence.langfordSequence_task/2
(elixir 1.10.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
Function: #Function<8.131722887/0 in LangfordSequence.langfordSequence_task/2>
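The trace shows `:erlang.spawn_link` failing with a system limit, which suggests that the recursion started more processes than the VM allows. A possible way to bound this, sketched here under assumptions, is to start tasks only down to a cutoff depth and to recurse sequentially below it. The module `DepthLimitSketch`, its parameters, and the toy problem (counting the leaves of a uniform tree) are hypothetical and are not my real algorithm.

```elixir
defmodule DepthLimitSketch do
  # Hypothetical toy recursion: counts the leaves of a tree in which every
  # node has `branching` children and the leaves sit at depth `max_depth`.
  # Tasks are only started above `cutoff`; deeper calls run sequentially
  # in the calling process, which bounds the number of processes.
  def count(branching, max_depth, cutoff, depth \\ 0)

  # A leaf contributes exactly 1.
  def count(_branching, max_depth, _cutoff, depth) when depth == max_depth, do: 1

  # Above the cutoff: one task per child, then wait for all and sum.
  def count(branching, max_depth, cutoff, depth) when depth < cutoff do
    1..branching
    |> Enum.map(fn _ ->
      Task.async(fn -> count(branching, max_depth, cutoff, depth + 1) end)
    end)
    |> Enum.map(&Task.await(&1, :infinity))
    |> Enum.sum()
  end

  # At or below the cutoff: plain sequential recursion, no new processes.
  def count(branching, max_depth, cutoff, depth) do
    Enum.reduce(1..branching, 0, fn _, acc ->
      acc + count(branching, max_depth, cutoff, depth + 1)
    end)
  end
end

IO.puts("process limit: #{:erlang.system_info(:process_limit)}")
IO.puts("leaves: #{DepthLimitSketch.count(3, 8, 2)}")
```

With a branching factor of 3 and a cutoff of 2, this starts 3 + 9 = 12 tasks in total, regardless of how deep the tree is. `Task.await/2` is called with `:infinity` because its default timeout is 5000 ms, which a long-running subtree could exceed.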