Backpressure on Postgres with Postgrex.ReplicationConnection

I’m trying to put some back pressure on the Postres write-ahead log.

Basically I’d like to write a wal record to disk before ack’ing to Postgres that we have received the record.

Here is a Livebook where we setup a ReplicationConnection and then write 1_000_000 records while doing some work (Process.sleep) on each one.

At the end of the Livebook we query the wal size and you can see that for ~30 seconds it reports ~175mb, and then nothing.

It looks like records are buffered somewhere lower level.

Also, RAM as reported by the VM is only at ~50 MB in this case. If I consume all records in a process quickly it balloons to ~500 MB.

If anyone has any insight I’d very much appreciate it!

Postgres Replication Question

Mix.install([
  {:ecto, ">= 3.10.2"},
  {:postgrex, ">= 0.17.1"},
  {:pgoutput_decoder, "~> 0.1.0"},
  {:jason, "~> 1.4"},
  {:logflare_api_client, "~> 0.3.5"},
  {:kino_db, "~> 0.2.1"}
])

Setup database

ALTER SYSTEM SET wal_level='logical';
ALTER SYSTEM SET max_wal_senders='10';
ALTER SYSTEM SET max_replication_slots='10';

Connect to our database

opts = [
  hostname: "localhost",
  port: 5432,
  username: "postgres",
  password: System.fetch_env!("LB_REPL_TEST_DB_PASS"),
  database: "repl_test"
]

{:ok, pid} = Kino.start_child({Postgrex, opts})
{:ok, result} = Postgrex.query(pid, "select 1 as one", [])
result

Create Publication

query = """
CREATE TABLE if not exists diary_entries (
  id SERIAL PRIMARY KEY,
  body TEXT,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  user_id INTEGER,
  tag TEXT
)
"""

{:ok, _result} = Postgrex.query(pid, query, [])
{:ok, _pub} = Postgrex.query(pid, "drop publication postgrex", [])
{:ok, _pub} = Postgrex.query(pid, "create publication postgrex", [])
Postgrex.query(pid, "ALTER PUBLICATION postgrex ADD TABLE diary_entries;", [])

Handle WAL records

defmodule Repl.WalHandler do
  @moduledoc """
  Publishes messages from Replication to PubSub
  """

  use GenServer

  alias PgoutputDecoder.Messages
  alias Repl.LogflareClient

  defstruct [:relations]

  @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @spec process_message(any) :: :ok
  def process_message(message) do
    IO.inspect(message)
    GenServer.call(__MODULE__, {:message, message})
  end

  @impl true
  def init(_args) do
    # Process.flag(:message_queue_data, :off_heap)
    {:ok, %__MODULE__{}}
  end

  def get_relations() do
    GenServer.call(__MODULE__, :get_relations)
  end

  @impl true
  def handle_call(:get_relations, _from, %{relations: nil} = state) do
    {:reply, {:error, :no_relations_yet}, state}
  end

  def handle_call(:get_relations, _from, state) do
    {:reply, {:ok, state.relations}, state}
  end

  def handle_call({:message, %Messages.Relation{} = message}, _from, state) do
    relations = [message | state.relations]
    state = %{state | relations: relations}
    {:reply, {:ok, state}, state}
  end

  def handle_call(
        {:message, %Messages.Delete{relation_id: rel_id, old_tuple_data: nil} = message},
        _from,
        state
      ) do
    relation = Enum.find(state.relations, &(rel_id == &1.id))

    if relation do
      record =
        for {column, index} <- Enum.with_index(relation.columns),
            do: {String.to_atom(column.name), elem(message.changed_key_tuple_data, index)},
            into: %{}

      # LogflareClient.post(record)
      Process.sleep(1_000)

      {:reply, {:ok, state}, state}
    else
      {:reply, {:ok, state}, state}
    end
  end

  def handle_call({:message, %Messages.Delete{relation_id: rel_id} = message}, _from, state) do
    relation = Enum.find(state.relations, &(rel_id == &1.id))

    if relation do
      record =
        for {column, index} <- Enum.with_index(relation.columns),
            do: {String.to_atom(column.name), elem(message.old_tuple_data, index)},
            into: %{}

      # LogflareClient.post(record)
      Process.sleep(1_000)

      {:reply, {:ok, state}, state}
    else
      {:reply, {:ok, state}, state}
    end
  end

  def handle_call({:message, %{relation_id: rel_id} = message}, _from, state) do
    relation = Enum.find(state.relations, &(rel_id == &1.id))

    if relation do
      record =
        for {column, index} <- Enum.with_index(relation.columns),
            do: {String.to_atom(column.name), elem(message.tuple_data, index)},
            into: %{}

      IO.inspect(record, label: "-------RECORD-----")
      Process.sleep(1_000)

      # LogflareClient.post(record)

      {:reply, {:ok, state}, state}
    else
      {:reply, {:ok, state}, state}
    end
  end

  def handle_call({:message, _message}, _from, state) do
    :noop
    {:reply, {:ok, state}, state}
  end
end

Listen to the WAL

defmodule Repl.ReplConn do
  use Postgrex.ReplicationConnection

  alias Repl.WalHandler

  def start_link(opts) do
    # Automatically reconnect if we lose connection.
    extra_opts = [
      auto_reconnect: true
    ]

    Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
  end

  @impl true
  def init(:ok) do
    # Process.flag(:message_queue_data, :off_heap)
    {:ok, %{step: :disconnected}}
  end

  @impl true
  def handle_connect(state) do
    query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
    {:query, query, %{state | step: :create_slot}}
  end

  @impl true
  def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
    query =
      "START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'postgrex')"

    {:stream, query, [], %{state | step: :streaming}}
  end

  @impl true
  # https://www.postgresql.org/docs/14/protocol-replication.html
  def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
    record = PgoutputDecoder.decode_message(rest)

    WalHandler.process_message(record)

    {:noreply, state}
  end

  def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
    messages =
      case reply do
        1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
        0 -> []
      end

    {:noreply, messages, state}
  end

  @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
  defp current_time(), do: System.os_time(:microsecond) - @epoch
end

Start it!!!

# ReplHandler
Repl.WalHandler.start_link([])

repl_pid =
  Repl.ReplConn.start_link(
    host: "localhost",
    port: 5432,
    database: "repl_test",
    username: "postgres",
    password: "postgres"
  )
  |> case do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end

Insert some stuff

query = """
INSERT INTO diary_entries (body, tag, user_id)
SELECT
    'Lorem ipsum dolor sit amet',
    'Tag' || generate_series(1, 10),
    (random() * 1000)::int
FROM generate_series(1, 100000);
"""

# Postgrex.query(pid, query, ["This is a journal entry.", 1, "work"])
Postgrex.query(pid, query, [])

Check WAL size

# WAL size reports something for ~30 seconds then reports nothing, while
# the wal continues to come in.

# It seems like Postgres has sent all it's WAL while we are still processing it.

# We would like to put some backpressure on Postgres so that we can safely consume
# the wal without dropping any records.

query = """
select 
	slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) as replicationSlotLag, active 
from 
	pg_replication_slots;
"""

Postgrex.query(pid, query, [])
2 Likes

Sorry that nobody more knowledgeable chimed in. I don’t get the problem, the motivation or even fully the means to fix your problem but I gotta naively ask: why not use GenStage or Flow or Oban and employ their backpressure mechanisms? Including put another DB in front of your Postgres (assuming you have a scaling problem and can’t just upgrade servers; sqlite3 or some of the better K/V stores might be a good idea… hell, maybe even Kafka) and then use the aforementioned tech to smooth out the write load on Postgres?

Are you experiencing Postgres being overwhelmed, basically? Apologies but it’s hard for me to even comprehend the problem statement.

1 Like

Synchronous replication sounds like it’s closer to what you want, but I don’t know anything about it besides what’s in that document.

@dimitarvp I help out with Supabase Realtime which takes records from the write-ahead log and delivers them to clients globally. I want to guarantee that delivery.

We currently don’t offer such guarantees. We have what TCP gets us but that isn’t enough long term. To start, I need to pull records from the wal in such a way that I write them somewhere with more scalable storage than an individual Postgres instance may have. So I need to read a record, write it somewhere durable, and then tell Postgres that this wal record was read and it can safely ack it.

After a little research I’m pretty sure this happens via TCP acks.

Once I have them in a more scalable durable queue somewhere I can use Broadway, GenState, etc to pull from that and do what I need.

We also have potentially other things we want to do with the wal too, like archive them in an analytics store to let customer query their database change history.

4 Likes

@al2o3cr sync replication is in the write path. So if your replication is set to sync when a write comes it it will wait until the primary, and read-only(s) confirm that they have successfully written data to their write-ahead logs.

So it’s the same principle. And I know the Postgres replication protocol can handle it because it supports sync replication.

Not many people run sync replication because it slows down your writes and most use case are okay with potentially a little data loss in the event of a failover.

Anywhoo, I know it’s possible I think I just have to handle the protocol directly.