Periodic tasks in a single GenServer -- not everything yet makes sense

I’ve read about GenServer, but I’m not yet able to use it because not everything makes sense to me yet.

I want this:

  • when a user clicks a button, a new task begins
  • a task terminates depending on some external condition, say, an external REST API web service has returned “terminate = true”
  • if a task is already running, clicking the button shouldn’t begin a new one
  • a task polls an external REST API web service every 30 seconds

I was advised to use a single GenServer. My questions:

  1. How can I associate a new task (or its name) with a user’s ID, so that when the user clicks the button I can check whether to create a new task, or do nothing because one is already running?

  2. How can I add an item to, or remove an item from, MyWorker? That is, should I keep a list of user IDs?

  3. If there are no tasks, the GenServer should stop, I think. Is that a wise thing to do? If so, how can I stop/start it?

  4. To re-run a task every 30 seconds I need to call schedule_work() in “poll_external_service”, correct? Thus, to terminate it depending on a response from the external REST web service, I simply don’t call schedule_work(), correct?

    defmodule MyWorker do
      use GenServer

      def start_link do
        GenServer.start_link(__MODULE__, [], name: :my_app_worker)
      end

      def init(state) do
        schedule_work()
        {:ok, state}
      end

      def add_item(x) do

      end

      def remove_item(x) do

      end

      # Process.send_after/3 delivers :work as a plain message,
      # so it arrives in handle_info/2.
      def handle_info(:work, state) do
        poll_external_service()
        {:noreply, state}
      end

      defp poll_external_service do
        # a request to an external web server

        schedule_work()
      end

      defp schedule_work() do
        Process.send_after(self(), :work, 30 * 1000)
      end
    end


I may have misunderstood what you’re trying to do, but this might help.

Assuming that you only wanted to store the user ID for each task, I used a MapSet. If you want to store additional data with the task (such as a specific URL to poll), you should use a Map (with the user ID as the key, since map keys are unique).

In process_tasks/1 we use Enum.filter/2 both to call a function for each task and to remove the tasks that the external API has indicated should be terminated. The case ExternalAPI.poll(id) do is, of course, a contrived example. What’s important is that the anonymous function given to Enum.filter/2 returns false if the task should be removed. (You can give Enum.filter/2 a map too, if you go that route. In that case the anonymous function receives a tuple {user_id, data} for each key-value pair in the map, and Enum.filter/2 returns a list of such tuples, which Map.new/1 turns back into a map.)
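As a rough sketch of the map variant, assuming each user ID maps to some per-task data (here a made-up URL) and passing the poll function in as an argument instead of calling the placeholder ExternalAPI.poll/1 directly. TaskFilter and keep_running/2 are hypothetical names:

```elixir
defmodule TaskFilter do
  # Keeps only the map entries whose poll result says the task should continue.
  # `poll` stands in for the hypothetical ExternalAPI.poll/1.
  def keep_running(tasks, poll) when is_map(tasks) do
    tasks
    |> Enum.filter(fn {user_id, _data} ->
      # true = keep the task, false = drop it
      poll.(user_id) != "terminate = true"
    end)
    # Enum.filter/2 returns a list of {key, value} tuples, so rebuild the map.
    |> Map.new()
  end
end

tasks = %{1 => "https://example.com/a", 2 => "https://example.com/b"}

fake_poll = fn
  1 -> "terminate = true"
  _ -> "terminate = false"
end

TaskFilter.keep_running(tasks, fake_poll)
# => %{2 => "https://example.com/b"}
```

On Elixir 1.13 or later, Map.filter/2 does the filtering and the rebuilding in one step.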

There’s no need to stop the GenServer when there are no tasks. Erlang processes are very lightweight: an idle one costs only a small amount of memory, plus waking up every 30 seconds for its tick.

defmodule MyWorker do
  use GenServer

  @tick_interval 30_000

  # The optional argument also lets a supervisor start it via the default child_spec/1.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  def add(id), do: GenServer.cast(__MODULE__, {:add, id})

  def remove(id), do: GenServer.cast(__MODULE__, {:remove, id})

  @impl true
  def init(_) do
    tick()

    # Here we use a MapSet as it will reject duplicate items.
    # If you wish to store more data with a task than just a user ID,
    # use a Map instead.
    {:ok, MapSet.new()}
  end

  defp tick, do: Process.send_after(self(), :tick, @tick_interval)

  @impl true
  def handle_cast({:add, id}, tasks), do: {:noreply, MapSet.put(tasks, id)}

  def handle_cast({:remove, id}, tasks), do: {:noreply, MapSet.delete(tasks, id)}

  @impl true
  def handle_info(:tick, tasks) do
    tasks = process_tasks(tasks)

    tick()

    {:noreply, tasks}
  end

  # Ignore any other message. Clauses of the same function must stay grouped together.
  def handle_info(_, tasks), do: {:noreply, tasks}

  defp process_tasks(tasks) do
    tasks_list =
      Enum.filter(tasks, fn id ->
        # Here we process the individual task, such as polling
        # the external API (ExternalAPI is a placeholder).
        case ExternalAPI.poll(id) do
          # We return true from this anonymous function if the task should
          # stay, and false if the task should be removed.
          "terminate = true" -> false
          "terminate = false" -> true
        end
      end)

    # `Enum.filter/2` returns a list, so we must convert it back into a MapSet.
    MapSet.new(tasks_list)
  end
end

Thanks.

In your example, process_tasks iterates over the list of tasks and sends each HTTP request (ExternalAPI.poll) synchronously, and therefore also waits for each response synchronously. Is that a wise thing to do? Why not create a separate thread for each HTTP request?

You’re right, that is better. Instead of Enum.filter/2 you might consider just using Enum.each/2 and calling MyWorker.remove/1 from the process handling the individual task if the task should be removed.


Should that be done via spawn? Something like this:

spawn(Helloer, :my_func, [123, "something"])

Then how would I do the filtering if they all run in different threads? Will I have to wait for all of them to finish in each iteration?

I would use a Task.

As I mentioned, if you go that route you may not want to use Enum.filter/2. Instead, call the remove/1 function from inside the Task process with the user ID of the task to be removed.

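# Inside MyWorker's handle_info(:tick, tasks): start one Task per user ID and
# return immediately; each Task reports back by casting remove/1 when needed.
Enum.each(tasks, fn id ->
  Task.start(fn ->
    if ExternalAPI.poll(id) == "terminate = true", do: remove(id)
  end)
end)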
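If you would rather keep the filtering inside the GenServer and still poll concurrently, Task.async_stream/3 runs the polls in parallel and yields the results in input order, so the tick handler waits for all of them (or their timeouts) and then filters. A sketch under stated assumptions: ConcurrentPoll is a hypothetical module, `poll` stands in for the placeholder ExternalAPI.poll/1, and the 10-second timeout is arbitrary. Note that tasks started this way are linked to the caller, so an exception inside `poll` would also take down MyWorker; Task.Supervisor.async_stream_nolink/4 is the variant that avoids this.

```elixir
defmodule ConcurrentPoll do
  # Polls all task IDs concurrently and returns a MapSet of the IDs that should
  # keep running. `poll` stands in for the hypothetical ExternalAPI.poll/1.
  #
  # The caller (e.g. MyWorker's :tick handler) blocks until every poll has
  # answered or timed out, so keep `timeout` well below the tick interval.
  def keep_running(tasks, poll, timeout \\ 10_000) do
    ids = Enum.to_list(tasks)

    # Results come back in the same order as `ids` (ordered: true is the default).
    # Timed-out polls are killed and reported as {:exit, :timeout}.
    results = Task.async_stream(ids, poll, timeout: timeout, on_timeout: :kill_task)

    ids
    |> Enum.zip(results)
    # Drop only the tasks the API explicitly told us to terminate; a timed-out
    # poll keeps its task, so it is retried on the next tick.
    |> Enum.reject(fn {_id, result} -> result == {:ok, "terminate = true"} end)
    |> Enum.map(fn {id, _result} -> id end)
    |> MapSet.new()
  end
end

fake_poll = fn
  1 -> "terminate = true"
  _ -> "terminate = false"
end

ConcurrentPoll.keep_running(MapSet.new([1, 2, 3]), fake_poll)
# returns a MapSet containing 2 and 3
```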

Also, how is the MapSet instance stored between HTTP requests? I mean an HTTP request for an HTML page: a user reloads the page with the button and … the list of tasks is gone? Is it kept in a session, or is there a more idiomatic way?

I’m not entirely sure what you’re asking.

The MapSet is stored as the MyWorker GenServer’s state on the server. The MyWorker GenServer is a continuously running process that should be started as part of your application’s supervision tree when your application starts. You should not be starting and stopping it on demand when a user loads or reloads a page.

There’s no variable in MyWorker which stores the MapSet. Where is it stored?

Could you show me a simple example of a client of MyWorker?

MyWorker passes the :name option to GenServer.start_link, which means that the process is registered under that name (MyWorker), unique on the local node. Any GenServer.call or GenServer.cast made with the same name reaches that process later; MyWorker’s add/1 and remove/1 do exactly this.

So this means that the process will remain available after starting until your application closes, the process crashes, or you call GenServer.stop/2 from somewhere. In any case, the process will remain regardless of the connecting/disconnecting of users to your web-facing interface.
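To see that registration and lifetime in isolation, here is a small sketch with a hypothetical NamedDemo server standing in for MyWorker (it registers itself under its module name in the same way):

```elixir
defmodule NamedDemo do
  use GenServer

  # Registered under the module name, just like MyWorker.
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, MapSet.new(), name: __MODULE__)
  end

  @impl true
  def init(state), do: {:ok, state}
end

{:ok, pid} = NamedDemo.start_link()

# Any process on the same node can now find it by name, no pid needed.
^pid = Process.whereis(NamedDemo)

# It stays alive until it is stopped (or crashes); then the name is freed.
:ok = GenServer.stop(NamedDemo)
nil = Process.whereis(NamedDemo)
```

Nothing here depends on web requests or sessions: page loads come and go, while the named process keeps running.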


That’s not what I asked about.

The MapSet is referred to as tasks.

Using the MyWorker module might look as follows:

In a supervisor, possibly the root application supervisor:

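children = [
  ...
  # Equivalent to the older, deprecated Supervisor.Spec call worker(MyWorker, []);
  # it starts the child by calling MyWorker.start_link().
  %{id: MyWorker, start: {MyWorker, :start_link, []}}
]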

Adding user tasks:

# adding a task with user ID 1
MyWorker.add(1)

# adding a second task with user ID 2
MyWorker.add(2)

Removing a user task:

MyWorker.remove(1)

Where is the instance created by MapSet.new, i.e. “tasks”, actually stored?

He already showed this code: it is at the bottom of the init callback. ^.^

That’s not where it’s stored, that’s where it’s created.

It is also where it is stored: init/1 returns it as the initial state, which GenServer then threads through all the other callbacks.
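One way to convince yourself that the MapSet lives inside the running process is to ask the process for it. :sys.get_state/1 (an OTP debugging helper, not something for application logic) returns a GenServer’s current state. A sketch with a cut-down, hypothetical TaskSet server that keeps only the add/remove part of MyWorker:

```elixir
defmodule TaskSet do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def add(id), do: GenServer.cast(__MODULE__, {:add, id})
  def remove(id), do: GenServer.cast(__MODULE__, {:remove, id})

  # The value returned here becomes the state GenServer keeps for this process.
  @impl true
  def init(_), do: {:ok, MapSet.new()}

  # Each callback receives the current state and returns the next one.
  @impl true
  def handle_cast({:add, id}, tasks), do: {:noreply, MapSet.put(tasks, id)}
  def handle_cast({:remove, id}, tasks), do: {:noreply, MapSet.delete(tasks, id)}
end

{:ok, _pid} = TaskSet.start_link()
TaskSet.add(1)
TaskSet.add(2)
# A duplicate: the MapSet simply ignores it.
TaskSet.add(2)
TaskSet.remove(1)

# Messages between two processes arrive in the order they were sent, so the
# casts above are handled before this request for the state.
:sys.get_state(TaskSet)
# returns a MapSet containing just 2
```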

Does GenServer handle it under the hood? Where in the documentation is that described?

Yes, and it is described in the GenServer documentation. :slight_smile:

Imagine GenServer implemented like this, in pseudo-code:

def start_link(user_module, args) do
  {:ok, spawn_link(fn -> gen_server_init(user_module, args) end)}
end

def gen_server_init(user_module, args) do
  {:ok, state} = user_module.init(args)
  gen_server_loop(user_module, state)
end

def gen_server_loop(user_module, state) do
  receive do
    {:__special_genserver_cast__, msg} ->
      {:noreply, newstate} = user_module.handle_cast(msg, state)
      gen_server_loop(user_module, newstate)
    {:__special_genserver_call__, msg, from} ->
      {:reply, outmsg, newstate} = user_module.handle_call(msg, from, state)
      send(from, outmsg)
      gen_server_loop(user_module, newstate)
    unknown_msg ->
      {:noreply, newstate} = user_module.handle_info(unknown_msg, state)
      gen_server_loop(user_module, newstate)
  end
end

And so forth, but with significantly more error checking and cases and such. GenServer is just an infinite receiving loop passing the state from iteration to iteration. :slight_smile:
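A runnable cut-down version of that idea, as an illustration only: the real GenServer also handles monitors, timeouts, system messages and much more, and ToyServer and Counter are hypothetical names. The state is never stored in a variable that outlives a call; it only exists as the argument of the recursive loop/2:

```elixir
defmodule ToyServer do
  # Starts a process that holds `state` only as the argument of loop/2.
  def start(module, args) do
    spawn(fn ->
      {:ok, state} = module.init(args)
      loop(module, state)
    end)
  end

  def cast(pid, msg), do: send(pid, {:cast, msg})

  # Tags the request with a unique ref so the reply can be matched.
  def call(pid, msg) do
    ref = make_ref()
    send(pid, {:call, msg, {self(), ref}})

    receive do
      {^ref, reply} -> reply
    after
      5_000 -> exit(:timeout)
    end
  end

  defp loop(module, state) do
    receive do
      {:cast, msg} ->
        {:noreply, new_state} = module.handle_cast(msg, state)
        loop(module, new_state)

      {:call, msg, {caller, ref} = from} ->
        {:reply, reply, new_state} = module.handle_call(msg, from, state)
        send(caller, {ref, reply})
        loop(module, new_state)
    end
  end
end

defmodule Counter do
  def init(start), do: {:ok, start}
  def handle_cast(:increment, n), do: {:noreply, n + 1}
  def handle_call(:get, _from, n), do: {:reply, n, n}
end

pid = ToyServer.start(Counter, 0)
ToyServer.cast(pid, :increment)
ToyServer.cast(pid, :increment)
ToyServer.call(pid, :get)
# => 2
```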


I would recommend having a look at the Registry module.

First, some considerations:

  1. Say you run one GenServer per user;
  2. And you have a rescheduler as a separate GenServer process.

Tackling the first part, which involves associating a user’s ID with a process, starting one if none is running, and keeping track of the running processes by user ID, can be done with the following construct:

defmodule MyWorker do
  use GenServer

  @doc """
  Starts the Registry. Should be called before `maybe_start_worker/2`.
  """
  def start_link() do
    Registry.start_link(keys: :unique, name: __MODULE__)
  end

  @doc """
  Starts a worker process for a given `user` to handle the given `task`,
  unless one is already running for that user.
  """
  def maybe_start_worker(user, task) do
    case Registry.lookup(__MODULE__, user) do
      [{pid, _value}] ->
        {:ok, pid}  # do nothing, already running
      [] ->
        name = {:via, Registry, {__MODULE__, user}}
        GenServer.start_link(__MODULE__, task, name: name)
    end
  end

  @impl true
  def init(task) do
    self() |> Process.send_after(:process_task, 0)
    {:ok, task}
  end

  @impl true
  def handle_info(:process_task, task) do
    # do something with the task
    access_external_service(task)

    # finishes the process, nothing else to do for the moment
    {:stop, :normal, task}
  end

  defp access_external_service(_task) do
    # a request to an external web server
  end
end
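The property the {:via, Registry, ...} name gives you is that a second start for the same user is refused. A small sketch, assuming a hypothetical UserRegistry name and a do-nothing Sleeper worker:

```elixir
defmodule Sleeper do
  use GenServer

  # A do-nothing worker, registered per user through Registry.
  def start_link(user) do
    GenServer.start_link(__MODULE__, user, name: {:via, Registry, {UserRegistry, user}})
  end

  @impl true
  def init(user), do: {:ok, user}
end

{:ok, _registry} = Registry.start_link(keys: :unique, name: UserRegistry)

{:ok, pid} = Sleeper.start_link("alice")

# Starting again for the same user is rejected, and the existing pid comes back.
{:error, {:already_started, ^pid}} = Sleeper.start_link("alice")

# A different user gets their own process.
{:ok, bob_pid} = Sleeper.start_link("bob")
true = bob_pid != pid

[{^pid, _value}] = Registry.lookup(UserRegistry, "alice")
```

This also means maybe_start_worker/2 could skip the lookup and simply treat {:error, {:already_started, pid}} as “already running”, which avoids a race between the lookup and the start if several processes call it.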

Assuming the above, you could now create another process to re-run the worker every 30 seconds. This other process can hold a simple Map where the key is the user’s ID and the value is the task (together with its pending timer). Just call MyScheduler.add_item/2 when the user clicks the button, followed by MyWorker.maybe_start_worker/2 if you want to process the task immediately; otherwise, the worker will be started 30 seconds later by the scheduler.

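defmodule MyScheduler do
  use GenServer

  @interval 30_000

  def start_link() do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def add_item(user, task) do
    GenServer.call(__MODULE__, {:add_item, user, task})
  end

  def remove_item(user) do
    GenServer.call(__MODULE__, {:remove_item, user})
  end

  @impl true
  def init(map), do: {:ok, map}

  @impl true
  def handle_info({:reschedule, user, task}, map) do
    # The user may have been removed after this message was already delivered,
    # so only start the worker (and reschedule) if they are still in the map.
    if Map.has_key?(map, user) do
      MyWorker.maybe_start_worker(user, task)
      timer = Process.send_after(self(), {:reschedule, user, task}, @interval)
      {:noreply, Map.put(map, user, {task, timer})}
    else
      {:noreply, map}
    end
  end

  # handle_call/3 receives the caller (`from`) as its second argument.
  @impl true
  def handle_call({:add_item, user, task}, _from, map) do
    if Map.has_key?(map, user) do
      # Already scheduled: don't start a second timer for the same user.
      {:reply, :ok, map}
    else
      timer = Process.send_after(self(), {:reschedule, user, task}, @interval)
      {:reply, :ok, Map.put(map, user, {task, timer})}
    end
  end

  def handle_call({:remove_item, user}, _from, map) do
    case Map.pop(map, user) do
      {{_task, timer}, map} ->
        Process.cancel_timer(timer)
        {:reply, :ok, map}

      {nil, map} ->
        {:reply, :ok, map}
    end
  end
end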

With the above:

  1. Say your worker takes more than 30 seconds to run, for example 45 seconds. When the scheduler gets the next timer event while that worker is still running, it won’t start a new process at that moment; since it keeps retrying every 30 seconds, the task will run on the next try, 60 seconds after the previous run started.
  2. In order to stop rescheduling, execute MyScheduler.remove_item/1 for the same user ID you used in MyScheduler.add_item/2.
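Regarding point 2: Process.cancel_timer/1 cannot recall a message that has already been delivered, so a :reschedule message can still show up for a user who was just removed, and the scheduler’s handle_info/2 should not assume the user is still in its map. A small demonstration (the message shapes just mirror the scheduler’s):

```elixir
# Schedule a message with no delay, then give it time to be delivered.
timer = Process.send_after(self(), {:reschedule, "alice", :task}, 0)
Process.sleep(50)

# Too late: the timer has already fired, so cancel_timer/1 returns false...
false = Process.cancel_timer(timer)

# ...and the message is still sitting in the mailbox.
receive do
  {:reschedule, "alice", :task} -> :stale_message_received
after
  0 -> :no_message
end
# => :stale_message_received

# A timer cancelled before it fires returns the remaining milliseconds instead.
pending = Process.send_after(self(), {:reschedule, "bob", :task}, 10_000)
true = is_integer(Process.cancel_timer(pending))
```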