Poolboy + AMQP Error: no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

Recently I upgraded to elixir 1.17.3, though I don’t think its to do with that but I have poolboy managing amqp connections and at times it starts failing and I have to restart the application and then everything is fine. Seems there could be a race condition. Would appreciate to understand it

** (exit) exited in: :gen_server.call(#PID<0.3807.0>, {:call, {:"basic.publish", 0, "topgun", "vector-00-dev", false, false}, {:amqp_msg, {:P_basic, :undefined, :undefined, :undefined, 1, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined}, "{\"data\":<redacted>, 70000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

amqp = [
     :poolboy.child_spec(:amqp_worker, amqp_poolboy_config())
   ]
opts = [strategy: :one_for_one, name: Maverick.Supervisor]
   Supervisor.start_link(children, opts)

base = telemetry ++ repo ++ cluster ++ pubsub ++ redix ++ cachex ++ cloak
   goose = web ++ email ++ worker ++ broadway_goose
   maverick = web ++ worker ++ amqp ++ broadway_maverick
   wolfman = web ++ email ++ amqp ++ worker ++ broadway_wolfman

   children =
     case Application.fetch_env!(:maverick, :app_level) do
       "GOOSE" -> base ++ goose
       "WOLFMAN" -> base ++ wolfman
       _ -> base ++ maverick
     end

defp amqp_poolboy_config() do
    [
      {:name, {:local, :amqp_worker}},
      {:worker_module, Maverick.Amqp.Broker},
      {:size, Application.get_env(:maverick, :amqp_worker)[:pool_size]},
      {:max_overflow, Application.get_env(:maverick, :amqp_worker)[:max_overflow]}
    ]
  end

 @retry with: exponential_backoff() |> randomize() |> expiry(10_000)
  def publish(%{} = message, topic) do
    msg = Jason.encode!(message)

    :poolboy.transaction(
      :amqp_worker,
      fn worker ->
        AMQP.Basic.publish(AmqpBroker.channel(worker), AmqpBroker.exchange(), topic, msg)
      end,
      @timeout
    )
  end

defmodule Maverick.Amqp.Broker do
  @moduledoc """
  Maverick.Amqp.Broker
  """

  use GenServer

  @exchange "topgun"

  def host(),
    do: Application.fetch_env!(:maverick, :amqp_host)

  def vhost(),
    do: Application.fetch_env!(:maverick, :amqp_vhost)

  def port(),
    do: Application.fetch_env!(:maverick, :amqp_port)

  def username(),
    do: Application.fetch_env!(:maverick, :amqp_username)

  def password(),
    do: Application.fetch_env!(:maverick, :amqp_password)

  def exchange(), do: @exchange

  def start_link(_config) do
    GenServer.start_link(__MODULE__, [])
  end

  @impl true
  def init(_config) do
    {:ok, conn} = connect()
    {:ok, chan} = create_channel(conn)
    :ok = exchange(chan)
    :ok = queues(chan, Maverick.Queue.Gateway.topics())

    # Keep channel open for the GenServer lifecycle
    {:ok, %{connection: conn, channel: chan}}
  end

  @impl true
  def handle_call(:get_channel, _from, state) do
    {:reply, state.channel, state}
  end

  @impl true
  def handle_call({:create_queues, topics}, _from, state) do
    queues(state.channel, topics, @exchange)
  end

  @impl true
  def terminate(_reason, %{connection: connection}) do
    AMQP.Connection.close(connection)
  end

  def channel(worker) do
    GenServer.call(worker, :get_channel)
  end

  def create_queues(queues) do
    GenServer.call(__MODULE__, {:create_queues, queues})
  end

  defp connect() do
    AMQP.Connection.open(
      username: username(),
      password: password(),
      virtual_host: vhost(),
      host: host(),
      port: port(),
      ssl_options: [
        verify: :verify_peer,
        customize_hostname_check: [
          match_fun: :public_key.pkix_verify_hostname_match_fun(:https)
        ],
        # from CAStore package
        cacertfile: CAStore.file_path()
      ]
    )
  end

  defp create_channel(conn) do
    AMQP.Channel.open(conn)
  end

  defp exchange(chan, exchange \\ @exchange) do
    AMQP.Exchange.declare(chan, exchange)
  end

  defp queues(chan, topics, exchange \\ @exchange) do
    Enum.each(topics, fn topic ->
      AMQP.Queue.declare(chan, topic, durable: true)
      AMQP.Queue.bind(chan, topic, exchange, routing_key: topic)
    end)
  end
end

I believe this could be what I am facing, poolboy dishing out dead connections. Any solution to this ?

Can you create a minimal GitHub repo that reproduces the problem?

1 Like

seems I found something I can see below error just before it starts crashing. After that poolboy is giving out dud workers. If my AmqpBroker is crashing shouldn’t poolboy start a new one ? Is there a way to ensure it is started, am I missing something that should be configured in the poolboy or the GenServer to avoid this crashing loop Or is there a way to kill that worker so it is removed from poolboy then can add that to the genserver terminate handler

GenServer #PID<0.3785.0> terminating
** (stop) :socket_closed_unexpectedly
Last message: :socket_closed

Maybe you have an aggressive firewall?

I am on fly.io. Its just now happening every couple of days . Is there a graceful way of handlong this so poolboy gets the next available connection instead of pulling non existent ones ?

I feel I am hitting this at times on the connection status as still not sure what the issue is and why poolboy gives out dead connections