I might be a few years late, but here’s what I did today:
defmodule Utils.Worker do
  @moduledoc """
  Behaviour plus `use` macro for a named GenServer that runs jobs as supervised tasks.

  `use Utils.Worker, opts` injects a GenServer, registered under the using
  module's name, that starts one `Task.Supervisor.async_nolink/2` task per job,
  arms a timeout timer for each task, and reports every outcome through the
  callbacks declared here.

  ## Options

    * `:task_supervisor` - name of the `Task.Supervisor` the tasks are started
      under. It is read at compile time and must be running before
      `start_jobs/1` is called.
    * `:task_timeout` - milliseconds a task may run before it is sent an exit
      signal with reason `:timeout`. Defaults to `10_000`.

  ## Callbacks

    * `run_job/1` - runs inside the task process.
    * `job_ok/2` - runs inside the GenServer when the task replies.
    * `job_error/2` - runs inside the GenServer when the task goes down without
      replying (crash, exit, or timeout).
  """

  @doc "Performs the work for one job. Invoked inside a task process."
  @callback run_job(params :: any()) :: any()

  @doc "Invoked in the GenServer with the job params and the value `run_job/1` returned."
  @callback job_ok(params :: any(), result :: any()) :: any()

  @doc "Invoked in the GenServer with the job params and the exit reason of the task."
  @callback job_error(params :: any(), result :: any()) :: any()

  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour Utils.Worker

      use GenServer

      @task_timeout Keyword.get(opts, :task_timeout) || 10_000
      @task_supervisor Keyword.get(opts, :task_supervisor)

      # Sample jobs used when `start_jobs/0` is called without arguments.
      @test_jobs [
        %{id: 1, config_id: 1},
        %{id: 2, config_id: 1},
        %{id: 3, config_id: 3},
        %{id: 4, config_id: 1},
        %{id: 5, config_id: 2},
        %{id: 6, config_id: 4},
        %{id: 7, config_id: 4},
        %{id: 8, config_id: 5},
        %{id: 9, config_id: 5},
        %{id: 10, config_id: 5},
        %{id: 11, config_id: 5},
        %{id: 12, config_id: 5}
      ]

      # ************************************************************
      # API (Client)
      # ************************************************************

      @doc "Starts the worker, registered under this module's name. The argument is ignored."
      @spec start_link(any()) :: GenServer.on_start()
      def start_link(_) do
        GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
      end

      @doc """
      Asynchronously starts one task per element of `jobs`.

      Each element is handed to `run_job/1`. Returns `:ok` immediately; outcomes
      are delivered through `job_ok/2` and `job_error/2`. Without an argument a
      built-in list of sample jobs is used.
      """
      @spec start_jobs(Enumerable.t()) :: :ok
      def start_jobs(jobs \\ @test_jobs) do
        GenServer.cast(__MODULE__, {:sync_data, jobs})
      end

      # ************************************************************
      # Callbacks (Server)
      # ************************************************************

      # State is a map of `task monitor ref => %{params: job, timer_ref: timer}`
      # holding every task that has not reported an outcome yet.
      @impl true
      def init(_) do
        {:ok, %{}}
      end

      @impl true
      def handle_cast({:sync_data, jobs}, state) do
        started =
          Map.new(jobs, fn job ->
            task = Task.Supervisor.async_nolink(@task_supervisor, fn -> run_job(job) end)
            timer_ref = Process.send_after(self(), {:timeout, task}, @task_timeout)
            {task.ref, %{params: job, timer_ref: timer_ref}}
          end)

        {:noreply, Map.merge(state, started)}
      end

      # Timeout timer fired. Only signal tasks that are still tracked: the timer
      # message can already be in the mailbox when the task finishes, in which
      # case there is nothing left to stop. The resulting :DOWN message (reason
      # `:timeout`) is what triggers `job_error/2`.
      @impl true
      def handle_info({:timeout, %Task{pid: pid, ref: ref}}, state) do
        if Map.has_key?(state, ref), do: Process.exit(pid, :timeout)
        {:noreply, state}
      end

      # The task went down without delivering a result.
      def handle_info({:DOWN, task_ref, _type, _task_pid, reason}, state) do
        case Map.pop(state, task_ref) do
          {nil, _unchanged} ->
            # Monitor ref we are not tracking; nothing to report.
            {:noreply, state}

          {%{params: params, timer_ref: timer_ref}, remaining} ->
            Process.cancel_timer(timer_ref)
            job_error(params, reason)
            {:noreply, remaining}
        end
      end

      # The task replied. The guard keeps unrelated 2-tuples out of this clause.
      def handle_info({task_ref, result}, state) when is_reference(task_ref) do
        case Map.pop(state, task_ref) do
          {nil, _unchanged} ->
            # Reply for a ref we are not tracking; ignore it.
            {:noreply, state}

          {%{params: params, timer_ref: timer_ref}, remaining} ->
            Process.cancel_timer(timer_ref)
            # The task succeeded, so stop monitoring and drop the pending :DOWN.
            Process.demonitor(task_ref, [:flush])
            job_ok(params, result)
            {:noreply, remaining}
        end
      end

      # Anything else is logged and dropped instead of crashing the worker.
      def handle_info(message, state) do
        :logger.warning(
          "~ts received unexpected message: ~ts",
          [inspect(__MODULE__), inspect(message)]
        )

        {:noreply, state}
      end
    end
  end
end