Problems with GenServer used for polling

I have a GenServer that, once started, is intended to kick off some work at a regular interval. It doesn’t even care about the result of the work. When the work completes (success or failure of any kind), the GenServer should reschedule the work at the next interval.

The following (abbreviated) snippet of the GenServer code appears to work almost all the time. It will run for days/weeks seemingly without issue. Occasionally, it will stop polling as though Process.send_after/3 was never called. At that point, if I log on to the remote console and manually send it a :poll message the work is started and it will be good until the whole cycle repeats.

I’m sure I’ve misused or misunderstood something with spawn_monitor and/or the :DOWN messages I’m expecting.

Any advice or suggestions are appreciated!

def init(state) do
  schedule_poll(0)
  {:ok, state}
end

def handle_info(:poll, state) do
  spawn_monitor(fn ->
    do_some_work()
  end)
  {:noreply, state}
end

def handle_info({:DOWN, _ref, _, _from, _reason}, state) do
  schedule_poll()
  {:noreply, state}
end

def handle_info(_msg, state) do
  {:noreply, state}
end

defp schedule_poll(delay \\ nil),
  do: Process.send_after(self(), :poll, delay || 10 * 60_000)

If you do not care about the result, why even wait for it to finish working before rescheduling? Why not just spawn a task or something that will do the work and reschedule the next poll immediately?

That would probably work, but I don’t really want to run the risk of multiple instances of “work” happening concurrently. Since the work involves downloading and processing files from an FTP site it seemed like a good idea to keep multiple processes from stepping on each other there.

In that case, why are you even spawning another process to do the work? If you only ever want one process doing the work at any given time, you should probably just do the work within the GenServer and just schedule the work afterwards.

def handle_info(:poll, state) do
  do_some_work()
  schedule_poll()
  {:noreply, state}
end

That’s also a route to consider. It’s in another process mostly for isolation from failure. There’s a decent chance the worker crashes, and I didn’t want it to take the poller down with it. The poller’s supervisor would pick it back up, but the work would probably crash again immediately, and so on.

I’ll think about it some more though, maybe I can isolate the most prevalent crashes in the work itself…

I imagine something in the monitored process is causing it to go on forever and so it never sends a :DOWN message. Have you implemented timeouts in the monitored process to make sure it really does go down?
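One way to make sure a stuck worker really does go down, while keeping the original spawn_monitor design, is to arm a timer when the work starts and kill the worker if the timer fires first. The following is only a minimal sketch: the TimedPoller module and its :work, :interval and :timeout options are hypothetical names made up for illustration, not something from the original code.

```elixir
defmodule TimedPoller do
  use GenServer

  # Hypothetical options: :work (zero-arity function), :interval and :timeout (ms).
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      work: Keyword.fetch!(opts, :work),
      interval: Keyword.get(opts, :interval, 10 * 60_000),
      timeout: Keyword.get(opts, :timeout, 2 * 60_000),
      worker: nil
    }

    send(self(), :poll)
    {:ok, state}
  end

  # Start the work only when no worker is currently tracked.
  @impl true
  def handle_info(:poll, %{worker: nil} = state) do
    {pid, ref} = spawn_monitor(state.work)
    timer = Process.send_after(self(), {:work_timeout, ref}, state.timeout)
    {:noreply, %{state | worker: {pid, ref, timer}}}
  end

  # The worker went down for any reason: cancel the timer and reschedule.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{worker: {_, ref, timer}} = state) do
    Process.cancel_timer(timer)
    Process.send_after(self(), :poll, state.interval)
    {:noreply, %{state | worker: nil}}
  end

  # The worker outlived its timeout: kill it; the resulting :DOWN reschedules.
  def handle_info({:work_timeout, ref}, %{worker: {pid, ref, _timer}} = state) do
    Process.exit(pid, :kill)
    {:noreply, state}
  end

  # Ignore anything else, including stale timeouts and a :poll while busy.
  def handle_info(_msg, state), do: {:noreply, state}
end
```

Because a :poll is ignored while a worker is tracked, sending one manually from the remote console cannot start a second copy of the work.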

^^ This.

It sounds to me like you have a process that never goes down, not a problem with spawn_monitor. Honestly, I would consider trapping exits over monitoring here. I would also treat a timeout on the work as fatal to the poller.

def init(state) when is_map(state) do
  schedule_poll(0)
  Process.flag(:trap_exit, true)
  {:ok, state}
end

def handle_info(:poll, state) do
  task = Task.async(fn ->
    do_some_work()
  end)
  # Add a timeout to the return here
  timeout = 120_000 # 2 minutes
  {:noreply, timeout, Map.put(state, :task, task)}
end

def handle_info({ref, _return}, %{task: %{ref: task_ref}} = state) when ref == task_ref do
  Process.demonitor(ref, [:flush])
  schedule_poll()
  {:noreply, Map.delete(state, :task)}
end

def handle_info({:EXIT, pid, reason}, %{task: %{pid: pid, ref: ref}} = state) do
  # Using `pid` in both patterns restricts this clause to the tracked task.
  Process.demonitor(ref, [:flush])
  require Logger
  Logger.error("Task didn't properly exit.\n#{inspect(reason)}")
  schedule_poll()
  {:noreply, Map.delete(state, :task)}
end

# Handle non-task exits
def handle_info({:EXIT, _pid, reason}, _state) do
  exit(reason)
end

# If a task times out then stop the poller, killing the task as well.
# This allows you to keep track of when the task is taking too long as well.
def handle_info(:timeout, state) do
  require Logger
  Logger.error("Task timed out!")
  {:stop, :task_timeout, state}
end

defp schedule_poll(delay \\ nil),
  do: Process.send_after(self(), :poll, delay || 10 * 60_000)

If you can’t use the timeout option, another option is to spawn a Task inside of a Task and have the outer one call await with a timeout.
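A rough sketch of the Task-inside-a-Task idea, assuming a hypothetical NestedTimeout helper (the module and function names are made up here). The outer task awaits the inner one with a timeout. If the await times out, the outer task exits, and because the inner task is linked to it, the inner task is taken down as well. A caller that traps exits then sees an :EXIT message for the outer task.

```elixir
defmodule NestedTimeout do
  # Runs `fun` in an inner task and gives it at most `timeout_ms` to finish.
  # Returns the outer %Task{} so the caller can match on its ref and pid.
  def start(fun, timeout_ms) when is_function(fun, 0) do
    Task.async(fn ->
      fun
      |> Task.async()
      |> Task.await(timeout_ms)
    end)
  end
end
```

On success the caller receives the usual {ref, result} message from the outer task. On timeout the exit reason has the shape {:timeout, {Task, :await, _}}, so it can be told apart from other crashes.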

A GenServer will also let you set a timeout in the return of all callback functions.

@poll_period :timer.minutes(10)

def init(state) do
  {:ok, state, 0} # Initial timeout of 0 to trigger immediately
end

def handle_info(:timeout, state) do
  do_some_work()
  {:noreply, state, @poll_period}
end

What’s the reasoning behind spawning yet another process to run do_some_work/0?

Edit:

There’s a decent chance the worker crashes, and I didn’t want it to take the poller down with it.

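def handle_info(:timeout, state) do
  try do
    do_some_work()
  rescue
    error ->
      # Log the error. Note that `rescue` only handles exceptions;
      # exits and throws would need a `catch` clause.
      require Logger
      Logger.error("Polling work failed: #{inspect(error)}")
  end
  {:noreply, state, @poll_period}
end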

Doing a long blocking task in a gen_server is a bad idea. It blocks all other calls to the server, including debugging (get_state etc.), and interferes with live code upgrades if used.

The timeout is also reset by debugging calls and so on. It’s useful to have a server shut down or hibernate after some inactivity, but not something you can really rely on for periodic tasks.
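To illustrate the point about the timeout being reset: the timeout given in a callback's return value only fires if no message at all arrives within that window, and a callback that returns without a timeout leaves none armed. A small sketch, using a hypothetical TimeoutDemo module and a made-up :ping message:

```elixir
defmodule TimeoutDemo do
  use GenServer

  @timeout 200

  # The state is simply the pid to notify when the timeout fires.
  @impl true
  def init(parent), do: {:ok, parent, @timeout}

  @impl true
  def handle_info(:timeout, parent) do
    send(parent, :timed_out)
    {:noreply, parent, @timeout}
  end

  def handle_info(:ping, parent) do
    # Returning the timeout again restarts the full countdown from zero.
    {:noreply, parent, @timeout}
  end
end
```

As long as a :ping arrives more often than every 200 ms, the :timeout clause never runs, which is why a steady trickle of unrelated messages can starve timeout-based polling.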

Using https://hex.pm/packages/quantum is another option to consider.

Thanks everyone. I’m trying something along the lines of what @Azolo suggested. It looks like when I’m trapping exits on the task I will always receive :EXIT messages, even if I’ve already received a result message. Does that make sense? I don’t think it changes much except I can’t stop tracking the task until I’ve received an :EXIT on it. And of course I can’t assume the :EXIT was abnormal anymore.

It’s still worth tracking down the runaway process that I apparently have. It might be worth another thread, but any tips for that?

Oh right, my bad. I would change the handler for the task's return value and add another handle_info clause to handle the normal exit.

def handle_info({ref, _return}, %{task: %{ref: task_ref}} = state) when ref == task_ref do
  Process.demonitor(ref, [:flush])
  schedule_poll()
  {:noreply, state}
end
def handle_info({:EXIT, pid, :normal}, %{task: %{pid: pid}} = state) do
  {:noreply, Map.delete(state, :task)}
end
def handle_info({:EXIT, pid, reason}, %{task: %{pid: pid, ref: ref}} = state) do
  Process.demonitor(ref, [:flush])
  require Logger
  Logger.error("Task didn't properly exit.\n#{inspect(reason)}")
  schedule_poll()
  {:noreply, Map.delete(state, :task)}
end

Or just listen for it when the task is done.

def handle_info({ref, _return}, %{task: %{ref: task_ref, pid: pid}} = state) when ref == task_ref do
  Process.demonitor(ref, [:flush])
  receive do
    {:EXIT, ^pid, :normal} -> :noop
    {:EXIT, ^pid, reason} -> raise "Task exited with #{inspect reason} after returning."
  after
    3000 -> raise "Task didn't exit after returning."
  end

  schedule_poll()
  {:noreply, Map.delete(state, :task)}
end

I like having reasonable timeouts, but there may be reasons you can’t do that. Either way, you may want to look for or create a topic about debugging an apparently zombied/runaway process.
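As a starting point for the runaway-process question, Process.info/2 can show where a process is currently blocked. This is only a sketch: the StuckProcess module is a hypothetical helper, and it assumes you can get hold of the worker's pid, for example by keeping it in the poller's state and reading it with :sys.get_state/1 from the remote console.

```elixir
defmodule StuckProcess do
  # Returns a keyword list of facts about `pid`, or nil if it is no longer alive.
  def inspect_pid(pid) when is_pid(pid) do
    Process.info(pid, [
      :status,
      :current_function,
      :current_stacktrace,
      :message_queue_len
    ])
  end
end
```

The :current_stacktrace entry is usually the most telling one: a worker that is stuck waiting on a socket or a receive will show that call at the top of the trace.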