Here’s what I came up with as an exercise. Some small liberties taken based on how I set up Phoenix.PubSub
, but hopefully still understandable.
defmodule MyApp.TaskManager do
use GenServer
@pubsub_topic "really_important_tasks"
@doc """
Starts a singleton task manager.
"""
def start_link(arg) do
GenServer.start_link(__MODULE__, arg, name: __MODULE__)
end
@doc """
Starts an async task and broadcasts the result or failure.
"""
def async_task(id, fun) do
GenServer.cast(__MODULE__, {:async_task, id, fun})
end
@impl true
def init(opts) do
{:ok, %{supervisor: Keyword.fetch!(opts, :supervisor), tasks: %{}}}
end
@impl true
def handle_cast({:async_task, id, fun}, %{supervisor: supervisor, tasks: tasks} = state) do
%Task{ref: ref} = Task.Supervisor.async_nolink(supervisor, fun)
{:noreply, %{state | tasks: Map.put(tasks, ref, id)}}
end
@impl true
def handle_info({ref, result}, state) when is_reference(ref) do
{:noreply, broadcast_result(state, ref, {:ok, result})}
end
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
{:noreply, state}
end
def handle_info({:DOWN, ref, :process, _pid, reason}, %{tasks: tasks} = state) do
{:noreply, broadcast_result(state, ref, {:error, reason})}
end
defp broadcast_result(%{tasks: tasks} = state, ref, result) do
if id = tasks[ref] do
MyApp.PubSub.broadcast(@pubsub_topic, {:task, id, result})
end
%{state | tasks: Map.delete(tasks, ref)}
end
end
# when starting your application
children = [
MyApp.PubSub,
{Task.Supervisor, name: MyApp.TaskSupervisor},
{MyApp.TaskManager, supervisor: MyApp.TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
# to start tasks
MyApp.TaskManager.async_task("some_important_thing", &do_the_thing/0)
# if you care about results
MyApp.PubSub.subscribe("really_important_tasks:*")
(Having written this out, I think it’s pretty well worth the ~50 lines of code to have control over and insight into task execution.)