Some folks internally weren’t a fan of using the process dictionary. The fundamental feeling was that if you need something to happen only if a db transaction succeeds, you should do it using the db transaction itself.
So we switched strategies: whenever we try to PubSub while we’re in a transaction, we instead store the message in a `pubsub_messages` table, to be sent after the transaction completes. A custom poller handles the sending.
You might say, “why not insert Oban jobs?” A couple of reasons. We need these messages sent with low latency, in the order inserted, and globally unique ordering has been a pain point with Oban for us in recent versions (YMMV, and I think our problems have been solved now, but we’re not keen to create more of them just in case). Also, a custom `Broadcaster` can grab N messages at a time and broadcast them all instead of running one job per message.
We also considered using `LISTEN/NOTIFY`, but serialization, payload size limits, and the need for a persistent listening connection all made that more complex.
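For context, here’s a rough sketch of what the `LISTEN/NOTIFY` route would have entailed (assuming Postgrex; the channel name is illustrative):

```elixir
# Rough sketch only, not our implementation: a dedicated, supervised
# connection is needed just to receive notifications.
{:ok, listener} = Postgrex.Notifications.start_link(MyApp.Repo.config())
{:ok, _ref} = Postgrex.Notifications.listen(listener, "pubsub_messages")

receive do
  {:notification, _pid, _ref, "pubsub_messages", payload} ->
    # NOTIFY payloads are strings capped at ~8 kB by default, so every
    # message would need its own serialization scheme on top of this.
    IO.inspect(payload)
end
```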
We have the advantage that all our PubSub work was already done via custom modules, which all used a single implementation function. So we only had to modify that one function in order to store the messages:

```elixir
def broadcast!(topic_or_topics, payload) do
  topics = List.wrap(topic_or_topics)

  if MyApp.Repo.in_transaction?() do
    {:ok, _record} = MyApp.PubSub.Messages.create_message(topics, payload)
  else
    :ok = do_broadcast!(topics, payload)
  end

  :ok
end
```
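A hypothetical call site, just to show the two paths (this assumes the function above lives in `MyApp.PubSub`; the topic and payload are made up):

```elixir
# Inside a transaction: the message is written to pubsub_messages and
# broadcast only after (and only if) the transaction commits.
MyApp.Repo.transaction(fn ->
  # ... insert or update some records ...
  MyApp.PubSub.broadcast!("orders:123", {:order_updated, 123})
end)

# Outside a transaction: broadcast immediately, as before.
MyApp.PubSub.broadcast!("orders:123", {:order_updated, 123})
```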
The `pubsub_messages` migration is:

```elixir
defmodule MyApp.Repo.Migrations.CreatePubsubMessages do
  use Ecto.Migration

  def change do
    create table(:pubsub_messages) do
      add :topics, {:array, :text}, null: false, default: []
      add :inspected_payload, :text, null: false
      add :binary_payload, :text, null: false

      timestamps()
    end

    create index(:pubsub_messages, [:inserted_at])
  end
end
```
(Actually we don’t need `updated_at`, but whatevs.)
`PubSub.Message` looks like this:

```elixir
defmodule MyApp.PubSub.Message do
  @moduledoc """
  Ephemeral record of a pubsub message that should be broadcast on one or
  more topics.

  These messages are meant to be processed and deleted by a
  `MyApp.PubSub.Broadcaster` very quickly after insertion.
  """
  use MyApp.Schema

  import Ecto.Changeset

  schema "pubsub_messages" do
    field :topics, {:array, :string}, default: []
    # To rehydrate and broadcast
    field :binary_payload, :string
    # Just to make this table human-readable
    field :inspected_payload, :string

    timestamps()
  end

  @type t :: %__MODULE__{
          id: non_neg_integer(),
          topics: [String.t()],
          binary_payload: String.t(),
          inspected_payload: String.t(),
          inserted_at: DateTime.t(),
          updated_at: DateTime.t()
        }

  @fields [
    :topics,
    :binary_payload,
    :inspected_payload
  ]

  @doc false
  def create_changeset(topics, payload) do
    attrs = %{
      topics: List.wrap(topics),
      binary_payload: payload_to_binary(payload),
      inspected_payload: inspect(payload)
    }

    %__MODULE__{}
    |> cast(attrs, @fields)
    |> validate_required(@fields)
  end

  def payload_to_binary(payload) do
    payload
    |> :erlang.term_to_binary()
    |> Base.encode64()
  end

  # This is safe because users have no control over the payload
  # sobelow_skip ["Misc.BinToTerm"]
  def binary_to_payload(binary) do
    binary
    |> Base.decode64!()
    |> :erlang.binary_to_term()
  end
end
```
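A quick round trip in IEx, just to illustrate the encoding (the payload is arbitrary):

```elixir
iex> alias MyApp.PubSub.Message
iex> binary = Message.payload_to_binary({:order_updated, 123})
iex> Message.binary_to_payload(binary)
{:order_updated, 123}
```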
The context module is:

```elixir
defmodule MyApp.PubSub.Messages do
  @moduledoc """
  Functions for managing `MyApp.PubSub.Message`s.
  """
  alias Ecto.Multi
  alias MyApp.ErrorNotifier
  alias MyApp.PubSub.Message
  alias MyApp.Repo

  @doc """
  Insert a `MyApp.PubSub.Message`, representing a topic and payload for
  broadcasting.

  The reason for inserting a database record is to ensure that the broadcast
  will happen only if, and only after, the transaction is complete.

  ## Examples

      MyApp.PubSub.Messages.create_message(["topic1", "topic2"], %{my: :payload})
      # => {:ok, %MyApp.PubSub.Message{}}

  """
  @spec create_message(topics :: [String.t()], payload :: term()) ::
          {:ok, Message.t()} | {:error, Ecto.Changeset.t()}
  def create_message(topics, payload) do
    topics
    |> Message.create_changeset(payload)
    |> Repo.insert()
  end
  @doc """
  Either broadcast and delete up to `limit` messages, or, if the lock for doing
  so is already taken, do nothing.

  This code is written to be concurrency-aware: it starts a transaction and
  attempts to acquire a PostgreSQL transaction-level advisory lock (a very
  inexpensive check). If it can't (because another process is currently running
  this function), it bails. This means we can safely run this code on multiple
  nodes concurrently, and only the first concurrent invocation will actually do
  anything.

  This is important because messages (especially controller updates) should be
  broadcast **in the order in which they're queued**; we wouldn't want Process
  B to start broadcasting messages 101-200 while Process A was still working
  through broadcasting messages 1-100.

  Accepts a `:limit` option capping the number of messages to broadcast and
  delete.

  ## Examples

      MyApp.PubSub.Messages.broadcast(limit: 2)
      # => {:ok, messages}

      MyApp.PubSub.Messages.broadcast(limit: 2)
      # => {:error, :lock_already_claimed}

  """
  # Each returned message is a `[topics, binary_payload]` row (a two-element
  # list, as Postgrex returns rows).
  @spec broadcast(opts :: Keyword.t()) ::
          {:ok, messages :: [list()]} | {:error, :lock_already_claimed}
  def broadcast(opts \\ []) do
    limit = Keyword.get(opts, :limit, 100)
    # This option is just for testing. Haters gonna hate.
    broadcast_function = Keyword.get(opts, :broadcast_function, &do_broadcast/1)

    Multi.new()
    |> Multi.run(:get_lock, fn repo, _changes_so_far ->
      get_lock(repo)
    end)
    |> Multi.run(:get_and_delete_messages, fn repo, _changes_so_far ->
      get_and_delete_messages(repo, limit)
    end)
    |> Multi.run(:do_broadcast, fn _repo, %{get_and_delete_messages: messages} ->
      broadcast_function.(messages)
      {:ok, :done}
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{get_and_delete_messages: messages}} ->
        {:ok, messages}

      {:error, :get_lock, :lock_already_claimed, _} ->
        {:error, :lock_already_claimed}
    end
  end
  # PostgreSQL uses an integer to identify an advisory lock.
  # This value is the output of `:erlang.phash2(MyApp.PubSub.Messages)`, but
  # hard-coded in case `phash2/1` or the module name changes in the future;
  # otherwise we might have a mismatch while nodes are being redeployed.
  @advisory_lock_id 74_903_187

  defp get_lock(repo) do
    # Uses a transaction-level advisory lock to ensure that only one
    # transaction at a time can get and delete records.
    # The lock is released when the transaction is committed or rolled back
    # (including timeouts, crashes, etc).
    {:ok, %{rows: [[acquired]]}} =
      repo.query(
        "SELECT pg_try_advisory_xact_lock($1)",
        [@advisory_lock_id]
      )

    case acquired do
      true -> {:ok, :lock_acquired}
      false -> {:error, :lock_already_claimed}
    end
  end
  defp get_and_delete_messages(repo, limit) do
    # Single query for efficiency. The `id` tiebreaker keeps ordering stable
    # when two rows share an `inserted_at` timestamp, and the final SELECT
    # re-imposes that order, since DELETE ... RETURNING makes no ordering
    # guarantee on its own.
    query = """
    WITH messages_to_delete AS (
      SELECT id, inserted_at, topics, binary_payload
      FROM pubsub_messages
      ORDER BY inserted_at ASC, id ASC
      LIMIT $1
      FOR UPDATE
    ),
    deleted AS (
      DELETE FROM pubsub_messages
      USING messages_to_delete
      WHERE pubsub_messages.id = messages_to_delete.id
    )
    SELECT topics, binary_payload
    FROM messages_to_delete
    ORDER BY inserted_at ASC, id ASC
    """

    case repo.query(query, [limit]) do
      {:ok, %{rows: messages}} -> {:ok, messages}
      error -> error
    end
  end
  defp do_broadcast(messages) do
    Enum.each(messages, fn [topics, binary_payload] ->
      payload = Message.binary_to_payload(binary_payload)

      try do
        MyApp.PubSub.do_broadcast!(topics, payload)
      rescue
        err ->
          # Failure is very unlikely, but probably unrecoverable; report and
          # move on.
          ErrorNotifier.capture_message("PubSub failed", extra: err)
          :ok
      end
    end)
  end
end
```
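To make the lock behavior concrete, here’s a sketch (not part of our code) of what a second, concurrent caller sees while the first transaction is still open:

```elixir
# Hypothetical illustration -- try it in two IEx sessions at once.
MyApp.Repo.transaction(fn ->
  {:ok, %{rows: [[acquired?]]}} =
    MyApp.Repo.query("SELECT pg_try_advisory_xact_lock($1)", [74_903_187])

  # First session: acquired? == true, and the lock is held until this
  # transaction commits or rolls back.
  # Second session (while the first is open): acquired? == false, returned
  # immediately without blocking -- which is why the check is so cheap.
  IO.inspect(acquired?)
  Process.sleep(5_000)
end)
```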
The `Broadcaster` process is:

```elixir
defmodule MyApp.PubSub.Broadcaster do
  @moduledoc """
  Periodically broadcasts (and deletes) any pending PubSub messages.
  """
  use GenServer

  def child_spec(arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [arg]},
      # When shutting down, ensure we have time to process pending messages
      shutdown: MyApp.fetch_module_config!(__MODULE__, :polling_interval_ms) * 2
    }
  end

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl GenServer
  def init(:ok) do
    # Schedule work to be performed on server start
    schedule_broadcast(0)
    {:ok, %{}}
  end

  @impl GenServer
  def handle_info(:broadcast, state) do
    case MyApp.PubSub.Messages.broadcast() do
      {:ok, [_h | _t]} ->
        # Found messages. There may be more, so check again immediately.
        schedule_broadcast(0)

      _ ->
        # Nothing found, or another process had the lock
        schedule_broadcast(polling_interval_ms())
    end

    {:noreply, state}
  end

  defp schedule_broadcast(after_ms) do
    Process.send_after(self(), :broadcast, after_ms)
  end

  # More frequent polling means less latency for messages to be broadcast, in
  # exchange for more querying of the database.
  # However, given that the pubsub messages table will be constantly pruned and
  # that we query exclusively by an indexed column, these frequent queries
  # should be cheap.
  def polling_interval_ms do
    MyApp.fetch_module_config!(__MODULE__, :polling_interval_ms)
    |> jitter()
  end

  # Jitter the value by up to 50%
  def jitter(interval) do
    jitter_amount = round(interval * 0.5)
    jitter = Enum.random(-jitter_amount..jitter_amount)
    interval + jitter
  end
end
```
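For reference, the config this reads might look something like the following (`fetch_module_config!/2` is our own helper, so the exact shape here is an assumption):

```elixir
# config/config.exs -- illustrative value, not a recommendation
config :my_app, MyApp.PubSub.Broadcaster, polling_interval_ms: 250
```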
That gets started in the supervision tree after `Phoenix.PubSub` but before the `Endpoint`, so shutdown happens in the reverse order and the `Broadcaster` should send whatever is pending.
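Roughly, the relevant slice of the supervision tree (module names here are the usual Phoenix defaults, and the pubsub name is illustrative):

```elixir
# In MyApp.Application:
children = [
  MyApp.Repo,
  {Phoenix.PubSub, name: MyApp.PubSub},
  # After PubSub, before the Endpoint...
  MyApp.PubSub.Broadcaster,
  MyAppWeb.Endpoint
]

# ...so on shutdown (which runs in reverse order) the Endpoint stops producing
# messages first, and the Broadcaster's `shutdown` grace period lets it flush
# whatever is still queued before PubSub and the Repo terminate.
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
```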