Ecto & reacting to database becoming unavailable?

Hey all,

Newish to Elixir and trying to deepen my knowledge by experimenting, and I got caught out by how Ecto.Repo behaves. I’m hoping someone here can explain why Ecto works how it does, and perhaps can offer ways of accomplishing what I had in mind (even if it’s a bit weird what I’m trying, but it’s to learn!)

Using a normal Phoenix app, if I shut down the database and start the server it immediately floods with repeated error messages:

[error] Postgrex.Protocol (#PID<0.332.0>) failed to connect: ** (DBConnection.ConnectionError) tcp connect (localhost:5432): connection refused - :econnrefused
[error] Postgrex.Protocol (#PID<0.334.0>) failed to connect: ** (DBConnection.ConnectionError) tcp connect (localhost:5432): connection refused - :econnrefused
...

I assumed that was MyApp.Repo failing, but the PIDs are all different and don’t match Process.whereis(MyApp.Repo). So… I guess those errors are probably connection-pool processes crashing, maybe supervised by MyApp.Repo?

What I had wanted to do was to take manual control over MyApp.Repo, and then when it crashed I wanted to let the app slip into “maintenance mode”, and I could manually restart the repo. But that’s a no-go, since the repo isn’t actually crashing.

  1. Can anyone provide some context or link to resources that explain the repo behavior? I’ve looked around hexdocs but have so far not come away with more knowledge on how the sub-processes work and if there’s a mechanism for observing them.
  2. Anyone have suggestions for how to react to the database dropping out? I do understand Repo auto-reconnects and I can just leave it as-is, but I’m on a learning-journey and I figured a nice experiment was to prevent the wall of error-messages that floods in from disconnects.

The first thing you need to read about to understand what is going on is Supervisors.

What you are experiencing is default Ecto behavior (it has nothing to do with Phoenix). In 99% of production apps, being unable to connect to the data source is treated as a fault the system should not recover from, because data critical to the business logic and the application runtime usually lives in the database.

The core resources that will guide you towards customizing this behavior are:

  1. DynamicSupervisor — Elixir v1.14.5
  2. Supervision trees and applications — Elixir v1.16.1
  3. Replicas and dynamic repositories — Ecto v3.11.2

Feel free to ask more focused questions as you progress along in this direction.

After reading on dynamic repositories I’d look seriously into using that. Then you can supervise it in a more lax manner – it’s very doable and I saw it done once but of course I can’t keep my customers’ code so I have no example in hand.

But yes, you can do it, and no, you likely shouldn’t do it the normal way. You can and should still create a new module with use Ecto.Repo inside, but you shouldn’t, for example, put it in your application’s supervision tree, because the children there are non-negotiable startup dependencies.
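
A minimal sketch of what "supervising it in a more lax manner" could look like, using only OTP. `LaxDemo.FakeRepo` is a hypothetical stand-in for a module that has use Ecto.Repo; the idea is that a child started with `restart: :temporary` can die without its supervisor restarting it or going down with it.

```elixir
defmodule LaxDemo.FakeRepo do
  # Hypothetical stand-in for a module that calls `use Ecto.Repo`.
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> :connected end)
end

# Started standalone here; in an app this supervisor would sit in the supervision tree.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

spec = %{
  id: LaxDemo.FakeRepo,
  start: {LaxDemo.FakeRepo, :start_link, [[]]},
  # :temporary children are never restarted, so a crash here does not
  # count toward the supervisor's restart limit.
  restart: :temporary
}

{:ok, repo_pid} = DynamicSupervisor.start_child(sup, spec)

# Simulate the repo crashing and wait until it is really gone.
ref = Process.monitor(repo_pid)
Process.exit(repo_pid, :kill)

receive do
  {:DOWN, ^ref, :process, ^repo_pid, _reason} -> :ok
end

IO.inspect(Process.alive?(sup), label: "supervisor alive")
```

With a real repo, the same child spec shape would presumably point at the repo module's own start_link instead of the fake one.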

D4no0, thanks for the links. From them I understood that the repo has an underlying dynamic supervisor process that gets started, which controls the connection pool processes.

iex(3)> DynamicSupervisor.count_children(MyApp.Repo)
%{active: 1, workers: 1, supervisors: 0, specs: 1}
iex(7)> p = Process.whereis(MyApp.Repo)
#PID<0.333.0>
iex(9)> Supervisor.which_children(p)
[{DBConnection.ConnectionPool, #PID<0.334.0>, :worker, [Ecto.Repo.Supervisor]}]

I need to learn if/how I can dive into that supervising process, and then… maybe I can get a status from that process, or maybe I have to scan its children’s statuses? I’m still chasing some way of reacting to those database connection errors by then stopping the MyApp.Repo process (implying I’ve taken it out of main supervisor and control its lifecycle myself), but I’m not even sure yet which questions to ask to dig into that journey. And I still feel like it’s 50/50 if I’m just completely misunderstanding the whole thing :slight_smile:

I’ll start experimenting, but of course if you or anyone else have specific ideas/pointers for reacting to those connection pool process errors I’m all ears.

One way of doing it is to monitor the connection processes using Process.monitor/1. You can read about a potential approach to this here: How can I monitor when a process is restarted.
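
As a rough illustration of the Process.monitor/1 idea, here is a small watcher process. `MonitorDemo.Watcher` and the `{:connection_down, pid, reason}` message are made-up names for this sketch, and the spawned process only stands in for a real pool or connection process.

```elixir
defmodule MonitorDemo.Watcher do
  use GenServer

  # `notify` is the pid that gets told when a watched process goes down.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def watch(watcher, pid), do: GenServer.call(watcher, {:watch, pid})

  @impl true
  def init(opts), do: {:ok, %{notify: Keyword.fetch!(opts, :notify), refs: %{}}}

  @impl true
  def handle_call({:watch, pid}, _from, state) do
    # The monitor delivers a :DOWN message to this process when `pid` exits.
    ref = Process.monitor(pid)
    {:reply, :ok, put_in(state.refs[ref], pid)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, pid, reason}, state) do
    send(state.notify, {:connection_down, pid, reason})
    {:noreply, %{state | refs: Map.delete(state.refs, ref)}}
  end
end

{:ok, watcher} = MonitorDemo.Watcher.start_link(notify: self())

# A throwaway process standing in for a connection process.
conn = spawn(fn -> Process.sleep(:infinity) end)
:ok = MonitorDemo.Watcher.watch(watcher, conn)
Process.exit(conn, :kill)

down_reason =
  receive do
    {:connection_down, ^conn, reason} -> reason
  after
    1_000 -> raise "no :DOWN notification received"
  end

IO.inspect(down_reason, label: "down reason")
```

Unlike a link, a monitor is one-directional: the watcher is informed of the exit but is not taken down with the watched process.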

If you are only interested in disabling the repo when there is no connection to the database, a much easier solution is to create a process that pings the database and either starts or stops the repo supervisor.
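
A sketch of such a pinging process, under stated assumptions: `PingDemo.Pinger` and its `:ping`, `:on_up`, `:on_down` and `:interval` options are hypothetical names, and the Agent below fakes the database's health. In a real app the ping function would be some cheap check against the database, and the callbacks would start or stop the repo supervisor.

```elixir
defmodule PingDemo.Pinger do
  use GenServer

  # Options (all hypothetical names for this sketch):
  #   :ping     - zero-arity function returning :ok or {:error, term}
  #   :on_up    - zero-arity function run when the database becomes reachable
  #   :on_down  - zero-arity function run when it becomes unreachable
  #   :interval - milliseconds between pings
  def start_link(opts), do: GenServer.start_link(__MODULE__, Map.new(opts))

  @impl true
  def init(opts) do
    send(self(), :ping)
    {:ok, Map.put(opts, :status, :unknown)}
  end

  @impl true
  def handle_info(:ping, state) do
    new_status = if state.ping.() == :ok, do: :up, else: :down

    # Only act on transitions, so callbacks do not fire on every tick.
    case {state.status, new_status} do
      {same, same} -> :ok
      {_, :up} -> state.on_up.()
      {_, :down} -> state.on_down.()
    end

    Process.send_after(self(), :ping, state.interval)
    {:noreply, %{state | status: new_status}}
  end
end

parent = self()
# Fake database health that the demo can flip at will.
{:ok, db} = Agent.start_link(fn -> :ok end)

{:ok, _pinger} =
  PingDemo.Pinger.start_link(
    ping: fn -> Agent.get(db, & &1) end,
    on_up: fn -> send(parent, :db_up) end,
    on_down: fn -> send(parent, :db_down) end,
    interval: 20
  )

first = receive do :db_up -> :db_up after 1_000 -> :timeout end
Agent.update(db, fn _ -> {:error, :econnrefused} end)
second = receive do :db_down -> :db_down after 1_000 -> :timeout end

IO.inspect({first, second})
```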

If you’re just looking for Ecto-related stuff to read through you should also check out the docs for Ecto.Adapters.SQL and DBConnection (which are in separate repositories).

https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.html

https://hexdocs.pm/db_connection/DBConnection.html

Of particular interest to me was the code for connection sandboxing in tests which can be found here and the connection ownership code here - it’s a neat trick, and the abstraction holds for a surprisingly long time before it starts to leak. I remember having to track this code down when I was trying to write tests for functions which start async processes which then access the DB. Unfortunately the Sandbox docs don’t mention the $callers trick used by DBConnection.Ownership outright, and I remember it taking me a while to confirm that’s actually how it worked.
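
For anyone curious what the $callers key looks like, this small snippet (plain Elixir, no Ecto involved) shows that Task records the calling process in the process dictionary, while a bare spawn does not. How DBConnection.Ownership consumes that key is not shown here; this only demonstrates the data it would find.

```elixir
parent = self()

# Task stores the chain of calling processes under :"$callers".
task = Task.async(fn -> Process.get(:"$callers") end)
callers = Task.await(task)

# A plain spawn/1 does not set the key at all.
spawn(fn -> send(parent, {:plain_spawn, Process.get(:"$callers")}) end)

plain =
  receive do
    {:plain_spawn, value} -> value
  after
    1_000 -> :timeout
  end

IO.inspect(hd(callers) == parent, label: "task knows its caller")
IO.inspect(plain, label: "plain spawn")
```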

I ended up exploring and learning, which has been a lot of fun! Thanks for the suggestions and links to documentation.

I ended up finding Ecto can be configured with backoff_type: :stop, which disables its reconnection strategy. That means the repo just crashes when disconnecting, which let me put a monitor on the repo process and react by opening a circuit breaker. I hacked together a rough proof of concept where I register my guard instead of the repo:

$ cat lib/my_app/application.ex
  def start(_type, _args) do
    children = [
      {MyApp.GuardedRepo, repo: MyApp.Repo},
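
For reference, the backoff_type: :stop setting mentioned above might be configured roughly like this. The `:my_app` OTP app name and the choice of config file are assumptions; the option itself is handed through to DBConnection.

```elixir
# config/dev.exs (file choice is an assumption)
import Config

config :my_app, MyApp.Repo,
  # Stop the connection process on a failed connect or disconnect
  # instead of retrying with a backoff.
  backoff_type: :stop
```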

And then I can use MyApp.GuardedRepo like this:

iex(1)> MyApp.GuardedRepo.status
:closed
iex(2)> MyApp.GuardedRepo.query("SELECT NOW();")
[debug] QUERY OK db=1.1ms decode=0.6ms queue=0.5ms idle=1196.9ms
SELECT NOW(); []
{:ok,
 %Postgrex.Result{
   command: :select,
   columns: ["now"],
   rows: [[~U[2024-03-10 21:39:34.368015Z]]],
   num_rows: 1,
   connection_id: 96928,
   messages: []
 }}
<in another terminal I run `pg_ctl -D priv/db stop`. 
  Some error-logging still show as the Ecto process crashes, but only a few lines. 
  I've deleted those here to keep things easier to read>
iex(3)> MyApp.GuardedRepo.status
:open
iex(4)> MyApp.GuardedRepo.query("SELECT NOW();")
{:error, :open}

This GuardedRepo module can also attempt a reconnect, which closes the circuit if the connection succeeds and lets queries go through again. It doesn’t have all the proper circuit-breaker logic (closing automatically, etc.), but I figure getting this far is good enough for a quick hackathon; the rest is icing on the cake that could be added in later iterations.
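
To give an idea of what the missing "automatically close" logic could involve, here is a sketch of the state transitions as pure functions, kept separate from any process. `BreakerDemo` and the cool-down value are hypothetical; it uses the same convention as above, where `:closed` means queries go through.

```elixir
defmodule BreakerDemo do
  # Pure state transitions; `now` is a millisecond timestamp supplied by the caller.
  def new(cooldown_ms), do: %{status: :closed, opened_at: nil, cooldown_ms: cooldown_ms}

  # Open the circuit and remember when it happened.
  def trip(breaker, now), do: %{breaker | status: :open, opened_at: now}

  # Closed: always allow. Open: allow a trial call only after the cool-down.
  def allow?(%{status: :closed}, _now), do: true
  def allow?(%{status: :open, opened_at: t, cooldown_ms: c}, now), do: now - t >= c

  # Feed the outcome of a call back in: success closes, failure (re)opens.
  def record(breaker, :ok, _now), do: %{breaker | status: :closed, opened_at: nil}
  def record(breaker, :error, now), do: trip(breaker, now)
end

breaker = BreakerDemo.new(100) |> BreakerDemo.trip(0)

# Still cooling down at t=50, trial call allowed at t=100.
IO.inspect(BreakerDemo.allow?(breaker, 50))
IO.inspect(BreakerDemo.allow?(breaker, 100))
IO.inspect(BreakerDemo.record(breaker, :ok, 100).status)
```

A process holding this state could call `allow?/2` before each query and `record/3` afterwards, so a successful trial call after the cool-down closes the circuit without manual intervention.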

This is literally the first time I’ve used GenServers or Supervisors so I assume the code for this might be hot fire, but I’d be happy to hear reactions if someone wants to type in their spit-takes :joy::

defmodule MyApp.GuardedRepo do
  @moduledoc """
  * `GuardedRepo`: This is the public interface. It also manages child processes: `StateManager` & `RepoSupervisor`.
  * `StateManager`: Holds guard-state, including a "circuit breaker" status that's `:closed` when the database is available, and `:open` when unavailable.
  * `RepoSupervisor`: Supervisor that "firewalls" the Ecto repo process so the repo can crash (when database becomes unavailable) without affecting `GuardedRepo` functionality. It manages child processes: `RepoStarter` & `RepoObserver`
  * `RepoStarter`: A simple wrapper to start the Ecto repo and signal to `StateManager` if the repo managed to start (because Ecto repos crash if no database is available)
  * `RepoObserver`: This monitors the Ecto repo process, and if it crashes it notifies `StateManager`
  """

  use Supervisor

  def start_link(args), do: Supervisor.start_link(__MODULE__, args, name: __MODULE__)

  def query(sql, params \\ [], opts \\ []) do
    case status() do
      :closed ->
        # GenServer.call/2 accepts a registered name, so no Process.whereis/1 lookup is needed
        repo = GenServer.call(__MODULE__.StateManager, :get_repo)
        repo.query(sql, params, opts)

      :open ->
        {:error, :open}
    end
  end

  def attempt_circuit_close do
    case status() do
      :closed ->
        {:error, :already_closed}

      :open ->
        :ok = Supervisor.terminate_child(__MODULE__, __MODULE__.RepoSupervisor)
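        # Return the result as-is: {:ok, pid} on success, {:error, reason} if the
        # database is still down, instead of raising a MatchError on failure
        Supervisor.restart_child(__MODULE__, __MODULE__.RepoSupervisor)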
    end
  end

  def status, do: GenServer.call(__MODULE__.StateManager, :get_status)

  @impl Supervisor
  def init(args) do
    repo = Keyword.get(args, :repo)

    children = [
      {__MODULE__.StateManager, repo: repo},
      %{
        id: __MODULE__.RepoSupervisor,
        start: {__MODULE__.RepoSupervisor, :start_link, [[repo: repo]]},
        restart: :permanent
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule MyApp.GuardedRepo.StateManager do
  use GenServer

  def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(args) do
    repo = Keyword.get(args, :repo)
    {:ok, %{repo: repo, status: :open}}
  end

  @impl true
  def handle_call(:get_repo, _from, state), do: {:reply, state.repo, state}
  @impl true
  def handle_call(:get_status, _from, state), do: {:reply, state.status, state}
  @impl true
  def handle_call(:close_circuit, _from, state), do: {:reply, :ok, %{state | status: :closed}}
  @impl true
  def handle_call(:open_circuit, _from, state), do: {:reply, :ok, %{state | status: :open}}
end

defmodule MyApp.GuardedRepo.RepoSupervisor do
  use Supervisor

  def start_link(args), do: Supervisor.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(args) do
    repo = Keyword.get(args, :repo)

    children = [
      MyApp.GuardedRepo.RepoObserver,
      %{
        id: MyApp.GuardedRepo.RepoStarter,
        start: {MyApp.GuardedRepo.RepoStarter, :start_link, [[repo: repo]]},
        restart: :temporary
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule MyApp.GuardedRepo.RepoStarter do
  use GenServer

  def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(args) do
    repo = Keyword.get(args, :repo)

    case repo.start_link() do
      {:ok, pid} ->
        MyApp.GuardedRepo.RepoObserver
        |> Process.whereis()
        |> GenServer.call({:start_monitoring, pid})

        MyApp.GuardedRepo.StateManager |> Process.whereis() |> GenServer.call(:close_circuit)
        {:ok, :started}

      {:error, reason} ->
        MyApp.GuardedRepo.StateManager |> Process.whereis() |> GenServer.call(:open_circuit)
        {:error, reason}
    end
  end
end

defmodule MyApp.GuardedRepo.RepoObserver do
  use GenServer

  def start_link(_no_args \\ []), do: GenServer.start_link(__MODULE__, :no_args, name: __MODULE__)

  @impl true
  def init(_args), do: {:ok, %{monitoring_started: false, monitor_ref: nil, repo_pid: nil}}
  @impl true
  def handle_call({:start_monitoring, repo_pid}, _from, state),
    do:
      {:reply, :ok,
       %{
         state
         | monitoring_started: true,
           monitor_ref: repo_pid |> Process.monitor(),
           repo_pid: repo_pid
       }}

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # The repo process is gone: open the circuit and clear all monitor state
    GenServer.call(MyApp.GuardedRepo.StateManager, :open_circuit)
    {:noreply, %{state | monitoring_started: false, monitor_ref: nil, repo_pid: nil}}
  end
end

As I hope is clear from this thread, I don’t mean for this solution to be some viable new great approach; I just think it’s a good way to learn by pushing against default patterns to see why a system is put together the way it is. If someone can spot easier ways to solve what I’ve done above, I’d love to hear it. And don’t assume I know what might seem obvious to you, because I’m just beginning my dive into Elixir and I probably have blindspots the size of all of Elixir. So any form of feedback is appreciated.
