Supervising async tasks

I need to:

  • do some asynchronous work concurrently
  • kill the work when the parent GenServer exits
  • be able to specify the timeout
  • do some bookkeeping when the work succeeds
  • do some other bookkeeping when the work fails

Is the Task.Supervisor the right tool for the job?

I’m thinking of writing a GenServer that would use Task.Supervisor.async_nolink to start Tasks, specifying the :shutdown option and handling the {ref, result} and :DOWN messages.

But how do I make sure that the tasks are immediately terminated when the GenServer is shut down? I guess Task.Supervisor.async is what I should use. But how will I get the results/crashes? Am I going to receive the same messages as with async_nolink?

2 Likes

@sasajuric is working on a project that might help with your use case:

https://github.com/sasa1977/parent

Check it out.

1 Like

By parts:

  • do some asynchronous work concurrently

Use a Task.Supervisor

  • kill the work when the parent GenServer exits

Use Task.Supervisor.async but notice you can’t use await, you will receive the done or the down message in your handle_info

  • be able to specify the timeout

When you call Task.Supervisor.async, also call Process.send_after(self(), {:kill, task}, timeout). You will receive that message in handle_info once the timeout has passed; kill the task there.

  • do some bookkeeping when the work succeeds

Match the {ref, result} message in handle_info and cancel the timeout started above

  • do some other bookkeeping when the work fails

Match the DOWN message in handle_info and cancel the timeout started above. Also make sure you call Process.flag(:trap_exit, true) so your genserver doesn’t die when the task crashes

16 Likes

Thanks for the response, that’s really helpful!

Things really clicked when I realized that tasks run under Task.Supervisor are always linked to that supervisor. Task.Supervisor.async and Task.Supervisor.async_nolink are used to decide whether to link (or not) to the caller of those functions, which should be a GenServer that should probably do something about the messages from the tasks.

Then I also realized that if I want to do any extra bookkeeping after the tasks complete, it has to be done in some GenServer, not in a supervisor.

Once I’m done with my stuff, I’d like to work on a PR to improve the docs for the Task.Supervisor.

2 Likes

I was following this thread because of how common this use case is, as expressed by @stefanchrobot, and the solution @josevalim gave seemed interesting and simple.

So I went and coded it myself, as I understood the use case and the Task documentation (https://hexdocs.pm/elixir/1.5/Task.html#content).

It took me a while to grasp but this is the code. Please correct me if there is anything I misunderstood:

  1. Create the project

    mix new stask --sup
    cd stask 
    mix deps.get
    
  2. The supervision tree

    defmodule Stask.Application do
      @moduledoc """
      Application callback module for `:stask`.

      Starts the top-level supervisor, whose only child is the
      `Task.Supervisor` registered as `Stask.TaskSupervisor`.
      """
      use Application

      @doc """
      Starts `Stask.Supervisor` with a `:one_for_one` strategy.

      Returns whatever `Supervisor.start_link/2` returns.
      """
      @impl true
      def start(_type, _args) do
        # No :restart option here: passing :restart to Task.Supervisor.start_link/1
        # is deprecated, and tasks started under a Task.Supervisor are
        # :temporary by default anyway.
        children = [
          {Task.Supervisor, name: Stask.TaskSupervisor}
        ]

        opts = [strategy: :one_for_one, name: Stask.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end
    
  3. The basic task

     defmodule Stask do
       @moduledoc """
       Holds the example unit of work that is run as a supervised task.
       """

       @doc """
       Simulates a slow job and returns a greeting.

       Sleeps for `delay` milliseconds (15 seconds when omitted, so
       `some_task/1` behaves as before) and then returns
       `{:ok, "Hello " <> text}`.

       ## Examples

           iex> Stask.some_task("world", 0)
           {:ok, "Hello world"}
       """
       def some_task(text, delay \\ 15_000) do
         # Stand-in for some hard work.
         Process.sleep(delay)
         # Final result
         {:ok, "Hello #{text}"}
       end
     end
    
  4. The GenServer

    defmodule Stask.Server do
      @moduledoc """
      Example GenServer that runs `Stask.some_task/1` under
      `Stask.TaskSupervisor` with a per-task timeout.

      Tasks are started with `Task.Supervisor.async/4`, so each task is linked
      to and monitored by this server. The server traps exits, so a crashing
      task is delivered as `:EXIT` and `:DOWN` messages instead of taking the
      server down.

      State is a map with:

        * `:msg`    - the message returned by the most recently finished task
        * `:timers` - `task.ref => timer_ref` for every task still running, so
          that several tasks can be in flight without sharing one timer
      """
      use GenServer

      ## Client API

      @doc """
      Starts the server (not linked to the caller), registered under the
      module name.
      """
      def start() do
        GenServer.start(__MODULE__, nil, name: __MODULE__)
      end

      @doc """
      Asynchronously runs `Stask.some_task/1` with `name`.

      The task is shut down if it has not replied within `task_timeout`
      milliseconds.
      """
      def execute_task(pid, name, task_timeout) do
        GenServer.cast(pid, {:execute, name, task_timeout})
      end

      ## Server Callbacks

      @impl true
      def init(_) do
        # Linked tasks that crash must reach us as messages, not kill us.
        Process.flag(:trap_exit, true)
        {:ok, %{msg: "", timers: %{}}}
      end

      @impl true
      def handle_cast({:execute, name, task_timeout}, state) do
        task = Task.Supervisor.async(Stask.TaskSupervisor, Stask, :some_task, [name])
        timer_ref = Process.send_after(self(), {:kill, task}, task_timeout)
        {:noreply, %{state | timers: Map.put(state.timers, task.ref, timer_ref)}}
      end

      ## Handle info functions

      # The timeout of a task has elapsed.
      @impl true
      def handle_info({:kill, task}, state) do
        case Map.pop(state.timers, task.ref) do
          {nil, _timers} ->
            # Stale timeout: the task already replied or went down, but its
            # timer had fired before it could be cancelled. Nothing to kill.
            {:noreply, state}

          {_timer_ref, timers} ->
            # Do some book keeping here
            IO.puts("Task has been canceled due to time out")

            # Task.shutdown/2 waits up to 5000ms by default for the task to
            # exit and kills it afterwards. Use Task.shutdown(task, 2000) for
            # a different grace period or Task.shutdown(task, :brutal_kill)
            # to kill it immediately.
            # See https://hexdocs.pm/elixir/1.5/Task.html#shutdown/2
            case Task.shutdown(task) do
              {:ok, _reply} ->
                # The task replied while it was being shut down.
                IO.puts("The task responded while it was shutting down")

              {:exit, _reason} ->
                # The task died on its own while we waited for the shutdown.
                IO.puts("The task died before just it was shutdown")

              nil ->
                # The task was shut down.
                IO.puts("The task was shutdown")
            end

            {:noreply, %{state | timers: timers}}
        end
      end

      # The task finished successfully in time: cancel *its* timer only.
      def handle_info({ref, {:ok, msg}}, state) when is_reference(ref) do
        IO.puts("Task finished successfully")
        {:noreply, %{cancel_timer(state, ref) | msg: msg}}
      end

      # A linked task exited normally after delivering its result.
      def handle_info({:EXIT, _pid, :normal}, state) do
        IO.puts("The task exited and finished normally")
        {:noreply, state}
      end

      # A linked task crashed. The matching :DOWN message carries the task
      # ref, so the per-task cleanup happens there.
      def handle_info({:EXIT, _pid, reason}, state) do
        IO.puts("The task exited abnormally: #{inspect(reason)}")
        {:noreply, state}
      end

      # The monitor fires once the task process is gone after a normal run.
      def handle_info({:DOWN, ref, :process, _pid, :normal}, state) do
        IO.puts(state.msg)
        {:noreply, cancel_timer(state, ref)}
      end

      # The task went down without finishing: do the failure book keeping.
      def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
        IO.puts("Task failed: #{inspect(reason)}")
        {:noreply, cancel_timer(state, ref)}
      end

      # Anything else is not ours; ignore it instead of crashing the server.
      def handle_info(_message, state) do
        {:noreply, state}
      end

      # Cancels and forgets the timeout timer of the task identified by `ref`.
      # A no-op when the ref is not (or no longer) tracked.
      defp cancel_timer(state, ref) do
        {timer_ref, timers} = Map.pop(state.timers, ref)
        if timer_ref, do: Process.cancel_timer(timer_ref)
        %{state | timers: timers}
      end
    end
4 Likes

@joaquinalcerro

  1. I would put Stask.Server under the same supervisor in the Stask.Application.
  2. Unless you intend to process a single message at a time, you should probably keep a map task.pid -> timer_ref in the GenServer.
  3. When killing the task due to a timeout, I opted for Process.exit(task.pid, :kill), since I wanted a hard timeout. It makes that handle_info much simpler - you don’t have to repeat the logic from handling the :DOWN message.

@josevalim I’m not sure what’s the criteria for picking Task.Supervisor.async_nolink vs Task.Supervisor.async. Can you shed some light on that?

In the first case, I don’t need to trap exits and only have to handle the :DOWN message. In the latter, I need to handle both :DOWN and :EXIT. My GenServer is linking to some other processes (RabbitMQ connection), so doing async_nolink seems more convenient (if RabbitMQ connection goes down, I need my GenServer to go down as well).

1 Like

So based on your first two comments the use case was to have a single GenServer coordinating the execution of async tasks?

I think that @josevalim suggested the Task.Supervisor.async because it links the task to the caller and you will be able to trap the exit signal to do the book keeping. The Task.Supervisor.async_nolink will not link the task to the caller so you won’t be able to know when the task fails.

Let’s wait for his comments.

Thanks

Yes, that’s exactly my case.

Not really, see the docs:

Compatibility with OTP behaviours

If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.

The reply sent by the task will be in the format {ref, result}, where ref is the monitor reference held by the task struct and result is the return value of the task function.

Keep in mind that, regardless of how the task created with async_nolink terminates, the caller’s process will always receive a :DOWN message with the same ref value that is held by the task struct. If the task terminates normally, the reason in the :DOWN message will be :normal.

1 Like

Oh… I missed the “Compatibility with OTP behaviours” section. I read the async_nolink/3 documentation and missed the async_nolink/4.

So async_nolink will not link the task to the caller. Without the link, the caller will not receive the :EXIT signal and therefore will not crash if the task crashes, but the caller will always receive the :DOWN message once the task finishes… cool.

Thanks

I might be a few years late :sweat_smile: but here’s what I did today:

defmodule Utils.Worker do
  @moduledoc """
  Behaviour plus `use` macro that turns a module into a GenServer running
  jobs as supervised tasks with a per-job timeout.

  ## Options for `use Utils.Worker`

    * `:task_supervisor` - the `Task.Supervisor` the jobs are started under
    * `:task_timeout` - milliseconds a job may run before it is sent an exit
      signal with reason `:timeout` (defaults to `10_000`)

  ## Callbacks

    * `run_job/1` - runs inside the task with the job's params
    * `job_ok/2` - called in the GenServer with the params and the result
      when the task replies
    * `job_error/2` - called in the GenServer with the params and the exit
      reason when the task goes down without replying (including timeouts)

  The GenServer state is a map of `task.ref => %{params: _, timer_ref: _}`
  for every job still in flight.
  """
  @callback run_job(params :: any()) :: any()
  @callback job_ok(params :: any(), result :: any()) :: any()
  @callback job_error(params :: any(), result :: any()) :: any()

  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour Utils.Worker
      @task_timeout Keyword.get(opts, :task_timeout) || 10_000
      @task_supervisor Keyword.get(opts, :task_supervisor)

      # Sample jobs used when start_jobs/0 is called without arguments.
      @test_jobs [
        %{id: 1, config_id: 1},
        %{id: 2, config_id: 1},
        %{id: 3, config_id: 3},
        %{id: 4, config_id: 1},
        %{id: 5, config_id: 2},
        %{id: 6, config_id: 4},
        %{id: 7, config_id: 4},
        %{id: 8, config_id: 5},
        %{id: 9, config_id: 5},
        %{id: 10, config_id: 5},
        %{id: 11, config_id: 5},
        %{id: 12, config_id: 5}
      ]
      use GenServer
      # ************************************************************
      # API (Client)
      # ************************************************************

      def start_link(_) do
        GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
      end

      @impl true
      def init(_) do
        {:ok, %{}}
      end

      # Starts one task per job; returns immediately.
      def start_jobs(jobs \\ @test_jobs) do
        GenServer.cast(__MODULE__, {:sync_data, jobs})
      end

      # ************************************************************
      # Callbacks (Server)
      # ************************************************************

      @impl true
      def handle_cast({:sync_data, test_jobs}, state) do
        # async_nolink: a crashing job must not take this server down; the
        # monitor still delivers a :DOWN message for it.
        task_map =
          Map.new(test_jobs, fn params ->
            task = Task.Supervisor.async_nolink(@task_supervisor, fn -> run_job(params) end)
            timer_ref = Process.send_after(self(), {:timeout, task}, @task_timeout)

            {task.ref, %{params: params, timer_ref: timer_ref}}
          end)

        {:noreply, Map.merge(state, task_map)}
      end

      # The job's timeout elapsed. Only signal tasks that are still tracked:
      # the timer may have fired just before it was cancelled, in which case
      # the job is already done. The resulting :DOWN message (reason
      # :timeout) triggers job_error/2 below.
      @impl true
      def handle_info({:timeout, task}, state) do
        if Map.has_key?(state, task.ref) do
          Process.exit(task.pid, :timeout)
        end

        {:noreply, state}
      end

      # The task went down without replying (crash or timeout).
      def handle_info({:DOWN, task_ref, _type, _task_pid, reason}, state) do
        case Map.pop(state, task_ref) do
          {nil, _state} ->
            # Not one of our in-flight jobs; nothing to clean up.
            {:noreply, state}

          {%{params: params, timer_ref: timer_ref}, new_state} ->
            Process.cancel_timer(timer_ref)
            job_error(params, reason)

            {:noreply, new_state}
        end
      end

      # The task replied with its result.
      def handle_info({task_ref, result}, state) when is_reference(task_ref) do
        case Map.pop(state, task_ref) do
          {nil, _state} ->
            # Reply for a job we no longer track; ignore it.
            {:noreply, state}

          {%{params: params, timer_ref: timer_ref}, new_state} ->
            Process.cancel_timer(timer_ref)
            # The job succeeded, so stop monitoring and discard the :DOWN
            # message that would otherwise follow.
            Process.demonitor(task_ref, [:flush])
            job_ok(params, result)

            {:noreply, new_state}
        end
      end

      # Any other message is unrelated to our jobs; ignore it instead of
      # crashing the worker.
      def handle_info(_message, state) do
        {:noreply, state}
      end
    end
  end
end
2 Likes