I ended up exploring and learning, which has been a lot of fun! Thanks for the suggestions and the links to documentation.
I found that Ecto can be configured with backoff_type: :stop, which disables its reconnection strategy. That means the repo process just crashes when it loses its connection, which let me put a monitor on the repo process and react by opening a circuit breaker. I hacked together a rough proof of concept where I register my guard instead of the repo:
$ cat lib/my_app/application.ex
  def start(_type, _args) do
    children = [
      {MyApp.GuardedRepo, repo: MyApp.Repo},
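For anyone following along, the backoff_type: :stop option mentioned above goes in the repo's config and is handed down to the DBConnection pool. A minimal sketch, assuming the OTP app is named :my_app and the config lives in config/config.exs (both are assumptions for illustration):

```elixir
# config/config.exs (file location and the :my_app app name are assumptions)
import Config

config :my_app, MyApp.Repo,
  # Stop the connection process on disconnect instead of retrying with backoff
  backoff_type: :stop
```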
And then I can use MyApp.GuardedRepo like this:
iex(1)> MyApp.GuardedRepo.status
:closed
iex(2)> MyApp.GuardedRepo.query("SELECT NOW();")
[debug] QUERY OK db=1.1ms decode=0.6ms queue=0.5ms idle=1196.9ms
SELECT NOW(); []
{:ok,
 %Postgrex.Result{
   command: :select,
   columns: ["now"],
   rows: [[~U[2024-03-10 21:39:34.368015Z]]],
   num_rows: 1,
   connection_id: 96928,
   messages: []
 }}
<In another terminal I run `pg_ctl -D priv/db stop`.
Some error logging still shows up as the Ecto process crashes, but only a few lines.
I've deleted those here to keep things easier to read.>
iex(3)> MyApp.GuardedRepo.status
:open
iex(4)> MyApp.GuardedRepo.query("SELECT NOW();")
{:error, :open}
This GuardedRepo module can also attempt a reconnect, which closes the circuit if the connection succeeds and lets queries go through again. It doesn’t have all the proper circuit-breaker logic (closing automatically, etc.), but I figure getting this far is good enough for a quick hackathon; the rest would be icing on the cake that could be added in later iterations.
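To give an idea of what the missing "close automatically" part could look like, here is a minimal sketch of a breaker that re-probes on a timer while it is open. RetryingBreaker, the :probe option, and the :retry_ms option are all hypothetical names, not part of my proof of concept; in GuardedRepo the probe would presumably wrap attempt_circuit_close/0.

```elixir
defmodule RetryingBreaker do
  @moduledoc "Hypothetical sketch: a breaker that re-probes on a timer while open."
  use GenServer

  # :probe is any zero-arity function that returns :ok when the dependency is reachable.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def status(pid), do: GenServer.call(pid, :status)
  def trip(pid), do: GenServer.call(pid, :trip)

  @impl true
  def init(opts) do
    state = %{
      status: :closed,
      probe: Keyword.fetch!(opts, :probe),
      retry_ms: Keyword.get(opts, :retry_ms, 1_000)
    }

    {:ok, state}
  end

  @impl true
  def handle_call(:status, _from, state), do: {:reply, state.status, state}

  def handle_call(:trip, _from, state) do
    # Only schedule a retry on the closed -> open transition, so timers never pile up.
    if state.status == :closed, do: schedule_retry(state)
    {:reply, :ok, %{state | status: :open}}
  end

  @impl true
  def handle_info(:retry, state) do
    case state.probe.() do
      :ok ->
        # The probe succeeded: close the circuit and stop retrying.
        {:noreply, %{state | status: :closed}}

      _error ->
        # Still unavailable: stay open and try again later.
        schedule_retry(state)
        {:noreply, state}
    end
  end

  defp schedule_retry(state), do: Process.send_after(self(), :retry, state.retry_ms)
end
```

A fixed retry interval keeps the sketch short; a real breaker would probably want backoff and a limit on how often it probes.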
This is literally the first time I’ve used GenServers or Supervisors, so I assume the code for this might be hot fire, but I’d be happy to hear reactions if someone wants to type in their spit-takes:
defmodule MyApp.GuardedRepo do
  @moduledoc """
  * `GuardedRepo`: This is the public interface. It also manages child processes: `StateManager` & `RepoSupervisor`.
  * `StateManager`: Holds guard-state, including a "circuit breaker" status that's `:closed` when the database is available, and `:open` when unavailable.
  * `RepoSupervisor`: Supervisor that "firewalls" the Ecto repo process so the repo can crash (when database becomes unavailable) without affecting `GuardedRepo` functionality. It manages child processes: `RepoStarter` & `RepoObserver`.
  * `RepoStarter`: A simple wrapper to start the Ecto repo and signal to `StateManager` if the repo managed to start (because Ecto repos crash if no database is available).
  * `RepoObserver`: This monitors the Ecto repo process, and if it crashes it notifies `StateManager`.
  """
  use Supervisor

  def start_link(args), do: Supervisor.start_link(__MODULE__, args, name: __MODULE__)

  def query(sql, params \\ [], opts \\ []) do
    case status() do
      :closed ->
        repo = GenServer.call(__MODULE__.StateManager, :get_repo)
        repo.query(sql, params, opts)

      :open ->
        {:error, :open}
    end
  end

  def attempt_circuit_close do
    case status() do
      :closed ->
        {:error, :already_closed}

      :open ->
        :ok = Supervisor.terminate_child(__MODULE__, __MODULE__.RepoSupervisor)
        # Returns {:ok, pid} on success, or {:error, reason} if the repo still
        # cannot start (instead of raising a MatchError in the caller).
        Supervisor.restart_child(__MODULE__, __MODULE__.RepoSupervisor)
    end
  end

  def status, do: GenServer.call(__MODULE__.StateManager, :get_status)

  @impl Supervisor
  def init(args) do
    repo = Keyword.get(args, :repo)

    children = [
      {__MODULE__.StateManager, repo: repo},
      %{
        id: __MODULE__.RepoSupervisor,
        start: {__MODULE__.RepoSupervisor, :start_link, [[repo: repo]]},
        restart: :permanent
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
defmodule MyApp.GuardedRepo.StateManager do
  use GenServer

  def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(args) do
    repo = Keyword.get(args, :repo)
    {:ok, %{repo: repo, status: :open}}
  end

  @impl true
  def handle_call(:get_repo, _from, state), do: {:reply, state.repo, state}

  @impl true
  def handle_call(:get_status, _from, state), do: {:reply, state.status, state}

  @impl true
  def handle_call(:close_circuit, _from, state), do: {:reply, :ok, %{state | status: :closed}}

  @impl true
  def handle_call(:open_circuit, _from, state), do: {:reply, :ok, %{state | status: :open}}
end
defmodule MyApp.GuardedRepo.RepoSupervisor do
  use Supervisor

  def start_link(args), do: Supervisor.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(args) do
    repo = Keyword.get(args, :repo)

    children = [
      MyApp.GuardedRepo.RepoObserver,
      %{
        id: MyApp.GuardedRepo.RepoStarter,
        start: {MyApp.GuardedRepo.RepoStarter, :start_link, [[repo: repo]]},
        restart: :temporary
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
defmodule MyApp.GuardedRepo.RepoStarter do
  use GenServer

  def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(args) do
    repo = Keyword.get(args, :repo)

    # The repo is linked to this process, so a repo crash also takes this process down.
    case repo.start_link() do
      {:ok, pid} ->
        GenServer.call(MyApp.GuardedRepo.RepoObserver, {:start_monitoring, pid})
        GenServer.call(MyApp.GuardedRepo.StateManager, :close_circuit)
        {:ok, :started}

      {:error, reason} ->
        GenServer.call(MyApp.GuardedRepo.StateManager, :open_circuit)
        {:error, reason}
    end
  end
end
defmodule MyApp.GuardedRepo.RepoObserver do
  use GenServer

  def start_link(_no_args \\ []), do: GenServer.start_link(__MODULE__, :no_args, name: __MODULE__)

  @impl true
  def init(_args), do: {:ok, %{monitoring_started: false, monitor_ref: nil, repo_pid: nil}}

  @impl true
  def handle_call({:start_monitoring, repo_pid}, _from, state) do
    new_state = %{
      state
      | monitoring_started: true,
        monitor_ref: Process.monitor(repo_pid),
        repo_pid: repo_pid
    }

    {:reply, :ok, new_state}
  end

  # The monitored repo process went down: open the circuit.
  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    GenServer.call(MyApp.GuardedRepo.StateManager, :open_circuit)
    {:noreply, %{state | repo_pid: nil}}
  end
end
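One simplification I can imagine, shown here only as a sketch that is not wired up to Ecto: since StateManager and RepoObserver both just react to the repo going up or down, the status and the monitor could live in a single GenServer. MonitoredBreaker and watch/2 are hypothetical names, and the "repo" in the usage below is just a plain spawned process.

```elixir
defmodule MonitoredBreaker do
  @moduledoc "Hypothetical sketch: circuit status and process monitor in one GenServer."
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Start watching `pid` and close the circuit.
  def watch(breaker, pid), do: GenServer.call(breaker, {:watch, pid})
  def status(breaker), do: GenServer.call(breaker, :status)

  @impl true
  def init(_opts), do: {:ok, %{status: :open, ref: nil}}

  @impl true
  def handle_call({:watch, pid}, _from, state) do
    # Drop any previous monitor so a stale :DOWN message cannot reopen the circuit.
    if state.ref, do: Process.demonitor(state.ref, [:flush])
    {:reply, :ok, %{state | status: :closed, ref: Process.monitor(pid)}}
  end

  def handle_call(:status, _from, state), do: {:reply, state.status, state}

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    # The watched process died: open the circuit.
    {:noreply, %{state | status: :open, ref: nil}}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end

# Usage with a stand-in process instead of a real repo:
{:ok, breaker} = MonitoredBreaker.start_link()
worker = spawn(fn -> Process.sleep(:infinity) end)
:ok = MonitoredBreaker.watch(breaker, worker)
Process.exit(worker, :kill)
```

Once the :DOWN message has been processed, MonitoredBreaker.status(breaker) goes back to :open. This drops one process and one round of GenServer.call hops, at the cost of mixing two responsibilities in one module.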
As I hope is clear from this thread, I don’t mean for this solution to be some viable new great approach; I just think it’s a good way to learn, pushing against default patterns to see why a system is put together the way it is. If someone can spot easier ways to solve what I’ve done above, I’d love to hear it. And please don’t assume I know what might seem obvious to you, because I’m just beginning my dive into Elixir and I probably have blind spots the size of all of Elixir. Any form of feedback is appreciated.