Yes, I’d like to know whether every Task terminated with reason `:normal`.
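For a single batch, one way to check this is to start each function with `Task.Supervisor.async_nolink` and collect the outcomes with `Task.yield_many`. A task that returns a value exits with reason `:normal`, while `{:exit, reason}` reports how it died otherwise. This is a minimal sketch that assumes a plain local `Task.Supervisor`. `ExitCheck` and `run_all/3` are hypothetical names.

```elixir
defmodule ExitCheck do
  # Hypothetical helper: runs every function in `funs` under the Task.Supervisor
  # `sup` without linking, then reports how each task process ended.
  def run_all(sup, funs, timeout \\ 5_000) do
    tasks = Enum.map(funs, &Task.Supervisor.async_nolink(sup, &1))

    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn {task, result} ->
      # Tasks still running after `timeout` are shut down; shutdown/2 may still
      # return a result if the task finished in the meantime.
      case result || Task.shutdown(task, :brutal_kill) do
        # Returned a value: the process exited with reason :normal.
        {:ok, _value} -> :normal
        # Called exit(:normal) explicitly.
        {:exit, :normal} -> :normal
        # Crashed, or exited with any other reason.
        {:exit, reason} -> {:abnormal, reason}
        # Still running when it was shut down.
        nil -> :timeout
      end
    end)
  end
end
```

`Task.yield_many/2` returns its results in the same order as the tasks it was given, so the list lines up with `funs`.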
Oban uses PostgreSQL, which is not an option in my case.
Actually, I came up with a solution that is not perfect yet, but I believe it is the right direction. Imagine a cluster where every node runs a local `Task.Supervisor` with the locally registered name `:task_supervisor_local_name`, plus a GenServer like this one:
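```elixir
defmodule SupDispatcher do
  use GenServer

  # How many extra attempts a single :start_task request gets.
  @retry_count 3

  def start_link(_ \\ nil), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  # The server state is only the retry budget, reset after every reply.
  @impl true
  def init(_), do: {:ok, @retry_count}

  # Intercepts the internal {:start_task, ...} request that
  # Task.Supervisor.async_stream_nolink sends to its supervisor. This message
  # shape is private to Task.Supervisor and may differ between Elixir versions.
  @impl true
  def handle_call(
        {:start_task, [_task_owner, _callers, _monitor, {_m, _f, a}], _restart, _shutdown} = msg,
        from,
        retry_count
      ) do
    # Pick a target node by hashing the task's arguments.
    nodes = Node.list([:visible, :this])
    index = :erlang.phash2(a, length(nodes))
    drawn_node = Enum.at(nodes, index)

    try do
      # Forward the request unchanged to the locally registered supervisor on that node.
      GenServer.call({:task_supervisor_local_name, drawn_node}, msg)
    catch
      # The call failed (node down, timeout, ...): retry while budget remains.
      # With an unchanged node list, the same `a` hashes to the same node again.
      _, _ when retry_count > 0 ->
        handle_call(msg, from, retry_count - 1)

      kind, reason ->
        {:reply, {:error, {:exception, kind, reason}}, @retry_count}
    else
      # Hand the remote task's pid back to the caller as if we were the supervisor.
      {:ok, pid} ->
        {:reply, {:ok, pid}, @retry_count}

      _ when retry_count > 0 ->
        handle_call(msg, from, retry_count - 1)

      error ->
        {:reply, error, @retry_count}
    end
  end
end
```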
The intended usage is that whenever I call `Task.Supervisor.async_stream_nolink` on any node, I pass the GenServer above instead of a `Task.Supervisor`. The GenServer accepts the `:start_task` message and picks a node with `:erlang.phash2`, based on the arguments of the given call. It then forwards the request to the `Task.Supervisor` on the selected node. The remote `Task.Supervisor` starts the task and returns its pid to the GenServer, which returns the same pid to `async_stream_nolink` as if it were the supervisor.

This seems to work fine. The load is distributed across the visible (non-hidden) nodes, and it even handles nodes joining and leaving. The one gap is that I cannot tell whether a spawned task finished: the GenServer only knows that the task was started successfully.
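The node-selection step can be looked at on its own. The hypothetical `NodePicker.pick/2` below mirrors the `phash2` lines in `SupDispatcher`, with one change of my own: it sorts the node list first. That change rests on an assumption, namely that `Node.list/1` is not guaranteed to return nodes in the same order everywhere; sorting makes every dispatcher map the same arguments to the same node.

```elixir
defmodule NodePicker do
  # Hypothetical helper mirroring the selection step in SupDispatcher.
  # Sorting is an addition: it makes the result independent of the order
  # in which the node list happens to be returned.
  def pick(args, nodes) when is_list(nodes) and nodes != [] do
    sorted = Enum.sort(nodes)
    # phash2/2 returns an integer in 0..length(sorted) - 1.
    index = :erlang.phash2(args, length(sorted))
    Enum.at(sorted, index)
  end
end
```

Calling `NodePicker.pick([42], Node.list([:visible, :this]))` on a non-distributed node simply returns `node()`. Because the choice depends only on the arguments and the node set, a retry with the same arguments lands on the same node unless cluster membership changed in between.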
Is there any other problem with this approach?
Is it possible to fix this and restart failed tasks?
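On the caller side, `async_stream_nolink` already reports each element as `{:ok, value}` or `{:exit, reason}`, and with the default `ordered: true` the results follow the input order. A minimal sketch that re-submits only the failed inputs is below. `RetryStream.run/4` is a hypothetical helper written against a local `Task.Supervisor`; whether `{:exit, reason}` still arrives when the pid comes from `SupDispatcher` is an assumption I have not verified.

```elixir
defmodule RetryStream do
  # Hypothetical helper: runs `fun` over `inputs` via
  # Task.Supervisor.async_stream_nolink and re-submits the inputs whose tasks
  # exited abnormally, for at most `retries` extra rounds.
  # Returns {[{input, value}], [{input, exit_reason}]}.
  def run(sup, inputs, fun, retries \\ 2) do
    outcomes =
      sup
      # :kill_task turns a timeout into {:exit, :timeout} instead of exiting the caller.
      |> Task.Supervisor.async_stream_nolink(inputs, fun, on_timeout: :kill_task)
      # ordered: true (the default) keeps results in input order, so zipping
      # pairs each outcome with the input that produced it.
      |> Enum.zip(inputs)

    {ok, failed} = Enum.split_with(outcomes, &match?({{:ok, _}, _}, &1))
    done = Enum.map(ok, fn {{:ok, value}, input} -> {input, value} end)

    cond do
      failed == [] ->
        {done, []}

      retries > 0 ->
        retry_inputs = Enum.map(failed, fn {_exit, input} -> input end)
        {more, still_failed} = run(sup, retry_inputs, fun, retries - 1)
        {done ++ more, still_failed}

      true ->
        {done, Enum.map(failed, fn {{:exit, reason}, input} -> {input, reason} end)}
    end
  end
end
```

Retrying at the stream level keeps the dispatcher simple, at the cost of re-running whole tasks: any side effects a failed task already performed will happen again.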