When spawning an Elixir process, is it possible to return data back to the parent process?

When spawning an Elixir process, is it possible to return data back to the parent process? I don’t mean a message, I mean result data.

I would like to be able to have 10 processes each do a body of work (in parallel) that is computationally expensive and, once completed, return their result so that the parent can take the combined result of all 10 child processes and do additional work with it. It would be ideal to be able to await until all 10 processes have completed. This way the work would get done much faster than with a single process.

If yes, my next question would be is there a limit to the amount (size) of data that can be returned from a process or is this no different than any function returning data?

We are currently using Node.js and it’s very challenging. We have to prep the data, write it to disk, send messages to workers, use event handlers to process response messages, get the processed data from disk, etc.

The dream would be to literally execute 10 functions in parallel, in processes, and have all their data returned back to the parent.

Thank you.


Seems like you’re looking for Task.async_stream or any of the other options in the Task module.


The only way to exchange data between processes is through messages.

Something I’ve often seen in Erlang code is something like this:

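def work_in_parallel(workdata) do
  parent = self()

  workdata
  |> Enum.map(fn workitem ->
    # A unique ref tags each reply so results cannot be mixed up
    ref = make_ref()
    {ref, spawn(__MODULE__, :worker_fun, [{ref, parent}, workitem])}
  end)
  |> Enum.map(fn {ref, pid} ->
    # Selective receive: collect replies in the order the workers were started
    receive do
      {^ref, ^pid, result} -> result
    end
  end)
end

def worker_fun({ref, from}, data) do
  result = data + 1
  # The message shape must match the pattern in the receive above
  send(from, {ref, self(), result})
end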

PS: I’m pretty sure there are abstractions available that also properly manage the pool size for workers. “Parallel map” should be a good term to Google.


To expand on @LostKobrakai’s response, you can find the Task documentation here.

If your dream can be done with one line…

iex> result = 1..10 |> Enum.map(fn _ -> Task.async(fn -> :timer.sleep(1000); "work done!" end) end) |> Enum.map(& Task.await &1)
["work done!", "work done!", "work done!", "work done!", "work done!",
 "work done!", "work done!", "work done!", "work done!", "work done!"]

What do you think about GenStage?

Update: As mentioned by @NobbZ, it is wise to use a timeout, like so:

# replace
& Task.await &1

# by
& Task.await(&1, @anytimeoutvalue)

Remember that Task.await will exit the calling process when the timeout is exceeded.

Either use a timeout that is large enough or take care of the exits properly.
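
One way to take care of the timeouts without exiting is Task.yield_many/2 followed by Task.shutdown/2, which is roughly the pattern shown in the Task docs. This is only a minimal sketch: the TimeoutDemo module, the run/2 function and the {:error, :timeout} tagging are made up for illustration.

```elixir
defmodule TimeoutDemo do
  # Hypothetical helper: runs each function in its own task and waits at most
  # `timeout` ms in total, instead of letting Task.await/2 exit the caller.
  def run(funs, timeout \\ 5_000) do
    tasks = Enum.map(funs, &Task.async/1)

    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn {task, outcome} ->
      # `outcome` is nil when the task has not replied within the timeout;
      # shut such tasks down so they do not linger.
      case outcome || Task.shutdown(task, :brutal_kill) do
        {:ok, value} -> {:ok, value}
        {:exit, reason} -> {:error, reason}
        nil -> {:error, :timeout}
      end
    end)
  end
end
```

Note that the tasks are still linked to the caller here, so a task that crashes (rather than one that is merely slow) will still take the caller down unless it traps exits.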


This is basically a pmap. :wink: You can also quite easily expand it to handle any form of error handling you need, for example tagging values to show which ones succeeded or failed, or killing them all if one fails.

If you want to do a pmap with error handling then it is better to spawn a “master” pmap process which spawns the worker processes and manages them. This will then not interfere with the caller process’s handling of links etc.
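
A rough sketch of that idea, assuming a throwaway coordinator process per call. The SupervisedPmap module name, the pmap/3 signature and the {:ok, ...} / {:error, ...} return shape are all made up for illustration. The workers are linked to the coordinator, not to the caller, so a crashing worker only reaches the caller as a :DOWN message.

```elixir
defmodule SupervisedPmap do
  # Hypothetical sketch: a short-lived coordinator owns the worker tasks, so
  # worker crashes reach the caller only as a :DOWN message.
  def pmap(items, fun, timeout \\ 5_000) do
    caller = self()
    ref = make_ref()

    {pid, mon} =
      spawn_monitor(fn ->
        results =
          items
          |> Enum.map(fn item -> Task.async(fn -> fun.(item) end) end)
          |> Enum.map(&Task.await(&1, timeout))

        send(caller, {ref, results})
      end)

    receive do
      {^ref, results} ->
        # Drop the monitor (and any pending :DOWN) once we have the results
        Process.demonitor(mon, [:flush])
        {:ok, results}

      {:DOWN, ^mon, :process, ^pid, reason} ->
        # The coordinator died: a worker crashed or an await timed out
        {:error, reason}
    end
  end
end
```

With this shape, a single failing worker kills the coordinator and, through the links, the remaining workers, which is the "kill them all if one fails" behaviour.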


Thank you for the responses everyone. I’ve been looking at Task and when I get home I will play with some code and see how it works. The docs talk about returning the results of processes via messages… is there a limit to the size of data that can be returned via a message?

I must have Node.js PTSD because when I see the word message, I think of a small, limited amount of data, not the entire result set of some expensive CPU computation.

Depending on your use case, Task, GenStage or Flow might help you. You seem to want to do a classic Gather-Scatter processing and that doesn’t require 500+ lines of code – that’s one of the top allures of Erlang and Elixir.

Maybe if you share with us what it is that you want to achieve, we’ll be able to give you more direct advice.


That would require a pretty lengthy explanation as many parts of our Node.js backend use processes.

A simple example would be generating digital signatures. Say we had a list with 10,000 items and each item required a digital signature. Could we split the list among 10 processes (1,000 each), for example, and have each process generate 1,000 signatures and return its results, so that the parent process would then combine all 10 results back into an updated list of 10,000 items with their respective signatures?

Another example would be if we had to write 10,000 rows of data to a database such as Neo4j. Could the list of 10,000 items be split into batches of 1,000, with 10 processes each writing a batch to Neo4j (in parallel), and once all 10 batches have been written, could the results of all 10 processes be combined back into a single list of 10,000 items in the parent process?

I hope these examples make sense. Thanks.

There is no limit to the size of a message other than the amount of memory available.
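
To make that concrete, here is a small sketch you can paste into iex. The one-million-element list and the 5-second guard are arbitrary values chosen for illustration.

```elixir
parent = self()

# The child builds a one-million-element list and sends it back in one message.
spawn(fn -> send(parent, {:result, Enum.to_list(1..1_000_000)}) end)

big_list =
  receive do
    {:result, list} -> list
  after
    5_000 -> raise "no result within 5 seconds"
  end

IO.inspect(length(big_list), label: "elements received")
```

Keep in mind that a message like this is copied into the receiving process's heap, so very large results cost both memory and copying time.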

Both of your examples are basically what Task.async_stream does. You can set the number of concurrent workers with the max_concurrency option (it defaults to System.schedulers_online/0), and by default the results come back in the same order as the input.

The official docs explain this pretty well, including working with timeouts and what to do if/when they occur.
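
As a sketch of the signature example with Task.async_stream: the SignDemo module, sign/1 and sign_all/2 are hypothetical names, :erlang.md5/1 is only a stand-in for a real signing function, and the 30-second timeout is an arbitrary choice.

```elixir
defmodule SignDemo do
  # Hypothetical stand-in for the real signing step; :erlang.md5/1 is only a
  # placeholder for an expensive per-item computation.
  def sign(item), do: {item, :erlang.md5(to_string(item))}

  def sign_all(items, workers \\ 10) do
    # Split the list into roughly `workers` batches (1,000 each for 10,000 items)
    chunk_size = max(div(length(items), workers), 1)

    items
    |> Enum.chunk_every(chunk_size)
    |> Task.async_stream(fn batch -> Enum.map(batch, &sign/1) end,
      max_concurrency: workers,
      timeout: 30_000
    )
    # Results arrive in input order, so concatenating restores the full list
    |> Enum.flat_map(fn {:ok, signed_batch} -> signed_batch end)
  end
end
```

Calling SignDemo.sign_all(Enum.to_list(1..10_000)) would then return a single list of 10,000 {item, signature} tuples in the original order. The database example has the same shape, with the batch function writing to the database instead of signing.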

However if you also want to work with errors, then maybe you should work with one of the Task.Supervisor functions.
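
For instance, a minimal sketch with Task.Supervisor.async_stream_nolink/4, where a crashing task shows up as an {:exit, reason} entry in the stream instead of taking the caller down. The input list and the {:error, reason} tagging are made up for illustration, and in a real application the supervisor would normally live in your supervision tree rather than be started inline.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink(
    [1, 2, :oops, 4],
    # Multiplying the atom :oops raises, so that one task crashes
    fn x -> x * 2 end,
    max_concurrency: 4
  )
  |> Enum.map(fn
    {:ok, value} -> {:ok, value}
    {:exit, reason} -> {:error, reason}
  end)
```

The three valid items come back as {:ok, value} and the failing one as {:error, reason}, still in input order.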


Thank you for all the helpful information everyone, it’s much appreciated.

@danielcasler You are describing something that Flow is ideal for.