Postgrex Notifications sometimes doesn't receive notifications on production

I’m using postgrex notifications in production, and sometimes, notifications are not received by the process. There are no error message, it looks like the listening process just never receives the notification.
The weirdest thing is that this only happens sometimes. There are days where this will work consistently, and days that it will fail consistently. I cannot figure out why.
In development, I have never seen it fail.

Here is what I tried

  • using :sys.get_status/2`, I have verified that the process is alive on production
  • using :sys.trace/3, I have traced the processed and verified that it receives no message when a new entry that it should listen to is created
  • I am trying to log all the possible ways the process could fail, however nothing seems to appear in the logs.

I am using postgres 11 (and it says on the postgrex repo that it works for up to version 10.0, I’m not sure though that is just not information not being updated)

I am using elixir 1.7.4 (with the erlang version that comes with the official alpine image)

any idea of how to go further on this is welcome.

Here is the whole code of my module for reference

(the notify functions inside are mainly just sending emails. However these emails are important and I cannot afford to not send them)

defmodule Union.DBListener do
  use GenServer
  import Ecto.Query, except: [update: 2], warn: false
  alias Union.{InvestorAccounts, Repo}

  alias Union.Helpers.Log
  alias Union.InvestorAccounts.InvestorAccount
  alias Union.RegulatedEntities.{Account, BankAccount}
  alias UnionWeb.Emails.{Investors, RegulatedEntity, Support}
  alias UnionWeb.InvestorAccountChannel
  require Logger
  alias Postgrex.Notifications

  @moduledoc """
  listens for specific events on the database and dispatches the relevant notifications
  """

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  @spec start_link([String.t()]) :: {:ok, pid}
  def start_link(channels) do
    GenServer.start_link(__MODULE__, channels, name: __MODULE__)
  end

  @doc """
  When the GenServer starts subscribe to the given channels
  """
  @spec init([String.t()]) :: {:ok, []}
  def init(channels) do
    Logger.info(fn ->
      "Starting #{__MODULE__} with channel subscription: #{channels |> Enum.join(", ")}"
    end)

    pg_config = Repo.config()
    {:ok, pid} = Notifications.start_link(pg_config)

    channels
    |> Enum.map(fn channel ->
      {:ok, ref} = Notifications.listen(pid, channel)
      {:ok, %{channel => ref}}
    end)
    |> Enum.reduce({:ok, {pid, %{}}}, fn
      {:ok, new_ref_map}, {:ok, {pid, ref_map}} -> {:ok, {pid, ref_map |> Map.merge(new_ref_map)}}
      {:error, reason}, _ -> {:error, reason}
      _, {:error, reason} -> {:error, reason}
    end)
  end

  @doc """
  Listen for changes in the users table
  """
  def handle_info({:notification, _pid, _ref, "insert", payload}, _state) do
    with {:ok, payload} <- Jason.decode(payload),
         {:ok, _} <- handle_insert(payload) do
    else
      {:error, message} ->
        Logger.error("error decoding payload #{payload}")
        Logger.error(message)
    end

    {:noreply, :event_handled}
  end

  def handle_info({:notification, _pid, _ref, "update", payload}, _state) do
    with {:ok, payload} <- Jason.decode(payload),
         {:ok, _} <- handle_update(payload) do
    else
      {:error, message} ->
        Logger.error("error decoding payload #{payload}")
        Logger.error(message)
    end

    {:noreply, :event_handled}
  end

  def handle_info({:notification, _pid, _ref, "delete", payload}, _state) do
    with {:ok, payload} <- Jason.decode(payload),
         {:ok, _} <- handle_delete(payload) do
    else
      {:error, message} ->
        Logger.error("error decoding payload #{payload}")
        Logger.error(message)
    end

    {:noreply, :event_handled}
  end

  def handle_info(_, _state), do: {:noreply, :event_received}

  @spec handle_insert(map) :: {:ok, String.t()} | {:error, String.t()}
  defp handle_insert(%{"table" => "users", "data" => %{"name" => name, "email" => email}}) do
    Support.notify_new(%{name: name, email: email})
  end

  defp handle_insert(%{
         "table" => "regulated_entity_accounts",
         "id" => regulated_entity_account_id,
         "data" => %{"full_name" => full_name}
       }) do
    regulated_entity_account =
      Account
      |> Repo.get(regulated_entity_account_id)
      |> Repo.preload(regulated_entity: [], users: [])

    [
      Support.notify_new(%{full_name: full_name}),
      Investors.notify_new(regulated_entity_account),
      RegulatedEntity.notify_new(regulated_entity_account)
    ]
    |> Log.reduce_errors()
  end

  defp handle_insert(payload), do: {:error, "unhandled payload #{IO.inspect(payload)}"}

  # TODO separate this module into a listener and a notifier ?
  # this module shouldn't have the responsability of creating a cash account
  @spec handle_update(map) :: {:ok, String.t()} | {:error, String.t()}
  defp handle_update(%{
         "table" => "regulated_entity_accounts",
         "id" => regulated_entity_account_id,
         "new_data" => %{"status" => "active", "full_name" => full_name},
         "old_data" => %{"status" => "pending"}
       }) do
    regulated_entity_account =
      from(
        rea in Account,
        where: rea.id == ^regulated_entity_account_id,
        preload: [regulated_entity: [], users: []]
      )
      |> Repo.one()

    bank_account =
      from(
        b in BankAccount,
        where: b.regulated_entity_id == ^regulated_entity_account.regulated_entity_id,
        where: b.currency_id == ^regulated_entity_account.currency_id
      )
      |> Repo.one()

    [
      InvestorAccounts.find_or_create_cash_account(regulated_entity_account),
      Support.notify_update(%{full_name: full_name}),
      Investors.notify_update(regulated_entity_account, bank_account)
    ]
    |> Log.reduce_errors()
  end

  defp handle_update(payload), do: {:error, "unhandled payload #{IO.inspect(payload)}"}

  @spec handle_delete(map) :: {:ok, any} | {:error, String.t()}
  defp handle_delete(%{
         "table" => "uncopy_entries",
         "data" => %{"investor_account_id" => investor_account_id, "amount" => amount}
       }) do
    Logger.info(
      "notifying uncopy of #{amount} processed for investor account id #{investor_account_id}"
    )

    from(
      ia in InvestorAccount,
      where: ia.id == ^investor_account_id,
      preload: [method: [], regulated_entity_account: [:cash_account, :currency]]
    )
    |> Repo.one()
    |> InvestorAccountChannel.notify_uncopy_processed(Decimal.new(amount))
    |> (&{:ok, &1}).()
  end

  defp handle_delete(payload), do: {:error, "unhandled payload #{IO.inspect(payload)}"}
end