Hi there,
Sorry for the long post.
I’m currently using Task.async_stream
to process a list and make external API requests; the requests, if made twice for the same element, throw an error on the remote side (unique constraints). What each task does is (rough sketch after the list):
- check if we have a local reference on the DB
- if not, do an API request and create it on the remote API
- create the row on the local DB and set the remote ID
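Roughly, each task looks like this (a minimal sketch only; Repo, LocalRef, RemoteAPI and create_local_row/2 are made-up placeholder names, not my real code):

def process_element(el) do
  # 1. check if we already have a local reference in the DB
  case Repo.get_by(LocalRef, external_key: el.key) do
    nil ->
      # 2. not found locally: create it on the remote API
      #    (this call errors with a unique constraint if it ever runs twice)
      {:ok, remote} = RemoteAPI.create(el)

      # 3. create the row on the local DB and set the remote ID
      #    -> if the task is killed between step 2 and step 3, the remote row
      #       exists but the local one doesn’t, and the next run errors
      create_local_row(el, remote.id)

    local ->
      local
  end
end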
Now, if we’re processing 10 elements at a time and one of them crashes, they all get killed, because Task.async_stream links the tasks to the caller.
Since the API request is the longest part of the process, it’s likely that the other in-flight tasks have already created their rows on the remote API but are killed before writing the local rows, so I end up with up to 10 items that will error the next time I process the same list.
Is there a way to let the currently running processes finish when one of them crashes, instead of killing them all?
I’ve tried wrapping the function in a try/rescue/catch (simple example below) to gracefully stop the stream without killing the currently running processes, but as soon as one of the tasks returns an error the stream stops processing:
defmodule Async do
  def map(enum, fun, options) do
    enum
    |> Task.async_stream(
      fn el ->
        # wrap the work so a raise/throw never crashes the task itself
        try do
          {:ok, fun.(el)}
        rescue
          error -> {:raised, error}
        catch
          error -> {:caught, error}
        end
      end,
      options
    )
    # Task.async_stream yields {:ok, inner}; unwrap the outer tuple
    |> Stream.map(&Kernel.elem(&1, 1))
    |> Stream.map(&IO.inspect/1)
    # stop consuming as soon as anything other than {:ok, _} shows up
    |> Stream.take_while(&Kernel.match?({:ok, _}, &1))
    |> Stream.map(fn {:ok, result} -> result end)
  end
end
Async.map(
  [4, 3, 2, 5],
  fn el ->
    IO.puts("Sleeping for #{el} seconds")
    :timer.sleep(el * 1000)
    IO.puts("Done sleeping for #{el} seconds")

    if el == 2 do
      raise RuntimeError
    end

    el
  end,
  max_concurrency: 2,
  timeout: 10_000
)
|> Stream.run()
which shows:
Sleeping for 4 seconds
Sleeping for 3 seconds
Done sleeping for 3 seconds
Sleeping for 2 seconds
Done sleeping for 4 seconds
{:ok, 4}
{:ok, 3}
Sleeping for 5 seconds
Done sleeping for 2 seconds
{:raised, %RuntimeError{message: "runtime error"}}
So you can see it doesn’t wait until the 5-second element is done sleeping: as soon as the stream halts, the still-running task is shut down.
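For comparison, if I drop the take_while and just let the whole stream run, none of the in-flight tasks get killed, but then every remaining element gets processed too, which isn’t what I want either. A minimal sketch of that variant (another function I could put in the same Async module; map_all is just a name for the sketch):

def map_all(enum, fun, options) do
  results =
    enum
    |> Task.async_stream(
      fn el ->
        try do
          {:ok, fun.(el)}
        rescue
          error -> {:raised, error}
        end
      end,
      options
    )
    # nothing halts the stream, so no running task is ever shut down
    |> Enum.map(&elem(&1, 1))

  # only surface the first error after every task has completed
  case Enum.find(results, &(not match?({:ok, _}, &1))) do
    nil -> Enum.map(results, fn {:ok, value} -> value end)
    {:raised, error} -> raise error
  end
end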
I’ve tried another approach using spawn_link:
defmodule Parallel do
  def map(list, fun, options \\ []) do
    max_concurrency = Keyword.get(options, :max_concurrency, System.schedulers_online() * 2)
    monitor_pid = self()

    # trap exits so a crashing worker sends us an {:EXIT, ...} message
    # instead of taking this process down with it
    Process.flag(:trap_exit, true)

    # start the first batch of workers
    running_pids =
      list
      |> Enum.take(max_concurrency)
      |> Enum.with_index()
      |> Enum.map(fn {el, index} ->
        spawn_process(monitor_pid, index, el, fun)
      end)

    # the next index to spawn is however many workers we just started
    monitor_loop(:cont, list, [], monitor_pid, running_pids, length(running_pids), fun)
  end

  # :halt mode: an error already happened; drain the remaining workers
  # and only re-raise once they are all done
  defp monitor_loop(:halt, running_pids, error, stacktrace) do
    receive do
      {:result, pid, _, _} ->
        running_pids = running_pids -- [pid]

        if Enum.empty?(running_pids) do
          IO.puts("No more running pids, we can safely raise exception")
          reraise(error, stacktrace)
        else
          IO.puts("There are running pids, waiting on them and then raise")
          monitor_loop(:halt, running_pids, error, stacktrace)
        end

      {:EXIT, _, :normal} ->
        monitor_loop(:halt, running_pids, error, stacktrace)

      {:EXIT, pid, _} ->
        IO.puts("Got error from pid #{inspect(pid)}")
        running_pids = running_pids -- [pid]

        if Enum.empty?(running_pids) do
          IO.puts("No more running pids, we can safely raise exception")
          reraise(error, stacktrace)
        else
          IO.puts("There are running pids, waiting on them and then raise")
          monitor_loop(:halt, running_pids, error, stacktrace)
        end
    end
  end

  # :cont mode: collect results and keep spawning until the list is exhausted
  defp monitor_loop(:cont, list, results, monitor_pid, running_pids, next_index, fun) do
    IO.puts(
      "Got monitor_loop call list #{inspect(list)} results #{inspect(results)} " <>
        "monitor_pid #{inspect(monitor_pid)} running_pids #{inspect(running_pids)} " <>
        "next index #{next_index}"
    )

    receive do
      {:result, pid, index, result} ->
        IO.puts("Got result from index #{index}: #{inspect(result)}")
        running_pids = running_pids -- [pid]
        results = [{index, result} | results]

        cond do
          next_index == Enum.count(list) and Enum.empty?(running_pids) ->
            IO.puts("No new elements to add, reached end")

            # everything done: return results in the original order
            results
            |> Enum.sort_by(&Kernel.elem(&1, 0))
            |> Enum.map(&Kernel.elem(&1, 1))

          next_index == Enum.count(list) and not Enum.empty?(running_pids) ->
            IO.puts("Still some processes running, restarting loop")
            monitor_loop(:cont, list, results, monitor_pid, running_pids, next_index, fun)

          true ->
            IO.puts("New elements to add, spawning next")
            pid = spawn_process(monitor_pid, next_index, Enum.at(list, next_index), fun)

            monitor_loop(
              :cont,
              list,
              results,
              monitor_pid,
              [pid | running_pids],
              next_index + 1,
              fun
            )
        end

      {:EXIT, _, :normal} ->
        monitor_loop(:cont, list, results, monitor_pid, running_pids, next_index, fun)

      {:EXIT, pid, {error, stacktrace}} ->
        IO.puts("Got error from pid #{inspect(pid)}")
        running_pids = running_pids -- [pid]

        if Enum.empty?(running_pids) do
          IO.puts("No more running pids, we can safely raise exception")
          reraise(error, stacktrace)
        else
          IO.puts("There are running pids, waiting on them and then raise")
          monitor_loop(:halt, running_pids, error, stacktrace)
        end
    end
  end

  defp spawn_process(monitor_pid, index, el, fun) do
    spawn_link(fn ->
      send(monitor_pid, {:result, self(), index, fun.(el)})
    end)
  end
end
Parallel.map(
  [4, 3, 2, 5],
  fn el ->
    IO.puts("Sleeping for #{el} seconds")
    :timer.sleep(el * 1000)
    IO.puts("Done sleeping for #{el} seconds")

    if el == 2 do
      raise RuntimeError
    end

    el
  end,
  max_concurrency: 2
)
|> Stream.run()
which seems to work on the test file:
Sleeping for 4 seconds
Sleeping for 3 seconds
Got monitor_loop call list [4, 3, 2, 5] results [] monitor_pid #PID<0.91.0> running_pids [#PID<0.96.0>, #PID<0.97.0>] next index 2
Done sleeping for 3 seconds
Got result from index 1: 3
New elements to add, spawning next
Sleeping for 2 seconds
Got monitor_loop call list [4, 3, 2, 5] results [{1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.98.0>, #PID<0.96.0>] next index 3
Got monitor_loop call list [4, 3, 2, 5] results [{1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.98.0>, #PID<0.96.0>] next index 3
Done sleeping for 4 seconds
Got result from index 0: 4
New elements to add, spawning next
Sleeping for 5 seconds
Got monitor_loop call list [4, 3, 2, 5] results [{0, 4}, {1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.99.0>, #PID<0.98.0>] next index 4
Got monitor_loop call list [4, 3, 2, 5] results [{0, 4}, {1, 3}] monitor_pid #PID<0.91.0> running_pids [#PID<0.99.0>, #PID<0.98.0>] next index 4
Done sleeping for 2 seconds
Got error from pid #PID<0.98.0>
There are running pids, waiting on them and then raise
11:34:24.114 [error] Process #PID<0.98.0> raised an exception
** (RuntimeError) runtime error
text.exs:120: anonymous fn/1 in :elixir_compiler_0.__FILE__/1
text.exs:107: anonymous fn/4 in Parallel.spawn_process/4
Done sleeping for 5 seconds
No more running pids, we can safely raise exception
** (RuntimeError) runtime error
text.exs:120: anonymous fn/1 in :elixir_compiler_0.__FILE__/1
text.exs:107: anonymous fn/4 in Parallel.spawn_process/4
However, when I actually use it in my real code (which internally uses other processes, caches, etc.) everything still seems to crash, including all the other running processes.
Ignoring my attempts, is there a way to achieve this, hopefully with a small change to how I use Task.async_stream?
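To make it concrete, this is roughly the behaviour I’m after, written as if Task.async_stream had an option for it (drain_on_error below is purely made up, I know it doesn’t exist):

# purely illustrative: a crashed element should stop new elements from being
# scheduled, while already-running tasks are drained instead of killed
# (:drain_on_error is a hypothetical option, not a real one)
list
|> Task.async_stream(&process_element/1,
  max_concurrency: 10,
  timeout: 30_000,
  drain_on_error: true
)
|> Enum.to_list()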