Running two long-running batch jobs with restart on failure

I need to run two long-running jobs in order, handling failure.

  1. Run TaskA
  2. Take the result of TaskA, verify it
  3. Run TaskB
  4. Take the result of it, verify it
  5. Schedule the next run

If one of the tasks fails, the retry should start from the failed job, not from the beginning (e.g. a failure of TaskB should retry TaskB, not TaskA).

So I sketched out the supervision tree like this:

  • My.Supervisor: top-level application supervisor
    • My.TaskSupervisor
      • My.TaskScheduler: GenServer to manage the current state
      • My.TaskWorkerSupervisor: Supervisor to manage the current running job

The flow would be:

  1. My.Supervisor.init/1 starts My.TaskSupervisor
  2. My.TaskSupervisor.init/1 starts My.TaskScheduler, passing itself
  3. My.TaskScheduler.init/1 starts My.TaskWorkerSupervisor under My.TaskSupervisor using Supervisor.start_child/2
  4. My.TaskScheduler starts a job under My.TaskWorkerSupervisor using Supervisor.start_child/2
  5. When the job completes, My.TaskWorkerSupervisor notifies My.TaskScheduler; when the job crashes, My.TaskWorkerSupervisor catches it and restarts it

Note that I put My.TaskSupervisor above My.TaskScheduler and My.TaskWorkerSupervisor because both should be restarted if either of them crashes, and I do not want that restart behavior to affect other apps under the top-level application supervisor.

Is this the right direction? In particular, should (or can) I leverage a built-in supervisor? For example, I tried Task.Supervisor, but I don’t think it fits, for the following reasons:

  • Task.Supervisor.async/2 and Task.Supervisor.async_nolink/2 require :temporary for the :restart option, so the supervisor does not restart a child when it crashes
  • Task.Supervisor.start_child/2 does not link the child process to the caller, so the caller (My.TaskScheduler) cannot take actions when the job is complete
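For what it's worth, here is a minimal sketch of one way around both points, assuming that Task.Supervisor.start_child/3 is given restart: :transient and that the job itself sends a message to the caller when it finishes. The Agent-based attempt counter, the :simulated_failure exit and the {:job_done, attempt} message are made up for illustration only.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical attempt counter so the job can fail on its first run only.
{:ok, attempts} = Agent.start_link(fn -> 0 end)
caller = self()

job = fn ->
  attempt = Agent.get_and_update(attempts, fn n -> {n + 1, n + 1} end)
  # Simulate a crash on the first attempt; the supervisor should restart the task.
  if attempt == 1, do: exit(:simulated_failure)
  # No link is needed: the job reports back with a plain message.
  send(caller, {:job_done, attempt})
end

# :transient asks the supervisor to restart the task only after an abnormal exit.
{:ok, _pid} = Task.Supervisor.start_child(sup, job, restart: :transient)

result =
  receive do
    {:job_done, attempt} -> {:ok, attempt}
  after
    5_000 -> :timeout
  end
```

The default restart limits of the supervisor still apply, so a job that keeps failing will eventually take the Task.Supervisor down with it.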

Let’s see. I would create one process that represents the entire workflow (maybe that’s your TaskScheduler?). Then you would have a separate process for each step to be completed. The workflow is a state machine. From the start state, you begin TaskA and monitor it. If you are notified that it failed, you loop around to the same state and begin it again. If it completes, you go to the verify-results state. (It’s not clear what happens if verification fails. Go to an error state?)

If verification passes, go into the state where you run TaskB. Monitor TaskB, and so on.
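A rough sketch of that state machine, assuming each step is a zero-arity function run in a spawned, monitored process. WorkflowSketch, the {:step_result, tag, result} message and the {:workflow_done, results} notification are hypothetical names, and the verification states are left out because it is still open what should happen when verification fails.

```elixir
defmodule WorkflowSketch do
  use GenServer

  # steps: list of {name, zero-arity fun}; notify: pid told when everything is done.
  def start_link(steps, notify) do
    GenServer.start_link(__MODULE__, {steps, notify})
  end

  @impl true
  def init({steps, notify}) do
    state = %{pending: steps, results: [], notify: notify, tag: nil, mon: nil}
    {:ok, run_next(state)}
  end

  # The current step reported a result: record it and move to the next state.
  @impl true
  def handle_info({:step_result, tag, result}, %{tag: tag} = state) do
    # Drop the monitor (and any :DOWN already queued) for the finished step.
    Process.demonitor(state.mon, [:flush])
    [{name, _fun} | rest] = state.pending
    state = %{state | pending: rest, results: [{name, result} | state.results]}
    {:noreply, run_next(state)}
  end

  # The current step died before reporting: loop around and run the same step again.
  def handle_info({:DOWN, mon, :process, _pid, _reason}, %{mon: mon} = state) do
    {:noreply, run_next(state)}
  end

  # Anything else (e.g. messages from an earlier attempt) is ignored.
  def handle_info(_msg, state), do: {:noreply, state}

  # No steps left: report the collected results in order.
  defp run_next(%{pending: []} = state) do
    send(state.notify, {:workflow_done, Enum.reverse(state.results)})
    %{state | tag: nil, mon: nil}
  end

  # Spawn the step at the head of the list and monitor it.
  defp run_next(%{pending: [{_name, fun} | _]} = state) do
    parent = self()
    tag = make_ref()
    {_pid, mon} = spawn_monitor(fn -> send(parent, {:step_result, tag, fun.()}) end)
    %{state | tag: tag, mon: mon}
  end
end
```

Starting it with something like WorkflowSketch.start_link([task_a: fn -> :a end, task_b: fn -> :b end], self()) should eventually deliver {:workflow_done, [task_a: :a, task_b: :b]} to the caller. Note that this sketch retries a failing step forever; a real version would probably want a retry limit.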

If you want to use a supervisor to monitor TaskA and TaskB, then you could create a :one_for_one supervisor with two child specs (one for each task) and use Supervisor.start_child to kick off each task when needed. (A :simple_one_for_one supervisor accepts only a single child spec, so it would not fit here.) Just because the supervisor is monitoring and restarting the child doesn’t mean your workflow process can’t also monitor it.
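As a small illustration of the last point, here is a sketch in which a supervisor owns the child while the calling process also monitors it. The :task_a id and the sleeping Task are placeholders, not anything from the original design.

```elixir
{:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)

# A Task wrapped in a child spec; :transient means "restart only after an abnormal exit".
spec =
  Supervisor.child_spec({Task, fn -> Process.sleep(100) end},
    id: :task_a,
    restart: :transient
  )

{:ok, pid} = Supervisor.start_child(sup, spec)

# The supervisor is linked to the child; the caller adds its own monitor on top.
ref = Process.monitor(pid)

outcome =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    5_000 -> :timeout
  end

# The spec of a finished :transient child stays registered until it is deleted.
# terminate_child/2 first guarantees the child counts as stopped for delete_child/2.
cleanup = {Supervisor.terminate_child(sup, :task_a), Supervisor.delete_child(sup, :task_a)}
```

One caveat: a monitor is tied to a pid, so if the supervisor restarts the child after a crash, the monitor on the old pid fires and the new pid is not monitored automatically.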

Thanks for the feedback!

I think this is the simplest solution for my use case - just to spawn and monitor a process. I’ll try that.

I was under the impression that 1) I should avoid implementing supervisor features myself and use the built-in Supervisor, and 2) a GenServer should focus on state management and avoid doing supervision work. For my case, I may implement everything in one GenServer.

This seems the same as my initial plan with a separate supervisor: let My.TaskScheduler run processes under My.TaskWorkerSupervisor using Supervisor.start_child/2.

However, what is the simplest way for My.TaskScheduler to get the result of the child process? It gets the pid of the spawned child, but the process may crash and be restarted by My.TaskWorkerSupervisor, which gives it a new pid. That’s why I thought My.TaskWorkerSupervisor must receive the message from the child process (which does the actual job and returns the result), and then pass it to My.TaskScheduler.

The worker (TaskA) can simply tell My.TaskScheduler when it’s done. Here’s an example:

defmodule My.TaskScheduler do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    # for this example we'll create the TaskWorkerSupervisor here
    # it could have been created elsewhere  and made available to this 
    # server.
    {:ok, task_sup} = Supervisor.start_link([], strategy: :one_for_one)
    {:ok, %{task_sup: task_sup}}
  end

  def kickoff_workflow(scheduler) do
    GenServer.cast(scheduler, :start_workflow)
  end

  def handle_cast(:start_workflow, %{task_sup: task_sup} = state) do
    IO.puts("Workflow started")

    # We want to start TaskA, so we ask for its child spec and use
    # start_child to get it running.
    task_spec = Supervisor.child_spec({My.TaskA, self()}, [])
    {:ok, _task_a} = Supervisor.start_child(task_sup, task_spec)

    {:noreply, state}
  end

  def handle_cast(:task_a_done, %{task_sup: task_sup} = state) do
    IO.puts("Task A is Done!")

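    # The supervisor is done with task_a, so it doesn't need to remember
    # how to restart it. TaskA may still be shutting down when this cast
    # arrives, so stop it first; delete_child fails on a running child.
    Supervisor.terminate_child(task_sup, My.TaskA)
    Supervisor.delete_child(task_sup, My.TaskA)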

    {:noreply, state}
  end
end

defmodule My.TaskA do
  # this is a gen server that should restart as a transient worker
  use GenServer, restart: :transient

  # start the task - the argument is the process to notify when
  # the task is done
  def start_link(task_scheduler) do
    # pass the task to notify through to init
    GenServer.start_link(__MODULE__, task_scheduler)
  end

  def init(task_scheduler) do
    # tell this process to do its thing... if it crashes, the supervisor
    # restarts it, init runs again and the work is queued again
    do_your_task(self())

    # remember who we're supposed to tell when done
    {:ok, [task_scheduler: task_scheduler]}
  end

  def do_your_task(task_a) do
    GenServer.cast(task_a, :do_it)
  end
  
  def handle_cast(:do_it, [task_scheduler: task_scheduler] = state) do
    # our task takes 2 seconds!
    Process.sleep(2000)

    # tell whoever we were supposed to that we're done
    GenServer.cast(task_scheduler, :task_a_done)

    # I've done my job
    {:stop, :normal, state}
  end
end

In this case My.TaskWorkerSupervisor is the supervisor created in My.TaskScheduler.init (and stored in :task_sup), but it could simply have been passed in from elsewhere.

When the scheduler kicks off My.TaskA, it asks for a child spec built from {My.TaskA, self()}, so that the pid of My.TaskScheduler becomes the argument passed to start_link. It then uses start_child to start a worker with that spec. Because My.TaskA is declared as a GenServer with restart: :transient, the supervisor will restart it if it exits abnormally, but not if it ends normally.

When My.TaskA finishes its task, it reports completion back to My.TaskScheduler. Upon getting that completion message, the scheduler removes the child spec for My.TaskA from the supervisor using delete_child. It could go on to kick off My.TaskB at that point.
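To make that last step concrete, here is a condensed sketch of how the completion message for one task could trigger the next. It uses a single generic worker module instead of separate My.TaskA and My.TaskB modules; ChainSketch, the {:task_done, name} message and the :workflow_done notification are all invented for this example, and the real work and verification are left as a comment.

```elixir
defmodule ChainSketch.Worker do
  use GenServer, restart: :transient

  def start_link({name, scheduler}) do
    GenServer.start_link(__MODULE__, {name, scheduler})
  end

  @impl true
  def init({name, scheduler}) do
    # Return right away and do the work in handle_continue, so the
    # scheduler is not blocked inside Supervisor.start_child.
    {:ok, {name, scheduler}, {:continue, :run}}
  end

  @impl true
  def handle_continue(:run, {name, scheduler} = state) do
    # The real job (and the verification of its result) would go here.
    GenServer.cast(scheduler, {:task_done, name})
    {:stop, :normal, state}
  end
end

defmodule ChainSketch.Scheduler do
  use GenServer

  def start_link(notify), do: GenServer.start_link(__MODULE__, notify)

  @impl true
  def init(notify) do
    {:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)
    {:ok, %{sup: sup, notify: notify}, {:continue, {:start, :task_a}}}
  end

  @impl true
  def handle_continue({:start, name}, state) do
    start_task(state.sup, name)
    {:noreply, state}
  end

  # Task A is done: forget its spec, then move on to task B.
  @impl true
  def handle_cast({:task_done, :task_a}, state) do
    forget(state.sup, :task_a)
    start_task(state.sup, :task_b)
    {:noreply, state}
  end

  # Task B is done: the workflow is complete (scheduling the next run would go here).
  def handle_cast({:task_done, :task_b}, state) do
    forget(state.sup, :task_b)
    send(state.notify, :workflow_done)
    {:noreply, state}
  end

  defp start_task(sup, name) do
    spec = Supervisor.child_spec({ChainSketch.Worker, {name, self()}}, id: name)
    {:ok, _pid} = Supervisor.start_child(sup, spec)
  end

  defp forget(sup, name) do
    # Stop the worker if it is still winding down, then drop its spec.
    :ok = Supervisor.terminate_child(sup, name)
    :ok = Supervisor.delete_child(sup, name)
  end
end
```

Calling ChainSketch.Scheduler.start_link(self()) should run both tasks in order and then send :workflow_done to the caller. Because each worker is :transient, a crash inside the job restarts only that worker, which matches the requirement of retrying from the failed job.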