Hello,
I have been playing with a pmap (parallel map) implementation for a task that seems easy: run a series of functions in parallel and gather their results. The “snag” is that I want to handle errors and timeouts against a given deadline, with each of them producing a result rather than aborting the whole computation or, worse, killing the parent.
I ended up with something like:
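def parallel_map_ordered(enum, myFn, errorFn, timeout) do
  # Wrap myFn so that a raised exception becomes an errorFn result
  # instead of crashing the task (and the caller linked to it).
  myWrappedFn = fn v ->
    try do
      myFn.(v)
    rescue
      err ->
        Logger.error(
          "Crashed pmap: #{Tools.ii(err)} on #{Tools.ii(__STACKTRACE__)} for input #{Tools.ii(v)}"
        )

        errorFn.(v, err)
    end
  end

  tasks = Enum.map(enum, &Task.async(fn -> myWrappedFn.(&1) end))

  # yield_many/2 returns its results in the same order as `tasks`.
  Task.yield_many(tasks, timeout)
  |> Enum.zip(enum)
  |> Enum.map(fn {{%Task{} = task, res}, orgval} ->
    # yield_many/2 leaves late tasks running, so shut them down explicitly.
    case res || Task.shutdown(task, :brutal_kill) do
      {:ok, v} ->
        v

      nil ->
        Logger.error("Timed out pmap: for input #{Tools.ii(orgval)}")
        errorFn.(orgval, :timeout)
    end
  end)
end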
You use it by passing two functions:
- one is the main function; it gets called for each element of the enumerable and is supposed to return an element
- one is the error handler; it gets called with the value that was supposed to be processed and either the error or the atom :timeout, and returns a value to be inserted into the resulting collection
There is a single overall timeout for the whole batch; any element that has not finished by then gets the error handler's value instead.
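To make the handler contract concrete, here is a minimal sketch of an error handler that tells the two cases apart. The name `error_fn` and the shape of the returned tuples are only illustrative assumptions, not something the function above requires.

```elixir
# Hypothetical error handler: the second argument is either the atom
# :timeout or the exception struct rescued inside the task.
error_fn = fn
  value, :timeout -> {:error, value, :timeout}
  value, %_{} = exception -> {:error, value, Exception.message(exception)}
end

error_fn.(5, :timeout)
error_fn.(7, %RuntimeError{message: "boom"})
```

Whatever the handler returns is placed in the result list at the position of the failed input.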
You use it like this:
test "simplex" do
  fnOk = fn v ->
    Process.sleep(v * 100)
    v * 100
  end

  fnErr = fn v, e ->
    {:error, v, e}
  end

  assert [100, 200, 300, 100, 200, 100, 200, 500, 100] =
           PsTools.parallel_map_ordered(
             [1, 2, 3, 1, 2, 1, 2, 5, 1],
             fnOk,
             fnErr,
             5000
           )
end
This will work; if you set one of the values to (say) 500, that call will time out and the whole function will return after 5 seconds, but all the other values will be preserved.
The implementation is naive, meaning that there is no batching and everything runs in parallel at once, so if the function is a file reader and you run one million of them in parallel, you will exhaust file descriptors. If you batch, every batch is held up by its slowest operation, so again it is kind of meh. But it’s a start.
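One possible way around the batching problem is to cap concurrency with `Task.async_stream/3`, which starts a new task as soon as a slot frees up. The sketch below is a rough variant under a few assumptions: the module and function names are hypothetical, and the timeout here applies per element rather than as one overall deadline, so it is not a drop-in replacement for the function above.

```elixir
defmodule BoundedPmapSketch do
  # Hypothetical helper: at most `max_concurrency` tasks run at once.
  def bounded_pmap(enum, fun, error_fun, per_item_timeout, max_concurrency) do
    items = Enum.to_list(enum)

    results =
      items
      |> Task.async_stream(
        fn v ->
          # Tag the outcome so the caller can tell success from a raise.
          try do
            {:done, fun.(v)}
          rescue
            err -> {:failed, err}
          end
        end,
        max_concurrency: max_concurrency,
        timeout: per_item_timeout,
        # Kill a task that exceeds its timeout instead of exiting the caller.
        on_timeout: :kill_task,
        ordered: true
      )
      |> Enum.to_list()

    # Results come back in input order, so they can be zipped with the inputs.
    results
    |> Enum.zip(items)
    |> Enum.map(fn
      {{:ok, {:done, result}}, _v} -> result
      {{:ok, {:failed, err}}, v} -> error_fun.(v, err)
      {{:exit, :timeout}, v} -> error_fun.(v, :timeout)
    end)
  end
end
```

With this shape a slow element only occupies one slot, and the remaining slots keep draining the input.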
Now for my questions:
- Is there something that I overlooked in the handling? Should I catch :exit in the rescue clause?
- Is there a simpler way to do this using the Task module?
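For context on the first question, this is a small sketch of how `try` separates the three failure kinds as far as I understand it: `rescue` only sees raised exceptions, while exits and throws need a `catch` clause. The module and function names are made up for illustration.

```elixir
defmodule TryKindsSketch do
  # Hypothetical helper: report how `fun` terminated.
  def classify(fun) do
    try do
      {:ok, fun.()}
    rescue
      # Only raised exceptions end up here.
      err -> {:raised, err}
    catch
      # exit/1 and throw/1 are not exceptions and bypass `rescue`.
      :exit, reason -> {:exited, reason}
      :throw, value -> {:thrown, value}
    end
  end
end

TryKindsSketch.classify(fn -> exit(:boom) end)
```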
TIA