PubSub and nested transactions

I’m wondering how others handle the need to do “send PubSub when transaction is complete” in the case of nested transactions.

With non-nested transactions, this is fine:

def create_thing(args) do
  # start transaction
  # create thing
  # do other stuff
  # end transaction
  |> tap(fn
    {:ok, result} -> pubsub_thing_created(result)
    _ -> :do_nothing
  end)
end

def cancel_thing(args) do
  # start transaction
  # cancel thing
  # do other stuff
  # end transaction
  |> tap(fn
    {:ok, result} -> pubsub_thing_cancelled(result)
    _ -> :do_nothing
  end)
end
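
Concretely, with Repo.transaction/1 the first of those might look something like this (the insert_thing!/1 and do_other_stuff!/1 helpers are placeholders):

def create_thing(args) do
  Repo.transaction(fn ->
    thing = insert_thing!(args)
    do_other_stuff!(thing)
    thing
  end)
  |> tap(fn
    {:ok, thing} -> pubsub_thing_created(thing)
    _ -> :do_nothing
  end)
end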

But when you want to compose them, it creates bugs:

def replace_thing(params) do
  # start transaction
  # cancel_thing(params) - sends pubsub
  # create_thing(params) - sends pubsub
  # do other stuff
  # end transaction
end

When cancel_thing and create_thing run, they think the transaction is complete and they send their PubSub messages. But actually, they’re running inside of the replace_thing transaction, which hasn’t yet committed.

This creates bugs. Anybody who gets the pubsub message about create_thing and tries to look it up might find that it doesn’t yet exist (the transaction hasn’t committed yet), and in fact, it may never exist (transaction rolls back).

Idea: Use Oban

Instead of sending PubSub messages, each function could queue an Oban job to send the PubSub message.

Pro: Since the job row only becomes visible if and when the transaction commits, the job won’t run until after that.
Con: Introduces possibly-unacceptable delays, plus whatever other issues come with running background jobs.
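
A minimal sketch of that idea, assuming a hypothetical PubSubWorker and a placeholder insert_thing!/1 helper. Because Oban.insert!/1 writes the job row through the same Repo, a job inserted inside the transaction only exists if that transaction commits:

defmodule MyApp.Workers.PubSubWorker do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"topic" => topic, "event" => event}}) do
    # Broadcast the event now that the originating transaction has committed
    Phoenix.PubSub.broadcast(MyApp.PubSub, topic, {:event, event})
  end
end

def create_thing(args) do
  Repo.transaction(fn ->
    thing = insert_thing!(args)

    # Inserted within the same transaction, so it's only enqueued on commit
    %{"topic" => "things", "event" => "thing_created"}
    |> MyApp.Workers.PubSubWorker.new()
    |> Oban.insert!()

    thing
  end)
end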

Idea: Store Callbacks in the Process dictionary

We’ve used custom code like this.
In the “lower level” functions, instead of calling pubsub_thing_created/1 directly, call something like:

run_or_store_callbacks({&PubSub.thing_created/1, [thing]})

This checks the process dictionary to see whether we’re currently 1 or more levels deep in transactions.
If not, it runs the callbacks.
If so, it stores the callbacks in another process dictionary key (appending to whatever is there).
When we get to the top of the call stack again, we can call all of those.

(This could probably be improved with Repo.in_transaction?/0 but would still use the process dictionary to store the callbacks.)

Pro: fast
Con: custom, ugly

Idea: An Ecto after_transaction option?

Could/should this be built into Ecto?

Repo.transaction(fn ->
  foo = create_foo()
  Repo.after_transaction({&PubSub.foo_created/1, [foo]})

  Repo.transaction(fn ->
    bar = create_bar()
    Repo.after_transaction({&PubSub.bar_created/1, [bar]})
  end)
end)
## callbacks run at this point

Other Ideas?

Do you do something else?


Ecto.Multi, maybe? It does run in a transaction and IIRC has no issue with nested transactions (but don’t quote me on that, it should be tried).

Another way would be to run the transaction in a task. Right after the transaction begins, spawn a process that monitors the task, and send that process notification “requests”. When the task finishes, the monitoring process will get a message and it will trigger the actual notifications.
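
A rough sketch of that idea (all names here are made up, and it ignores the case where the transaction rolls back but the task still exits normally):

defmodule TxNotifier do
  # Spawn a collector that monitors the task running the transaction and
  # accumulates notification requests until the task finishes.
  def start(task_pid) do
    spawn(fn ->
      ref = Process.monitor(task_pid)
      loop(ref, [])
    end)
  end

  # Queue a zero-arity fun to be run once the monitored task has finished.
  def notify_later(collector, fun), do: send(collector, {:notify, fun})

  defp loop(ref, pending) do
    receive do
      {:notify, fun} ->
        loop(ref, [fun | pending])

      # Task exited normally: fire the queued notifications in order.
      {:DOWN, ^ref, :process, _pid, :normal} ->
        pending |> Enum.reverse() |> Enum.each(& &1.())

      # Task crashed: drop the notifications.
      {:DOWN, ^ref, :process, _pid, _reason} ->
        :ok
    end
  end
end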


I think the important bit here is that a decision needs to be made between at-least-once and at-most-once delivery of notifications and you already showed the options:

  • If you need guaranteed sending or delivery (at least once) of notifications, then Oban / the outbox pattern is the way to go.

  • “at most once” would be handled by delaying effects.
    Commonly this is done to avoid running side effects within pure functions, but the same approach can be used to delay running “side effects” within the context of a transaction. That’s your process dict option. Personally I’d prefer returning effects instead of storing them in the process dict (see the sketch at the end of this post), but Repo.in_transaction? AFAIK also looks at the process dict, so it might not be too bad for just this use case. Returning effects has more widespread applications, though.

Given how general the solution to the latter option is, I don’t think Ecto should have explicit tools for that. AFAIK there are a bunch of third-party libs built around the idea of effect handling, and integrating with Repo.in_transaction? shouldn’t be too hard.
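
For reference, a minimal sketch of the “returning effects” idea mentioned above (function names are placeholders): the inner functions return the effects they want to run instead of running them, and only the outermost caller executes them after its transaction commits.

def replace_thing(params) do
  Repo.transaction(fn ->
    # Hypothetical inner functions that return {:ok, result, effects},
    # where effects is a list of zero-arity funs they did NOT run yet.
    {:ok, _old, cancel_effects} = cancel_thing(params)
    {:ok, thing, create_effects} = create_thing(params)
    {thing, cancel_effects ++ create_effects}
  end)
  |> case do
    {:ok, {thing, effects}} ->
      # Only now, after the commit, run the accumulated effects in order.
      Enum.each(effects, & &1.())
      {:ok, thing}

    {:error, _reason} = error ->
      error
  end
end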

That prevents the nested transaction – or rather, the functional composition of multiple pieces that each run in a transaction, whether on their own or alongside other things – which feels like something a viable solution should still support.


How does my approach prevent anything? Transactions run in the calling process.

Yes, but how does moving the whole computation from one process to another help? The code that wants to define the notification to be sent still runs within the process running the transaction, or else it would have to split its part of the transaction into a different process, which would mean it’s no longer nested.

If I have understood correctly, the idea is to accumulate events and fire them after the outermost transaction finishes successfully. My approach allows just that while leveraging process lifetimes.

Just getting around to tinkering with this again. Here’s what I’ve got so far.
It still stores callbacks in the process dictionary, but the code is simpler than what we had previously.

(These are long code blocks so you have to scroll them individually.)

defmodule MyApp.TransactionFollowups do
  @moduledoc """
  This module exists to handle followup actions (typically, PubSub calls) that
  need to be run after a transaction completes, and to do this in a way that
  allows nested function calls.

  Without nesting, this would be pretty simple:

      def create_thing(args) do
        # start transaction
        # create thing
        # do other stuff
        # end transaction
        |> tap(fn
          {:ok, result} -> pubsub_thing_created(result)
          _ -> :do_nothing
        end)
      end

  However, `create_thing/1` can also be run from inside another transaction:

      def replace_thing(params) do
        # start transaction
        # cancel_thing(params) - sends pubsub
        # create_thing(params) - sends pubsub
        # do other stuff
        # end transaction
      end

  When `cancel_thing/1` and `create_thing/1` run, they think the transaction is
  complete and they send their PubSub messages. But actually, they’re running
  inside of the `replace_thing/1` transaction, which hasn't yet committed.

  This creates bugs. Anybody who gets the PubSub message about `create_thing/1`
  and tries to look it up might find that it doesn't yet exist (because the
  transaction hasn't committed yet). In fact, it may **never** exist (if the
  transaction fails).

  This module provides a solution:

  - Whatever followups a function needs to run, we `capture/1` them.
  - At the point where `cancel_thing/1` and `create_thing/1` have finished their
    own `Repo.transaction/1`, they call `run_if_committed/1`. If they're being
    called as standalone functions, the transaction really has committed and the
    followups run. If they're being called from inside another transaction, the
    followups don't run yet.
  - At the end of its wrapping `Repo.transaction/1`, `replace_thing/1` also
    calls `run_if_committed/1`, which runs the followups captured in the "inner"
    function calls.

  Note that there's no such thing as "nested transactions" in PostgreSQL; only
  the outer call actually begins a database transaction. See
  https://hexdocs.pm/ecto/Ecto.Repo.html#c:transaction/2-nested-transactions
  """

  @type transaction_result :: {:ok, term()} | term()
  @process_key __MODULE__

  @doc """
  Given a `{function, args}` tuple, or a list of such tuples, stores them to be
  run later.

  ## Examples

      iex> MyApp.TransactionFollowups.capture(
      ...>   {&MyApp.PubSub.broadcast_thing_created!/2, [parent_id, thing]}
      ...> )
      :ok

      iex> MyApp.TransactionFollowups.capture(
      ...>   [{&MyApp.PubSub.broadcast_thing_created!/2, [parent_id, thing]}]
      ...> )
      :ok
  """
  @spec capture({function(), [term()]} | [{function(), [term()]}]) :: :ok
  def capture(followup_or_followups) do
    followups = List.wrap(followup_or_followups)
    existing_followups = Process.get(@process_key, [])
    Process.put(@process_key, existing_followups ++ followups)

    :ok
  end

  @doc """
  Given the successful result of a transaction, if there is no "outer"
  transaction running, will run any functions that were deferred
  using `MyApp.TransactionFollowups.capture/1`.

  Always returns the original result.

  ## Examples

      iex> MyApp.TransactionFollowups.run_if_committed(transaction_result)
      transaction_result
  """
  @spec run_if_committed(transaction_result()) :: transaction_result()
  def run_if_committed(result) do
    # Only act once we're outside all transactions; when nested, the outermost
    # caller's run_if_committed/1 will handle the stored followups.
    if not MyApp.Repo.in_transaction?() do
      # Clear the stored followups even on failure, so stale followups can't
      # leak into a later, unrelated transaction in the same process.
      followups = Process.delete(@process_key) || []

      with {:ok, _} <- result do
        for {fun, args} <- followups, do: apply(fun, args)
      end
    end

    result
  end
end
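
For context, a minimal sketch of how a context function might use this module (the insert_thing!/1 helper and the pubsub function are placeholders):

def create_thing(args) do
  MyApp.Repo.transaction(fn ->
    thing = insert_thing!(args)
    # Captured now, but only run once the outermost transaction has committed
    MyApp.TransactionFollowups.capture({&pubsub_thing_created/1, [thing]})
    thing
  end)
  |> MyApp.TransactionFollowups.run_if_committed()
end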

Tests:

defmodule MyApp.TransactionFollowupsTest do
  use MyApp.DataCase, async: true
  alias MyApp.TransactionFollowups, as: TF
  alias MyApp.Repo

  defmodule Examples do
    def insert_thing(result_function) do
      Ecto.Multi.new()
      |> Ecto.Multi.run(:insert_thing, fn _repo, _changes_so_far ->
        # Passing a single followup
        TF.capture({&send/2, [self(), :inserted_thing]})
        result_function.()
      end)
      |> Repo.transaction()
      |> TF.run_if_committed()
    end

    def delete_thing(result_function) do
      Ecto.Multi.new()
      |> Ecto.Multi.run(:delete_thing, fn _repo, _changes_so_far ->
        # Passing a list of followups
        TF.capture([{&send/2, [self(), :deleted_thing]}])
        result_function.()
      end)
      |> Repo.transaction()
      |> TF.run_if_committed()
    end

    def replace_thing(result_function) do
      Ecto.Multi.new()
      |> Ecto.Multi.run(:delete_old_thing, fn _repo, _changes_so_far ->
        case delete_thing(result_function) do
          {:ok, result} ->
            {:ok, result}

          {:error, _step, result, _changes_so_far} ->
            {:error, result}
        end
      end)
      |> Ecto.Multi.run(:insert_new_thing, fn _repo, _changes_so_far ->
        case insert_thing(result_function) do
          {:ok, result} ->
            {:ok, result}

          {:error, _step, result, _changes_so_far} ->
            {:error, result}
        end
      end)
      |> Repo.transaction()
      |> TF.run_if_committed()
    end
  end

  test "runs followups after successful non-nested calls to Repo.transaction/1" do
    Examples.delete_thing(fn -> {:ok, :fake_success} end)
    assert_received :deleted_thing

    Examples.insert_thing(fn -> {:ok, :fake_success} end)
    assert_received :inserted_thing
  end

  test "does not run followups after failed non-nested calls to Repo.transaction/1" do
    Examples.delete_thing(fn -> {:error, :fake_failure} end)
    refute_received :deleted_thing

    Examples.insert_thing(fn -> {:error, :fake_failure} end)
    refute_received :inserted_thing
  end

  test "runs followups after successful nested calls to Repo.transaction/1" do
    Examples.replace_thing(fn -> {:ok, :fake_success} end)
    # Checking this way to show that order is preserved
    {:messages, messages} = Process.info(self(), :messages)
    assert messages == [:deleted_thing, :inserted_thing]
  end

  test "does not run followups after failed nested calls to Repo.transaction/1" do
    Examples.replace_thing(fn -> {:error, :fake_failure} end)
    {:messages, messages} = Process.info(self(), :messages)
    assert messages == []
  end
end

Some folks internally weren’t a fan of using the process dictionary. The fundamental feeling was that if you need something to happen only if a db transaction succeeds, you should do it using the db transaction itself.

So we switched strategies: whenever we try to send a PubSub message while we’re in a transaction, we instead store it in a pubsub_messages table to be sent after the transaction completes. We have a custom poller for doing that.

You might say, “why not insert Oban jobs?” A couple of reasons. We need these to be sent with low latency, in the order inserted, and strict global ordering has been a pain point for us with Oban in recent versions (YMMV, and I think our problems have been solved now, but we’re not keen to create more of them just in case). Also, a custom Broadcaster can grab N messages at a time and broadcast them all, instead of running one job per message.

We also considered using LISTEN/NOTIFY, but serialization, payload size limits, and needing a persistent listening connection all made that more complex.

We have the advantage that all our PubSub work was already done via custom modules, which all used a single implementation function. So we only had to modify that one in order to store the messages:

def broadcast!(topic_or_topics, payload) do
  topics = List.wrap(topic_or_topics)

  if MyApp.Repo.in_transaction?() do
    {:ok, _record} = MyApp.PubSub.Messages.create_message(topics, payload)
  else
    :ok = do_broadcast!(topics, payload)
  end

  :ok
end

The pubsub_messages migration is:

defmodule MyApp.Repo.Migrations.CreatePubsubMessages do
  use Ecto.Migration

  def change do
    create table(:pubsub_messages) do
      add :topics, {:array, :text}, null: false, default: []
      add :inspected_payload, :text, null: false
      add :binary_payload, :text, null: false
      timestamps()
    end

    create index(:pubsub_messages, [:inserted_at])
  end
end

(Actually we don’t need updated_at, but whatevs.)

PubSub.Message looks like this:

defmodule MyApp.PubSub.Message do
  @moduledoc """
  Ephemeral record of a pubsub message that should be broadcast on one or more topics.
  These messages are meant to be processed and deleted by a
  `MyApp.PubSub.Broadcaster` very quickly after insertion.
  """

  use MyApp.Schema
  import Ecto.Changeset

  schema "pubsub_messages" do
    field :topics, {:array, :string}, default: []
    # To rehydrate and broadcast
    field :binary_payload, :string
    # Just to make this table human-readable
    field :inspected_payload, :string

    timestamps()
  end

  @type t :: %__MODULE__{
          id: non_neg_integer(),
          topics: [String.t()],
          binary_payload: String.t(),
          inspected_payload: String.t(),
          inserted_at: DateTime.t(),
          updated_at: DateTime.t()
        }

  @fields [
    :topics,
    :binary_payload,
    :inspected_payload
  ]

  @doc false
  def create_changeset(topics, payload) do
    attrs = %{
      topics: List.wrap(topics),
      binary_payload: payload_to_binary(payload),
      inspected_payload: inspect(payload)
    }

    %__MODULE__{}
    |> cast(attrs, @fields)
    |> validate_required(@fields)
  end

  def payload_to_binary(payload) do
    payload
    |> :erlang.term_to_binary()
    |> Base.encode64()
  end

  def binary_to_payload(binary) do
    binary
    |> Base.decode64!()
    # This is safe because users have no control over the payload
    # sobelow_skip ["Misc.BinToTerm"]
    |> :erlang.binary_to_term()
  end
end

The context module is:

defmodule MyApp.PubSub.Messages do
  @moduledoc """
  Functions for managing `MyApp.PubSub.Message`s.
  """

  alias Ecto.Multi

  alias MyApp.ErrorNotifier
  alias MyApp.PubSub.Message
  alias MyApp.Repo

  @doc """
  Insert a `MyApp.PubSub.Message`, representing a topic and payload for
  broadcasting.

  The reason for inserting a database record is to ensure that the broadcast
  will happen only if, and only after, the transaction is complete.

  ## Examples

    MyApp.PubSub.Messages.create_message(["topic1", "topic2"], %{my: :payload})
    # => {:ok, %MyApp.PubSub.Message{}}
  """
  @spec create_message(topics :: [String.t()], payload :: term()) ::
          {:ok, Message.t()} | {:error, Ecto.Changeset.t()}
  def create_message(topics, payload) do
    topics
    |> Message.create_changeset(payload)
    |> Repo.insert()
  end

  @doc """
  Either broadcast and delete up to `limit` messages, or, if the lock for doing so
  is already taken, do nothing.

  This code is written to be concurrency-aware: it starts a transaction and
  attempts to acquire a PostgreSQL transaction-level advisory lock (a very
  inexpensive check). If it can't (because another process is currently running
  this function), it bails. This means we can safely run this code on multiple
  nodes concurrently, and only the first concurrent invocation will actually do
  anything.

  This is important because messages (especially controller updates) should be
  broadcast **in the order in which they're queued**; we wouldn't want Process
  B to start broadcasting messages 101-200 while Process A was still working
  through broadcasting messages 1-100.

  Accepts a `:limit` option to limit the number of messages to broadcast and delete.

  ## Examples

      MyApp.PubSub.Messages.broadcast(limit: 2)
      # => {:ok, messages}
      MyApp.PubSub.Messages.broadcast(limit: 2)
      # => {:error, :lock_already_claimed}
  """
  @spec broadcast(opts :: Keyword.t()) ::
          {:ok, [{list(topic :: String.t()), binary_payload :: binary()}]}
          | {:error, :lock_already_claimed}
  def broadcast(opts \\ []) do
    limit = Keyword.get(opts, :limit, 100)
    # This option is just for testing. Haters gonna hate.
    broadcast_function = Keyword.get(opts, :broadcast_function, &do_broadcast/1)

    Multi.new()
    |> Multi.run(:get_lock, fn repo, _changes_so_far ->
      get_lock(repo)
    end)
    |> Multi.run(:get_and_delete_messages, fn repo, _changes_so_far ->
      get_and_delete_messages(repo, limit)
    end)
    |> Multi.run(:do_broadcast, fn _repo, %{get_and_delete_messages: messages} ->
      broadcast_function.(messages)
      {:ok, :done}
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{get_and_delete_messages: messages}} ->
        {:ok, messages}

      {:error, :get_lock, :lock_already_claimed, _} ->
        {:error, :lock_already_claimed}
    end
  end

  # PostgreSQL uses an integer to identify an advisory lock.
  # This value is the output of `:erlang.phash2(MyApp.PubSub.Messages)`, but
  # hard-coded in case `phash2/1` or the module name changes in the future; otherwise
  # we might have a mismatch while nodes are being redeployed.
  @advisory_lock_id 74_903_187

  defp get_lock(repo) do
    # Uses a transaction-level advisory lock to ensure that only one
    # transaction at a time can get and delete records.
    # The lock is released when the transaction is committed or rolled back
    # (including timeouts, crashes, etc).
    {:ok, %{rows: [[acquired]]}} =
      repo.query(
        "SELECT pg_try_advisory_xact_lock($1)",
        [@advisory_lock_id]
      )

    case acquired do
      true -> {:ok, :lock_acquired}
      false -> {:error, :lock_already_claimed}
    end
  end

  defp get_and_delete_messages(repo, limit) do
    # Single query for efficiency
    query = """
    WITH messages_to_delete AS (
      SELECT id, topics, binary_payload
      FROM pubsub_messages
      ORDER BY inserted_at ASC
      LIMIT $1
      FOR UPDATE
    )
    DELETE FROM pubsub_messages
    USING messages_to_delete
    WHERE pubsub_messages.id = messages_to_delete.id
    RETURNING pubsub_messages.topics, pubsub_messages.binary_payload
    """

    case repo.query(query, [limit]) do
      {:ok, %{rows: messages}} -> {:ok, messages}
      error -> error
    end
  end

  defp do_broadcast(messages) do
    Enum.each(messages, fn [topics, binary_payload] ->
      payload = Message.binary_to_payload(binary_payload)

      try do
        MyApp.PubSub.do_broadcast!(topics, payload)
      rescue
        err ->
          # failure is very unlikely, but probably unrecoverable; report and move on
          ErrorNotifier.capture_message("PubSub failed", extra: err)
          :ok
      end
    end)
  end
end

The Broadcaster process is:

defmodule MyApp.PubSub.Broadcaster do
  @moduledoc """
  Periodically broadcasts (and deletes) any pending PubSub messages.
  """
  use GenServer

  def child_spec(arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [arg]},
      # When shutting down, ensure we have time to process pending messages
      shutdown: MyApp.fetch_module_config!(__MODULE__, :polling_interval_ms) * 2
    }
  end

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl GenServer
  def init(:ok) do
    # Schedule work to be performed on server start
    schedule_broadcast(0)
    {:ok, %{}}
  end

  @impl GenServer
  def handle_info(:broadcast, state) do
    case MyApp.PubSub.Messages.broadcast() do
      {:ok, [_h | _t]} ->
        # Found messages. There may be more, so check again immediately.
        schedule_broadcast(0)

      _ ->
        # Nothing found or another process had the lock
        schedule_broadcast(polling_interval_ms())
    end

    {:noreply, state}
  end

  defp schedule_broadcast(after_ms) do
    Process.send_after(self(), :broadcast, after_ms)
  end

  # More frequent polling means less latency for messages to be broadcast in
  # exchange for more querying of the database.
  # However, given that the pubsub messages table will be constantly pruned and
  # that we query exclusively by an indexed column, these frequent queries
  # should be cheap.
  def polling_interval_ms do
    MyApp.fetch_module_config!(__MODULE__, :polling_interval_ms)
    |> jitter()
  end

  def jitter(interval) do
    # jitter the value by up to 50%
    jitter_amount = round(interval * 0.5)
    jitter = Enum.random(-jitter_amount..jitter_amount)
    interval + jitter
  end
end

That gets started in the supervision tree after Phoenix.PubSub but before the Endpoint, so shutdown happens in the reverse order and the Broadcaster should still be able to send whatever is pending.
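
Roughly, the relevant part of the supervision tree looks like this (the surrounding child names are illustrative):

children = [
  MyApp.Repo,
  {Phoenix.PubSub, name: MyApp.PubSub},
  # Started after PubSub and before the Endpoint, so on shutdown it stops
  # after the Endpoint but while PubSub and the Repo are still running,
  # giving it a chance to flush pending messages.
  MyApp.PubSub.Broadcaster,
  MyAppWeb.Endpoint
]

Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)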