I would recommend you to have a look at the Registry module.
First, some considerations:
- Say you run one
GenServer
per user;
- And you have a rescheduler as a separate
GenServer
process.
Tackling the first part, which involves associating a user’s id with a process, start one if one is not running and keeping a list of executing processes based on the user’s name, can be done using the following construct:
defmodule MyWorker do
use GenServer
@doc """
Should be called before `start_worker`.
"""
def start_link() do
Registry.start_link(:unique, __MODULE__)
end
@doc """
Starts a worker process for a given `user` to handle the given `task`.
"""
def maybe_start_worker(user, task) do
case Registry.lookup(__MODULE__, user) do
[{pid, _value}] ->
{:ok, pid} # do nothing, already running
[] ->
name = {:via, Registry, {__MODULE__, user}}
GenServer.start_link(__MODULE__, task, name: name)
end
end
def init(task) do
self() |> Process.send_after(:process_task, 0)
{:ok, task}
end
def handle_info(:process_task, task) do
# do something with the task
access_external_service(task)
# finishes the process, nothing else to do for the moment
{:stop, :normal, task}
end
defp access_external_service(task) do
# a request to an external web server
end
end
Assuming the above, then now you could create another process to re-run the worker process every 30 seconds. This another process can hold a simple Map
where the key is the user’s id and the value is the task. Just call MyScheduler.add_item/2
when the user clicks on the button, followed by MyWorker.start_worker/2
if you want to process the task immediately, otherwise, the worker will be executed 30 seconds later by the scheduler.
defmodule MyScheduler do
use GenServer
def start_link() do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def add_item(user, task) do
GenServer.call(__MODULE__, {:add_item, user, task})
end
def remove_item(user) do
GenServer.call(__MODULE__, {:remove_item, user})
end
def init(map), do: {:ok, map}
def handle_info({:reschedule, user, task}, map) do
MyWorker.maybe_start_worker(user, task)
timer = self() |> Process.send_after({:reschedule, user, task}, 30_000) # reschedule
{:noreply, %{map | user => {task, timer}}}
end
def handle_call({:add_item, user, task}, map) do
timer = self() |> Process.send_after({:reschedule, user, task}, 30_000)
{:reply, :ok, map |> Map.put(user, {task, timer})
end
def handle_call({:remove_item, user}, map) do
map =
map |> Map.get_and_update(user, fn {_task, timer} ->
timer |> Process.cancel_timer()
:pop
end)
{:reply, :ok, map}
end
end
With the above:
- Say your process takes more than 30 seconds to execute; suppose 45 seconds. Then, when the scheduler gets the first timer event, it won’t start a new process at that moment; as it keeps retrying every 30 seconds, the task will be executed in the next try, after 60 seconds.
- In order to stop rescheduling, execute
MyScheduler.remove_item/1
for the same user ID you used in MyScheduler.add_item/2
.