How can I get a GenServer handler to emit a notification when a function takes too long to complete?

defmodule Server.AccountMonitor do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(state) do
    Process.flag(:trap_exit, true)
    send(self(), :work)

    {:ok, state}
  end

  @impl true
  def handle_info(:work, state) do
    start_time = DateTime.utc_now()
    {:ok, :ok} = LogBook.main("Updating . . . ", __MODULE__, 30, :print_to_screen)
    {:ok, response} = DoStuff.some_expensive_fun()
    finish_time = DateTime.utc_now()
    elapsed_time = DateTime.diff(finish_time, start_time, :second)

    {:ok, :message_logged} =
      LogBook.main(
        "Update complete. Elapsed time: #{elapsed_time} second(s).",
        __MODULE__,
        39,
        :quiet
      )

    DoStuff.main(response)

    send(self(), :work)

    {:noreply, state}
  end
end

Right now, this logs a message when DoStuff.some_expensive_fun() completes, but how would I go about having a message emitted instead if it hasn’t completed after a given number of seconds?

Hmm, one solution I can think of is to spawn a Task for logging that waits X seconds and then logs “work hasn’t finished within X seconds”. After the work finishes, shut that Task down so it doesn’t log if the work completed in time.

log_task = Task.async(fn -> :timer.sleep(:timer.seconds(x_second)); Logger.info("Task hasn't finished") end)
work...
Task.shutdown(log_task, :brutal_kill)

This usage looks pretty bad to me, though. Ideally there’s a controller process that supervises a worker process (which does the work) and can log if the work doesn’t finish by a certain time.
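
To make the watchdog idea above concrete, here is a minimal sketch. SlowWarning, run/3 and the on_slow callback are hypothetical names that do not appear in the thread, and the sketch assumes the work runs in the calling process while a separate Task does the waiting.

```elixir
defmodule SlowWarning do
  # Hypothetical helper; the module and function names are illustrative only.
  # Runs `work_fun` in the calling process and invokes `on_slow` once if the
  # work is still running after `threshold_ms` milliseconds.
  def run(work_fun, threshold_ms, on_slow) do
    watchdog =
      Task.async(fn ->
        Process.sleep(threshold_ms)
        on_slow.()
      end)

    try do
      work_fun.()
    after
      # Stops the watchdog if it is still sleeping, and discards its reply
      # message if it has already fired.
      Task.shutdown(watchdog, :brutal_kill)
    end
  end
end
```

In the original handler this could wrap the expensive call, for example SlowWarning.run(fn -> DoStuff.some_expensive_fun() end, 5_000, fn -> Logger.warning("still working") end). The handler itself still blocks until the work returns; only the notification becomes timely.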


Interesting approach. It seems you recommend wrapping the Task within a supervised GenServer, correct?

Do the expensive stuff in some other process :slight_smile: so your supervised process does not block. I use a simple spawn_link below, but it should probably be a supervised Task instead. This will show you the idea, though.

def handle_info(:work, state) do
  parent = self()
  ref = make_ref()
  Process.send_after(self(), {:work_timeout, ref}, 10_000)

  pid = spawn_link(fn ->
    # do the heavy work in here..
    result = DoStuff.some_expensive_fun()
    send(parent, {:work_complete, ref, result})
  end)

  # save pid and ref in your state
  {:noreply, state}
end

def handle_info({:work_complete, ref, result}, state) do
  # handle work_complete here, match on "ref" and remove ref from your state
  {:noreply, state}
end

def handle_info({:work_timeout, ref}, state) do
  # handle timeout here
  {:noreply, state}
end

wrapping the Task within a supervised GenServer

Yes, that could be a concrete implementation of having a “controller process which supervises a worker process”. That said, I am not really familiar with developing GenServers for production and the best practices around them. There seems to be a guideline on how to wrap a Task inside a GenServer in Task — Elixir v1.14.2 (hexdocs.pm).
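
As a rough illustration of that guideline, the sketch below starts the work with Task.Supervisor.async_nolink/2 from inside a GenServer and arms a timer alongside it. TimedWorker, the :timeout and :notify options, and the message shapes passed to notify are all assumptions made for this example, not code from the thread or the docs.

```elixir
defmodule TimedWorker do
  # Hypothetical module: names, options and message shapes are illustrative.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Asks the server to run `fun` in a supervised task.
  def run(server, fun), do: GenServer.cast(server, {:run, fun})

  @impl true
  def init(opts) do
    {:ok, sup} = Task.Supervisor.start_link()

    {:ok,
     %{
       sup: sup,
       timeout: Keyword.fetch!(opts, :timeout),
       notify: Keyword.fetch!(opts, :notify),
       task_ref: nil
     }}
  end

  @impl true
  def handle_cast({:run, fun}, %{task_ref: nil} = state) do
    task = Task.Supervisor.async_nolink(state.sup, fun)
    Process.send_after(self(), {:work_timeout, task.ref}, state.timeout)
    {:noreply, %{state | task_ref: task.ref}}
  end

  # A job is already running: this sketch simply ignores the new request.
  def handle_cast({:run, _fun}, state), do: {:noreply, state}

  @impl true
  # The task finished: its reply arrives as {ref, result}.
  def handle_info({ref, result}, %{task_ref: ref} = state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    state.notify.({:work_complete, result})
    {:noreply, %{state | task_ref: nil}}
  end

  # The task crashed before replying.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
    state.notify.({:work_failed, reason})
    {:noreply, %{state | task_ref: nil}}
  end

  # The timer fired while the same task is still running.
  def handle_info({:work_timeout, ref}, %{task_ref: ref} = state) do
    state.notify.(:work_slow)
    {:noreply, state}
  end

  # Timer for a task that already finished: nothing to report.
  def handle_info({:work_timeout, _ref}, state), do: {:noreply, state}
end
```

Because the task is not linked, a crash in the work shows up as a :DOWN message instead of taking the GenServer down with it.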


I once implemented a similar approach for the latency tail chopping pattern (based on this actor design in Akka): Interaction Patterns • Akka Documentation

Put simply: there’s a supervisor process that spawns a worker process to do the API work. If the worker doesn’t respond with an answer within a threshold of X seconds, the supervisor spawns another worker process in the hope that it can do the work faster.
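
A tiny sketch of that tail chopping idea in Elixir, using plain Tasks instead of Akka actors. TailChop and request/2 are hypothetical names, and the sketch assumes the work is safe to run twice.

```elixir
defmodule TailChop do
  # Hypothetical sketch; module name and arguments are illustrative.
  # Starts `fun` in one task and, if no reply arrives within `threshold_ms`,
  # starts a second attempt and returns whichever reply comes first.
  def request(fun, threshold_ms) do
    first = Task.async(fun)

    case Task.yield(first, threshold_ms) do
      {:ok, result} ->
        result

      nil ->
        second = Task.async(fun)
        await_first(first, second)
    end
  end

  defp await_first(%Task{ref: ref1} = first, %Task{ref: ref2} = second) do
    result =
      receive do
        {^ref1, reply} -> reply
        {^ref2, reply} -> reply
      end

    # Stop the slower attempt and clean up the monitors of both tasks.
    Task.shutdown(first, :brutal_kill)
    Task.shutdown(second, :brutal_kill)
    result
  end
end
```

Both tasks are linked to the caller here, so a crash in either attempt also crashes the caller; a supervised variant would need to handle that separately.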

defmodule Supervisor.PyOperatorManager do
  use Supervisor

  @timeout 60_000

  def start_link(_) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    Process.flag(:trap_exit, true)

    children = [
      :poolboy.child_spec(:py_pool,
        name: {:local, :py_pool},
        worker_module: Server.PyOperator,
        size: 10,
        max_overflow: 20
      )
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  def launch(data \\ [], py_module, py_lambda) do
    :poolboy.transaction(
      :py_pool,
      fn pid ->
        GenServer.call(pid, {data, py_module, py_lambda}, @timeout)
      end,
      @timeout
    )
  end
end




defmodule Server.PyOperator do
  use GenServer
  use Export.Python

  @timeout 2_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  @impl true
  def init(state) do
    Process.flag(:trap_exit, true)

    priv_path = Path.join(:code.priv_dir(:arbit), "python")
    {:ok, py_pid} = Python.start_link(python_path: priv_path)
    {:ok, Map.put(state, :py_pid, py_pid)}
  end

  @impl true
  def handle_call(
        {[] = args, "python_script" = py_module, "main" = py_lambda},
        _from,
        %{py_pid: py_pid} = state
      ) do
    parent = self()
    ref = make_ref()
    Process.send_after(self(), {:work_timeout, ref}, @timeout)

    python_script_results = Python.call(py_pid, py_module, py_lambda, [args])
    send(parent, {:work_complete, ref, python_script_results})

    {:noreply, state}
  end

  @impl true
  def handle_call({args, py_module, py_lambda}, _from, %{py_pid: py_pid} = state) do
    python_script_results = Python.call(py_pid, py_module, py_lambda, [args])
    {:reply, python_script_results, state}
  end

  @impl true
  def handle_info({:work_complete, _ref, python_script_results}, state) do
    {:reply, python_script_results, state}
  end

  def handle_info({:work_timeout, _ref}, state) do
    LogBook.main(
      "Still waiting . . . . Elapsed time: #{@timeout / 1000} second(s).",
      __MODULE__,
      90,
      :warning
    )

    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, _, :normal}, state), do: {:noreply, state}

  @impl true
  def handle_info({:DOWN, _, :process, _, :normal}, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %{py_pid: py_pid}) do
    Python.stop(py_pid)
    :ok
  end
end




defmodule Server.AccountMonitor do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(state) do
    Process.flag(:trap_exit, true)
    send(self(), :kickoff_update)

    {:ok, state}
  end

  @impl true
  def handle_info(
        :kickoff_update,
        state
      ) do
    RunAncillaryJob.launch()

    send(self(), :run_update)
    {:noreply, state}
  end

  @impl true
  def handle_info(:run_update, state) do
    script_response = Supervisor.PyOperatorManager.launch([], "python_script", "main")

    {:noreply, state}
  end

  def handle_info({:reply, script_response}, state) do
    {:ok, _} = DoStuff.main(script_response)

    send(self(), :run_update)

    {:noreply, state}
  end

  @impl true
  def handle_info(critical_error, state), do: {:stop, critical_error, state}

  @impl true
  def terminate(critical_error, state),
    do:
      LogBook.main(
        {:glitch, critical_error: critical_error, state: state},
        __MODULE__,
        66,
        :error
      )
end

Error Message:

{{:bad_return_value,
   {:reply,
    {:ok,
     #response_data},
    %{py_pid: #PID<0.440.0>}}},
  {GenServer, :call, [#PID<0.439.0>, {[], #python_function, "main"}, 60000]}}

What are your thoughts on this strategy? Oh, and what particularly does this error message refer to, and how should it be handled?

You cannot return {:reply, ...} from handle_info/2; it only accepts {:noreply, ...} or {:stop, ...} returns. That’s what the error refers to.

  def handle_info({:work_complete, from, python_script_results}, state) do
    GenServer.reply(from, python_script_results)
    {:noreply, state}
  end
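
For the snippet above to work, the `from` tuple given to handle_call/3 has to travel with the message, and handle_call/3 itself has to return {:noreply, state} so the caller keeps waiting. A minimal end-to-end sketch, assuming a hypothetical DeferredReply module and a spawned worker process (neither is from the thread):

```elixir
defmodule DeferredReply do
  # Hypothetical module showing a deferred reply; names are illustrative.
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  def compute(server, fun, timeout \\ 5_000) do
    GenServer.call(server, {:compute, fun}, timeout)
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:compute, fun}, from, state) do
    parent = self()
    # Run the work elsewhere so this process stays free to handle messages.
    spawn_link(fn -> send(parent, {:work_complete, from, fun.()}) end)
    # No reply yet: the caller keeps waiting inside GenServer.call/3.
    {:noreply, state}
  end

  @impl true
  def handle_info({:work_complete, from, result}, state) do
    # handle_info/2 cannot return {:reply, ...}, so reply explicitly.
    GenServer.reply(from, result)
    {:noreply, state}
  end
end
```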

Thanks for your help :slight_smile:

My current problem is that {:work_timeout, ref} is still emitted even though python_script_results is ready long before the timeout. Any thoughts on how to resolve this?

You have to cancel the timer; see Process.cancel_timer/1 in Process — Elixir v1.14.2.

timeout_reference = Process.send_after(self(), {:work_timeout, ref}, @timeout)
...
Process.cancel_timer(timeout_reference)
flush()

Basically, you can do it after you receive the reply from your script.
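
One caveat worth sketching: flush() is an IEx helper, so inside a GenServer the usual approach is to check the return value of Process.cancel_timer/1 and, if the timer has already fired, take the pending message out of the mailbox yourself. TimerCleanup and cancel/2 below are hypothetical names, and the sketch assumes the timeout message has the {:work_timeout, ref} shape used in this thread.

```elixir
defmodule TimerCleanup do
  # Hypothetical helper; names are illustrative only.
  # Cancels `timer` and, if it has already fired, removes the matching
  # {:work_timeout, ref} message from the caller's mailbox.
  def cancel(timer, ref) do
    case Process.cancel_timer(timer) do
      false ->
        # The timer already fired (or was cancelled before), so the message
        # may be waiting in the mailbox. Drop it without blocking.
        receive do
          {:work_timeout, ^ref} -> :ok
        after
          0 -> :ok
        end

        :already_fired

      _ms_left ->
        :cancelled
    end
  end
end
```

This has to run in the same process that set the timer, since that is the mailbox the timeout message is delivered to.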


That’s why you keep track of that ref :slightly_smiling_face: For instance, if you get the work_complete message first, you can just remove the ref from the state, and when the work_timeout arrives you check your state. If the ref you get is the same as the one in the state, the timeout occurred; if you get another ref, or there is no ref in the state, you can ignore that message.
So in this case the ref is like a flag indicating that a job is running :)
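
The ref-as-flag check can be written as plain state transitions, which keeps it easy to test outside a GenServer. RefFlag and its functions are hypothetical names, and the sketch assumes the state is a map that holds the ref under a :ref key while a job is running.

```elixir
defmodule RefFlag do
  # Hypothetical pure helpers; a handle_info/2 clause would call these and
  # act on the returned tag.

  # A job was started: remember its ref.
  def start(state, ref), do: Map.put(state, :ref, ref)

  # work_complete for the current job: clear the flag.
  def complete(%{ref: ref} = state, ref), do: {:done, Map.delete(state, :ref)}
  def complete(state, _other_ref), do: {:ignore, state}

  # work_timeout for the current job: the job really is late.
  def timeout(%{ref: ref} = state, ref), do: {:timed_out, state}
  # Different ref, or no ref in the state: a leftover timer, ignore it.
  def timeout(state, _other_ref), do: {:ignore, state}
end
```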
