Hello,
I have been using the Postgres NOTIFY feature via Postgrex.Notifications to listen for messages emitted after a table row is inserted. I’m using this to emit the event notification, rather than Phoenix.PubSub or another strategy, because for my use case I specifically need to know when the data is available to read, not when the write transaction was committed. I’m distributing the application in a global cluster on fly.io with local read replicas in each VM region, and the intent is to know when the local replica has the data.
I have this working exactly as intended when I only have one writable db instance that all nodes are connected to. Recently I added @brainlid's excellent fly_postgres library to my project to set up local replica connections. The library can detect whether a node is in the “primary” region, and will dynamically change the database connection and config to connect to the nearest replica if the node is outside of the primary. It also wraps the Repo functions to automatically make RPC calls to nodes in the primary for write operations.
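To illustrate the behaviour I mean, here is a minimal sketch of the region check as I understand it. This is not the library's actual code: the RegionSketch module, its function names, and the FLY_REGION / PRIMARY_REGION environment variable names are assumptions made for illustration.

```elixir
defmodule RegionSketch do
  @moduledoc """
  Illustration only: decides which database role a node should use,
  based on two region names. Not taken from fly_postgres.
  """

  # Returns :primary when the node runs in the primary region,
  # otherwise :replica.
  def db_role(current_region, primary_region)
      when is_binary(current_region) and is_binary(primary_region) do
    if current_region == primary_region, do: :primary, else: :replica
  end

  # Reads both regions from an environment map. The variable names
  # are assumptions made for this sketch.
  def db_role_from_env(env \\ System.get_env()) do
    db_role(Map.fetch!(env, "FLY_REGION"), Map.fetch!(env, "PRIMARY_REGION"))
  end
end
```

In this sketch a node whose region differs from the primary region would read from its nearest replica, which is the situation where my notifications stop arriving.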
After following the configuration steps, everything works great, except that once the read replicas are deployed I no longer receive the Postgres NOTIFY messages on nodes outside of the primary region. Nodes in the primary region still receive messages properly. I found some information on Stack Overflow about extra SQL you must execute in order to enable triggers on the replicas, but after running that migration (the last one in the code below) I still have no luck.
Does anyone know if I might be doing something wrong? Does Postgrex.Notifications work with read replicas? Is there some quirk of this configuration, or maybe of the fly.io database infrastructure, that could be tripping this up?
Here’s the relevant code/config. Note that fly_postgres has you rename your Repo to Repo.Local, and the standard Repo module wraps the functions in Repo.Local, changing the behavior based on the region, so if you see naming inconsistencies those are not mistakes.

Thanks in advance!
# config/config.exs
config :my_app,
  ecto_repos: [MyApp.Repo.Local]

config :my_app, MyApp.Repo.Local,
  priv: "priv/repo"
# config/runtime.exs
database_url =
  System.get_env("DATABASE_URL") ||
    raise """
    environment variable DATABASE_URL is missing.
    For example: ecto://USER:PASS@HOST/DATABASE
    """

maybe_ipv6 = if System.get_env("ECTO_IPV6"), do: [:inet6], else: []

config :my_app, MyApp.Repo.Local,
  # ssl: true,
  url: database_url,
  pool_size: String.to_integer(System.get_env("POOL_SIZE") || "20"),
  socket_options: maybe_ipv6
# application.ex
children = [
  Fly.RPC,
  MyApp.Repo.Local,
  {Fly.Postgres.LSN.Tracker, repo: MyApp.Repo.Local},
  {Postgrex.Notifications, Keyword.put_new(Repo.config(), :name, MyApp.Notifier)},
  MyApp.Listener
]

Supervisor.start_link(...)
# my_app/listener.ex
def init(_) do
  Process.monitor(MyApp.Notifier)
  {_, ref} = Postgrex.Notifications.listen(MyApp.Notifier, "handoff_inserted")
  {:ok, %__MODULE__{ref: ref}}
end

def handle_info({:notification, _, ref, "handoff_inserted", payload}, %{ref: ref} = state) do
  do_stuff(payload)
  {:noreply, state}
end
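The listener excerpt above only shows the notification clause. For reference, here is a minimal sketch of the full message handling, assuming the process should stop when the monitored notifier goes down and should ignore unrelated messages. ListenerSketch, the :report_to option, and the injected :ref are hypothetical: they stand in for do_stuff/1 and for the ref returned by Postgrex.Notifications.listen/2, so the module can run without a database.

```elixir
defmodule ListenerSketch do
  use GenServer

  # opts must contain :ref (the listen reference) and :report_to (a pid
  # that receives handled payloads). Both are assumptions of this sketch.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      ref: Keyword.fetch!(opts, :ref),
      report_to: Keyword.fetch!(opts, :report_to)
    }

    {:ok, state}
  end

  # Matches only notifications on the expected channel whose listen
  # reference equals the one stored in the state.
  @impl true
  def handle_info({:notification, _pid, ref, "handoff_inserted", payload}, %{ref: ref} = state) do
    send(state.report_to, {:handled, payload})
    {:noreply, state}
  end

  # The monitored notifier process exited: stop so a supervisor can
  # restart the listener and subscribe again.
  def handle_info({:DOWN, _monitor_ref, :process, _pid, reason}, state) do
    {:stop, {:notifier_down, reason}, state}
  end

  # Ignore anything else. Without this clause an unexpected message
  # would raise a FunctionClauseError and crash the process.
  def handle_info(_other, state), do: {:noreply, state}
end
```

Sending `{:notification, self(), ref, "handoff_inserted", "42"}` to a process started with the same `ref` results in `{:handled, "42"}` being delivered to the `:report_to` pid.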
# migrations
defmodule MyApp.Repo.Migrations.CreateHandoffNotifications do
  use Ecto.Migration

  def up do
    execute("""
    CREATE OR REPLACE FUNCTION notify_handoff_insertions()
    RETURNS trigger AS $$
    BEGIN
      PERFORM pg_notify(
        'handoff_inserted',
        NEW.game_id::text
      );
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
    """)

    execute("""
    CREATE TRIGGER handoff_inserted
    AFTER INSERT
    ON handoffs
    FOR EACH ROW
    EXECUTE PROCEDURE notify_handoff_insertions();
    """)
  end

  def down do
    execute("DROP FUNCTION notify_handoff_insertions() CASCADE;")
  end
end
# added this migration after installing the library and renaming to Repo.Local
defmodule MyApp.Repo.Local.Migrations.AlwaysEnableHandoffNotifications do
  use Ecto.Migration

  def up do
    execute("""
    ALTER TABLE handoffs ENABLE ALWAYS TRIGGER handoff_inserted;
    """)
  end
end