GenServer/poolboy - How to launch a new instance of a process before the previous one is done?

defmodule Server.Bar do
  use GenServer
  use Export.Python
  require Logger

  @ms_sleep_interval 500

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  @impl true
  def init(pid) do
    Process.flag(:trap_exit, true)

    Print.text("Launching #{__MODULE__} . . . ")
    Logger.info("Launching #{__MODULE__} . . . ")
    Supervisor.PyOperatorManager.launch([], "baz", "main")
    |> Foo.main()

    Process.send_after(self(), :tick, @ms_sleep_interval)
    {:ok, pid}
  end

  @impl true
  def handle_info(:tick, state) do
    Process.send_after(self(), :tick, @ms_sleep_interval)
    Print.text("Running #{__MODULE__} . . . ")
    Logger.info("Running #{__MODULE__} . . . ")

    Supervisor.PyOperatorManager.launch([], "baz", "main")
    |> Foo.main()

    {:noreply, state}
  end
end

defmodule Supervisor.PyOperatorManager do
  use Supervisor

  @timeout 60_000

  def start_link(_) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    Process.flag(:trap_exit, true)

    children = [
      :poolboy.child_spec(:py_pool,
        name: {:local, :py_pool},
        worker_module: Server.PyOperator,
        size: 15,
        max_overflow: 20
      )
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  def launch(data \\ [], py_module, py_lambda) do
    :poolboy.transaction(
      :py_pool,
      fn pid ->
        GenServer.call(pid, {data, py_module, py_lambda}, @timeout)
      end,
      @timeout
    )
  end
end

defmodule Server.PyOperator do
  use GenServer
  use Export.Python
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  @impl true
  def init(state) do
    Process.flag(:trap_exit, true)

    priv_path = Path.join(:code.priv_dir(:arbit), "python")
    {:ok, py} = Python.start_link(python_path: priv_path)
    {:ok, Map.put(state, :py, py)}
  end

  @impl true
  def handle_call({args, py_module, py_lambda}, _from, %{py: py} = state) do
    results = Python.call(py, py_module, py_lambda, [args])

    {outcome, output} = results

    {outcome, Jason.decode!(output)}
    |> LogBook.write_to_log(__MODULE__, 26)

    {:reply, results, state}
  end

  @impl true
  def terminate(_reason, %{py: py}) do
    Python.stop(py)
    :ok
  end
end

With this setup, results come in every four or five seconds rather than every half-second. What am I missing?

PyOperatorManager.launch will block until the GenServer.call inside the poolboy transaction returns.

As a result, Server.Bar.handle_info is blocked as well. If the call to launch takes longer than the tick interval, Process.send_after still enqueues a :tick message 500 ms later as expected, but the GenServer does not check its message queue until the work is done.
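To make the effect concrete, here is a minimal sketch that reproduces it without poolboy or Python. Everything in it is hypothetical: the BlockingTicker module, the 50 ms interval, and the 200 ms Process.sleep that stands in for the blocking launch call.

```elixir
defmodule BlockingTicker do
  use GenServer

  @interval 50 # ms between ticks (hypothetical value)
  @work 200    # ms of simulated blocking work per tick (hypothetical value)

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, 0)
  def count(pid), do: GenServer.call(pid, :count)

  @impl true
  def init(count) do
    Process.send_after(self(), :tick, @interval)
    {:ok, count}
  end

  @impl true
  def handle_info(:tick, count) do
    # The next tick is scheduled on time...
    Process.send_after(self(), :tick, @interval)
    # ...but it sits in the mailbox until this callback returns.
    Process.sleep(@work)
    {:noreply, count + 1}
  end

  @impl true
  def handle_call(:count, _from, count), do: {:reply, count, count}
end

{:ok, pid} = BlockingTicker.start_link()
Process.sleep(1_000)
IO.inspect(BlockingTicker.count(pid), label: "ticks handled after about 1s")
```

With these numbers the server handles a tick roughly every 200 ms, not every 50 ms, which mirrors the four-to-five-second spacing seen in the question.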

To prevent this you’ll need to change the structure of your code to ensure control returns from Server.Bar.handle_info promptly, and handle passing the results of launch to Foo.main separately.
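One possible shape for that restructuring, as a sketch only: hand the slow work to a separate process with Task.start and have it send the result back as a message. The names NonBlockingTicker, slow_work/1 and :work_done are made up for illustration; in the code from the question, slow_work/1 would be replaced by the launch call piped into Foo.main.

```elixir
defmodule NonBlockingTicker do
  use GenServer

  @interval 50 # ms between ticks (hypothetical value)
  @work 200    # ms of simulated slow work (hypothetical value)

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{started: 0, results: []})
  end

  def stats(pid), do: GenServer.call(pid, :stats)

  @impl true
  def init(state) do
    Process.send_after(self(), :tick, @interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, state) do
    Process.send_after(self(), :tick, @interval)
    server = self()
    n = state.started + 1

    # Run the slow work in its own process so this callback returns at once.
    {:ok, _task_pid} =
      Task.start(fn -> send(server, {:work_done, slow_work(n)}) end)

    {:noreply, %{state | started: n}}
  end

  # Results arrive later as ordinary messages.
  def handle_info({:work_done, result}, state) do
    {:noreply, %{state | results: [result | state.results]}}
  end

  @impl true
  def handle_call(:stats, _from, state) do
    {:reply, {state.started, length(state.results)}, state}
  end

  # Stand-in for the blocking call from the question.
  defp slow_work(n) do
    Process.sleep(@work)
    n * 2
  end
end

{:ok, pid} = NonBlockingTicker.start_link()
Process.sleep(1_000)
IO.inspect(NonBlockingTicker.stats(pid), label: "{started, finished}")
```

Task.start is unlinked and unsupervised, so a crashed task simply never reports back; Task.Supervisor.async_nolink is a more robust alternative if that matters. Also note that nothing here limits how many tasks run at once. In the original setup the poolboy pool (size 15, max_overflow 20) would act as that limit.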


Thank you for taking the time to explain everything so thoroughly. Do you have any recipes for such an implementation that you could point me to? I’m still stumped as to how to get GenServer unblocked so that it can handle the tick message before the first process completes.

The GenServer unblocks when the callback (handle_cast/handle_call/handle_info) returns control to the receive loop.

For instance, a handle_call can return {:noreply, state}, which leaves the sender of the call still waiting for a reply. Typical approaches for getting the reply back to the client use GenServer.reply either:

  • from a handle_info callback later, fetching the from tuple from the GenServer’s state
  • from some other process that’s been handed the from tuple
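A short sketch of the second kind, assuming a made-up DeferredReply module: handle_call passes the from tuple to a task, returns {:noreply, state} right away, and the task answers the caller with GenServer.reply once the slow work is finished. The 200 ms sleep and the squaring are placeholders for real work.

```elixir
defmodule DeferredReply do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)
  def slow_square(pid, n), do: GenServer.call(pid, {:slow_square, n})
  def ping(pid), do: GenServer.call(pid, :ping)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:slow_square, n}, from, state) do
    # Hand the `from` tuple to another process; it replies when it is done.
    {:ok, _task_pid} =
      Task.start(fn ->
        Process.sleep(200) # placeholder for slow work
        GenServer.reply(from, n * n)
      end)

    # No reply yet: the caller keeps waiting, but the server is free again.
    {:noreply, state}
  end

  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

{:ok, pid} = DeferredReply.start_link()
caller = Task.async(fn -> DeferredReply.slow_square(pid, 7) end)

# The server answers other calls while the slow one is still pending.
:pong = DeferredReply.ping(pid)
49 = Task.await(caller)
```

The caller of slow_square/2 is still subject to the usual GenServer.call timeout (5 seconds by default), so long-running work needs a larger timeout on the client side.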

Here’s a longer example of the first kind (note the date - I didn’t see any obvious incompatibilities, but I didn’t run the code):
