My.TaskScheduler.init/1 starts My.TaskWorkerSupervisor under My.Supervisor using Supervisor.start_child/2
My.TaskScheduler starts a job under My.TaskWorkerSupervisor using Supervisor.start_child/2
When the job completes, My.TaskWorkerSupervisor notifies My.TaskScheduler; when the job crashes, My.TaskWorkerSupervisor catches it and restarts it
Note that I put My.TaskSupervisor above My.TaskScheduler and My.TaskWorkerSupervisor, since both should be restarted if either of them crashes, whereas I do not want that behavior for the other apps under the top-level application.
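The "restart both if either crashes" requirement is what the :one_for_all strategy provides. The following is a minimal sketch, not the layout described above: it assumes My.TaskWorkerSupervisor can be a DynamicSupervisor started as a static sibling of the scheduler (rather than from the scheduler's init/1), and the My.TaskScheduler here is a hypothetical stub that only exists so the tree can start.

```elixir
defmodule My.TaskScheduler do
  # Hypothetical stub for this sketch only; a real scheduler would start
  # jobs under My.TaskWorkerSupervisor.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, %{}}
end

defmodule My.TaskSupervisor do
  use Supervisor

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg) do
    children = [
      # Listed first so it is already running (under its registered name)
      # by the time the scheduler's init/1 runs.
      {DynamicSupervisor, name: My.TaskWorkerSupervisor, strategy: :one_for_one},
      My.TaskScheduler
    ]

    # :one_for_all: if either child crashes, both are restarted.
    # Sibling apps elsewhere in the top-level tree are not affected.
    Supervisor.init(children, strategy: :one_for_all)
  end
end
```

My.TaskSupervisor would then be listed as a single child of the top-level supervisor, whose own strategy stays whatever the other apps need.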
Is this the right direction? In particular, should (or can) I leverage a built-in supervisor? For example, I tried Task.Supervisor, but I don’t think it fits, for the following reasons:
Task.Supervisor.async/2 and Task.Supervisor.async_nolink/2 require :temporary for the :restart option, so they do not restart a child when it crashes
Task.Supervisor.start_child/2 does not link the child process to the caller, so the caller (My.TaskScheduler) cannot act when the job completes
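One way around the first point is to let the caller do the retrying. With Task.Supervisor.async_nolink, the task is not linked to the caller but is monitored by it, so the caller receives {ref, result} on success and a :DOWN message on a crash. The sketch below assumes a zero-arity job function; the module name My.RetryingScheduler and the max_attempts limit are hypothetical.

```elixir
defmodule My.RetryingScheduler do
  # Hypothetical: runs one job under a Task.Supervisor and restarts it
  # itself on a crash, so the task's :restart setting doesn't matter.
  use GenServer

  def start_link(task_sup, job, max_attempts \\ 3) do
    GenServer.start_link(__MODULE__, {task_sup, job, max_attempts})
  end

  def result(pid), do: GenServer.call(pid, :result)

  @impl true
  def init({task_sup, job, max_attempts}) do
    state = %{task_sup: task_sup, job: job, max: max_attempts, attempts: 0, ref: nil, result: :pending}
    {:ok, launch(state)}
  end

  # async_nolink: the task is supervised by task_sup and monitored (not
  # linked) by us, so a crash arrives as :DOWN instead of killing us.
  defp launch(state) do
    task = Task.Supervisor.async_nolink(state.task_sup, state.job)
    %{state | ref: task.ref, attempts: state.attempts + 1}
  end

  @impl true
  def handle_info({ref, value}, %{ref: ref} = state) do
    # Success: drop the monitor and the :DOWN message that follows it.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref: nil, result: {:ok, value}}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    # Crash: try again until max_attempts is reached.
    if state.attempts < state.max do
      {:noreply, launch(state)}
    else
      {:noreply, %{state | ref: nil, result: {:error, reason}}}
    end
  end

  # Ignore anything else, e.g. stale messages.
  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def handle_call(:result, _from, state), do: {:reply, {state.result, state.attempts}, state}
end
```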
Let’s see. I would create one process that represents the entire workflow (maybe that’s your TaskScheduler?). Then you would have a separate process for each step to be completed. The workflow is a state machine. From the start state, you begin TaskA and monitor it. If you are notified that it failed, you loop around to the same state and begin it again. If it completes, you go to the verify-results state. (It’s not clear what should happen if verification fails. Go to an error state?)
If verification passes, go into the state where you run TaskB. Monitor TaskB, and so on.
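The state machine described above can be sketched as a single GenServer that spawns and monitors one process per step. This is a minimal sketch under assumptions: the steps are passed in as plain functions (the keys :a, :verify and :b are hypothetical), verification runs inline in the workflow process, a failed verification moves to an :error phase, and crashed steps are retried without any limit for brevity.

```elixir
defmodule My.Workflow do
  # Hypothetical workflow process: run step A, verify its result, then
  # run step B. Each step runs in its own spawned, monitored process.
  use GenServer

  def start_link(steps), do: GenServer.start_link(__MODULE__, steps)
  def status(pid), do: GenServer.call(pid, :status)

  @impl true
  def init(%{a: _, verify: _, b: _} = steps) do
    state = %{steps: steps, phase: nil, pid: nil, ref: nil, result_a: nil, result_b: nil}
    {:ok, run(:running_a, state)}
  end

  @impl true
  def handle_info({:step_done, pid, result}, %{pid: pid, ref: ref} = state) do
    # The step finished: drop its monitor (and the pending :normal :DOWN).
    Process.demonitor(ref, [:flush])

    case state.phase do
      :running_a ->
        # "Verify results" state, done inline before moving on.
        if state.steps.verify.(result) do
          {:noreply, run(:running_b, %{state | result_a: result})}
        else
          {:noreply, %{state | phase: :error, pid: nil, ref: nil, result_a: result}}
        end

      :running_b ->
        {:noreply, %{state | phase: :done, pid: nil, ref: nil, result_b: result}}
    end
  end

  # The current step crashed: stay in the same phase and start it again.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    {:noreply, run(state.phase, state)}
  end

  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, {state.phase, state.result_a, state.result_b}, state}
  end

  # Spawn the step for `phase`, monitor it, and remember pid and ref.
  defp run(phase, state) do
    parent = self()
    fun = Map.fetch!(state.steps, step_key(phase))
    {pid, ref} = spawn_monitor(fn -> send(parent, {:step_done, self(), fun.()}) end)
    %{state | phase: phase, pid: pid, ref: ref}
  end

  defp step_key(:running_a), do: :a
  defp step_key(:running_b), do: :b
end
```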
If you want to use a supervisor to monitor TaskA and TaskB, you could create a DynamicSupervisor (the replacement for the deprecated :simple_one_for_one strategy, which only accepts a single child spec) and use DynamicSupervisor.start_child/2 with TaskA’s or TaskB’s child spec to kick off each task when needed. Just because the supervisor is monitoring and restarting a child doesn’t mean your workflow process can’t also monitor it.
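A minimal sketch of that combination, assuming each step is a Task module declared with restart: :transient. My.StepA, My.StepB and My.StepRunner are hypothetical stand-ins for TaskA, TaskB and the workflow process. One DynamicSupervisor starts children from two different specs, and the caller monitors each child as well.

```elixir
defmodule My.StepA do
  # Hypothetical stand-in for TaskA; restarted only on an abnormal exit.
  use Task, restart: :transient

  def start_link(reply_to) do
    Task.start_link(fn -> send(reply_to, {:step_done, __MODULE__, :a_result}) end)
  end
end

defmodule My.StepB do
  # Hypothetical stand-in for TaskB.
  use Task, restart: :transient

  def start_link(reply_to) do
    Task.start_link(fn -> send(reply_to, {:step_done, __MODULE__, :b_result}) end)
  end
end

defmodule My.StepRunner do
  # Start a step under the DynamicSupervisor, monitor it too, and wait
  # for its completion message.
  def run(sup, step_module) do
    {:ok, pid} = DynamicSupervisor.start_child(sup, {step_module, self()})
    ref = Process.monitor(pid)
    await(step_module, ref)
  end

  defp await(step_module, ref) do
    receive do
      {:step_done, ^step_module, result} ->
        if ref, do: Process.demonitor(ref, [:flush])
        {:ok, result}

      {:DOWN, ^ref, :process, _pid, reason} when reason not in [:normal, :noproc] ->
        # Crash: the supervisor restarts the :transient child, and the
        # replacement still reports to us, so keep waiting (unmonitored).
        await(step_module, nil)
    after
      5_000 -> {:error, :timeout}
    end
  end
end
```

With a DynamicSupervisor, a :transient child that exits normally is removed automatically, so no delete_child/2 call is needed afterwards.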
I think this is the simplest solution for my use case - just to spawn and monitor a process. I’ll try that.
I was under the impression that 1) I should avoid reimplementing supervisor features and use the built-in Supervisor, and 2) a GenServer should focus on state management and avoid doing supervision work. For my case, I may implement it all in one GenServer.
This seems the same as my initial plan using a separate supervisor: let My.TaskScheduler run processes under My.TaskWorkerSupervisor using Supervisor.start_child/2.
However, what is the simplest way for My.TaskScheduler to get the result of the child process? It gets the pid of the spawned child, but the process may crash and be restarted by My.TaskWorkerSupervisor under a new pid. That’s why I thought My.TaskWorkerSupervisor must receive the message from the child process (which does the actual job and returns the result) and then pass it on to My.TaskScheduler.
The worker (TaskA) can simply tell My.TaskScheduler when it’s done. Here’s an example:
```elixir
defmodule My.TaskScheduler do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    # For this example we'll create the TaskWorkerSupervisor here;
    # it could have been created elsewhere and made available to this
    # server.
    {:ok, task_sup} = Supervisor.start_link([], strategy: :one_for_one)
    {:ok, %{task_sup: task_sup}}
  end

  def kickoff_workflow(scheduler) do
    GenServer.cast(scheduler, :start_workflow)
  end

  def handle_cast(:start_workflow, %{task_sup: task_sup} = state) do
    IO.puts("Workflow started")
    # We want to start TaskA, so we build its child spec (passing our own
    # pid as the start argument) and use start_child to get it running.
    task_spec = Supervisor.child_spec({My.TaskA, self()}, [])
    {:ok, _task_a} = Supervisor.start_child(task_sup, task_spec)
    {:noreply, state}
  end

  def handle_cast(:task_a_done, %{task_sup: task_sup} = state) do
    IO.puts("Task A is done!")
    # The supervisor is done with TaskA, so it doesn't need to remember
    # how to restart it. TaskA may still be shutting down when this message
    # arrives, and delete_child/2 refuses to remove a running child, so
    # terminate it first (a no-op if it has already exited).
    Supervisor.terminate_child(task_sup, My.TaskA)
    Supervisor.delete_child(task_sup, My.TaskA)
    {:noreply, state}
  end
end

defmodule My.TaskA do
  # This GenServer should be restarted as a transient worker, i.e. only
  # if it exits abnormally.
  use GenServer, restart: :transient

  # Start the task; the argument is the process to notify when the task
  # is done.
  def start_link(task_scheduler) do
    # Pass the process to notify through to init/1.
    GenServer.start_link(__MODULE__, task_scheduler)
  end

  def init(task_scheduler) do
    # Queue the work for this process. If it crashes, the supervisor
    # restarts it, init/1 runs again, and the work is queued again.
    do_your_task(self())
    # Remember whom we're supposed to tell when done.
    {:ok, [task_scheduler: task_scheduler]}
  end

  def do_your_task(task_a) do
    GenServer.cast(task_a, :do_it)
  end

  def handle_cast(:do_it, [task_scheduler: task_scheduler] = state) do
    # Our task takes 2 seconds!
    Process.sleep(2000)
    # Tell whomever we were supposed to that we're done.
    GenServer.cast(task_scheduler, :task_a_done)
    # I've done my job; a :normal stop is not restarted.
    {:stop, :normal, state}
  end
end
```
In this case, My.TaskWorkerSupervisor is the supervisor created in My.TaskScheduler.init/1 (and stored under :task_sup), but it could just as well have been passed in from elsewhere.
When the scheduler kicks off My.TaskA, it builds a child spec from {My.TaskA, self()}, so the pid of My.TaskScheduler becomes the argument passed to My.TaskA.start_link/1. It then uses start_child to start a worker with that spec. Because My.TaskA is declared as a GenServer with restart: :transient, the supervisor will restart it if it exits abnormally (crashes), but not if it stops normally.
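To see both halves of that behavior, here is a small sketch using a hypothetical My.FlakyWorker shaped like My.TaskA: it crashes on its first run (tracked with an Erlang :counters reference passed in as part of the start argument, an assumption for the demo) and stops with :normal on the second.

```elixir
defmodule My.FlakyWorker do
  # Hypothetical worker shaped like My.TaskA: crashes on the first
  # attempt, reports success on the next, then stops normally.
  use GenServer, restart: :transient

  def start_link({notify, counter}) do
    GenServer.start_link(__MODULE__, {notify, counter})
  end

  @impl true
  def init(state) do
    # Queue the work, as My.TaskA does.
    GenServer.cast(self(), :do_it)
    {:ok, state}
  end

  @impl true
  def handle_cast(:do_it, {notify, counter} = state) do
    :counters.add(counter, 1, 1)
    attempt = :counters.get(counter, 1)
    # Abnormal exit: a :transient child is restarted.
    if attempt == 1, do: raise("first attempt crashes")
    send(notify, {:done, attempt})
    # Normal exit: a :transient child is not restarted.
    {:stop, :normal, state}
  end
end
```

Started with Supervisor.start_child/2 from Supervisor.child_spec({My.FlakyWorker, {self(), counter}}, []), the caller receives {:done, 2}, and afterwards a plain Supervisor still lists the child with pid :undefined until delete_child/2 removes its spec.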
When My.TaskA finishes its task, it reports completion back to My.TaskScheduler. Upon getting that completion message, the scheduler removes My.TaskA’s child spec from the supervisor using delete_child. It could go on to kick off My.TaskB at that point.
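Chaining to the next task can be sketched as follows. This variant is an assumption, not the code above: it swaps in a DynamicSupervisor (which drops :transient children that exit normally, so no delete_child is needed), uses handle_continue instead of a self-cast, and drives a generic step module through a list of named functions. My.Step, My.ChainScheduler and the step names are hypothetical.

```elixir
defmodule My.Step do
  # Hypothetical generic step: runs `fun`, reports the result to the
  # scheduler, then stops normally. Restarted only on an abnormal exit.
  use GenServer, restart: :transient

  def start_link({name, fun, scheduler}) do
    GenServer.start_link(__MODULE__, {name, fun, scheduler})
  end

  @impl true
  def init(args) do
    # handle_continue runs right after init/1, so start_link doesn't
    # block on the work; a restart re-runs init/1 and the work again.
    {:ok, args, {:continue, :run}}
  end

  @impl true
  def handle_continue(:run, {name, fun, scheduler} = state) do
    GenServer.cast(scheduler, {:step_done, name, fun.()})
    {:stop, :normal, state}
  end
end

defmodule My.ChainScheduler do
  use GenServer

  def start_link(steps), do: GenServer.start_link(__MODULE__, steps)
  def results(pid), do: GenServer.call(pid, :results)

  @impl true
  def init(steps) do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
    {:ok, start_next(%{sup: sup, pending: steps, results: []})}
  end

  @impl true
  def handle_cast({:step_done, name, result}, state) do
    # Record the result and kick off the next step (e.g. TaskB).
    {:noreply, start_next(%{state | results: [{name, result} | state.results]})}
  end

  @impl true
  def handle_call(:results, _from, state), do: {:reply, Enum.reverse(state.results), state}

  defp start_next(%{pending: []} = state), do: state

  defp start_next(%{pending: [{name, fun} | rest]} = state) do
    {:ok, _pid} = DynamicSupervisor.start_child(state.sup, {My.Step, {name, fun, self()}})
    %{state | pending: rest}
  end
end
```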