I’m trying to put some backpressure on the Postgres write-ahead log.
Basically I’d like to write a WAL record to disk before ack’ing to Postgres that we have received it.
Here is a Livebook where we set up a ReplicationConnection and then write 1_000_000 records while doing some work (Process.sleep) on each one.
At the end of the Livebook we query the WAL size, and you can see that for ~30 seconds it reports ~175 MB, and then nothing.
It looks like records are buffered somewhere at a lower level.
Also, RAM as reported by the VM is only ~50 MB in this case. If I consume all records in a process quickly, it balloons to ~500 MB.
If anyone has any insight I’d very much appreciate it!
Postgres Replication Question
Mix.install([
{:ecto, ">= 3.10.2"},
{:postgrex, ">= 0.17.1"},
{:pgoutput_decoder, "~> 0.1.0"},
{:jason, "~> 1.4"},
{:logflare_api_client, "~> 0.3.5"},
{:kino_db, "~> 0.2.1"}
])
Setup database
ALTER SYSTEM SET wal_level='logical';
ALTER SYSTEM SET max_wal_senders='10';
ALTER SYSTEM SET max_replication_slots='10';
Connect to our database
opts = [
hostname: "localhost",
port: 5432,
username: "postgres",
password: System.fetch_env!("LB_REPL_TEST_DB_PASS"),
database: "repl_test"
]
{:ok, pid} = Kino.start_child({Postgrex, opts})
{:ok, result} = Postgrex.query(pid, "select 1 as one", [])
result
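Note that all three ALTER SYSTEM settings above are postmaster-level options, so they only take effect after a Postgres restart. One quick way to confirm the server is actually running with logical WAL is to read the setting back over the same connection:
{:ok, %{rows: [[wal_level]]}} = Postgrex.query(pid, "select current_setting('wal_level')", [])
wal_level
# => "logical" once the restart has happened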
Create Publication
query = """
CREATE TABLE if not exists diary_entries (
id SERIAL PRIMARY KEY,
body TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id INTEGER,
tag TEXT
)
"""
{:ok, _result} = Postgrex.query(pid, query, [])
{:ok, _pub} = Postgrex.query(pid, "drop publication postgrex", [])
{:ok, _pub} = Postgrex.query(pid, "create publication postgrex", [])
Postgrex.query(pid, "ALTER PUBLICATION postgrex ADD TABLE diary_entries;", [])
Handle WAL records
defmodule Repl.WalHandler do
@moduledoc """
Publishes messages from Replication to PubSub
"""
use GenServer
alias PgoutputDecoder.Messages
alias Repl.LogflareClient
defstruct [:relations]
@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
@spec process_message(any) :: :ok
def process_message(message) do
IO.inspect(message)
GenServer.call(__MODULE__, {:message, message})
end
@impl true
def init(_args) do
# Process.flag(:message_queue_data, :off_heap)
{:ok, %__MODULE__{}}
end
def get_relations() do
GenServer.call(__MODULE__, :get_relations)
end
@impl true
def handle_call(:get_relations, _from, %{relations: nil} = state) do
{:reply, {:error, :no_relations_yet}, state}
end
def handle_call(:get_relations, _from, state) do
{:reply, {:ok, state.relations}, state}
end
def handle_call({:message, %Messages.Relation{} = message}, _from, state) do
# state.relations starts as nil, so fall back to [] to keep this a proper list
relations = [message | state.relations || []]
state = %{state | relations: relations}
{:reply, {:ok, state}, state}
end
def handle_call(
{:message, %Messages.Delete{relation_id: rel_id, old_tuple_data: nil} = message},
_from,
state
) do
relation = Enum.find(state.relations, &(rel_id == &1.id))
if relation do
record =
for {column, index} <- Enum.with_index(relation.columns),
do: {String.to_atom(column.name), elem(message.changed_key_tuple_data, index)},
into: %{}
# LogflareClient.post(record)
Process.sleep(1_000)
{:reply, {:ok, state}, state}
else
{:reply, {:ok, state}, state}
end
end
def handle_call({:message, %Messages.Delete{relation_id: rel_id} = message}, _from, state) do
relation = Enum.find(state.relations, &(rel_id == &1.id))
if relation do
record =
for {column, index} <- Enum.with_index(relation.columns),
do: {String.to_atom(column.name), elem(message.old_tuple_data, index)},
into: %{}
# LogflareClient.post(record)
Process.sleep(1_000)
{:reply, {:ok, state}, state}
else
{:reply, {:ok, state}, state}
end
end
def handle_call({:message, %{relation_id: rel_id} = message}, _from, state) do
relation = Enum.find(state.relations, &(rel_id == &1.id))
if relation do
record =
for {column, index} <- Enum.with_index(relation.columns),
do: {String.to_atom(column.name), elem(message.tuple_data, index)},
into: %{}
IO.inspect(record, label: "-------RECORD-----")
# Simulate slow per-record work before anything is acknowledged back to Postgres
Process.sleep(1_000)
# LogflareClient.post(record)
{:reply, {:ok, state}, state}
else
{:reply, {:ok, state}, state}
end
end
def handle_call({:message, _message}, _from, state) do
:noop
{:reply, {:ok, state}, state}
end
end
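One detail to be aware of: process_message/1 uses GenServer.call/2 with its default 5-second timeout, and the Delete/Insert/Update clauses above each sleep for one second per record. That is fine here, but if the per-record work ever exceeded five seconds the caller (the replication connection) would exit with a timeout. A possible tweak, if that becomes a concern, is an explicit timeout:
def process_message(message) do
  GenServer.call(__MODULE__, {:message, message}, :infinity)
end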
Listen to the WAL
defmodule Repl.ReplConn do
use Postgrex.ReplicationConnection
alias Repl.WalHandler
def start_link(opts) do
# Automatically reconnect if we lose connection.
extra_opts = [
auto_reconnect: true
]
Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
end
@impl true
def init(:ok) do
# Process.flag(:message_queue_data, :off_heap)
{:ok, %{step: :disconnected}}
end
@impl true
def handle_connect(state) do
query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
{:query, query, %{state | step: :create_slot}}
end
@impl true
def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
query =
"START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'postgrex')"
{:stream, query, [], %{state | step: :streaming}}
end
@impl true
# https://www.postgresql.org/docs/14/protocol-replication.html
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
record = PgoutputDecoder.decode_message(rest)
WalHandler.process_message(record)
{:noreply, state}
end
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
messages =
case reply do
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
0 -> []
end
{:noreply, messages, state}
end
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch
end
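For reference on where acknowledgment happens: the <<?r, ...>> reply above is the standby status update, and its three 64-bit fields are the last written, flushed, and applied LSNs reported back to Postgres. A rough, untested sketch of holding that acknowledgment back, assuming a hypothetical Repl.WalHandler.last_flushed_lsn/0 that returns the LSN of the last record actually persisted (nil if none yet), could look like this variant of the keepalive clause:
# Hypothetical: acknowledge only what the handler has durably written, so
# Postgres keeps everything after that LSN in the replication slot.
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
  # last_flushed_lsn/0 is an assumed WalHandler API, not part of the module above.
  lsn = Repl.WalHandler.last_flushed_lsn() || wal_end

  messages =
    case reply do
      1 -> [<<?r, lsn::64, lsn::64, lsn::64, current_time()::64, 0>>]
      0 -> []
    end

  {:noreply, messages, state}
end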
Start it!!!
# Start the WAL handler, then the replication connection
Repl.WalHandler.start_link([])
repl_pid =
Repl.ReplConn.start_link(
host: "localhost",
port: 5432,
database: "repl_test",
username: "postgres",
password: "postgres"
)
|> case do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end
Insert some stuff
query = """
INSERT INTO diary_entries (body, tag, user_id)
SELECT
'Lorem ipsum dolor sit amet',
'Tag' || generate_series(1, 10),
(random() * 1000)::int
FROM generate_series(1, 100000);
"""
# Postgrex.query(pid, query, ["This is a journal entry.", 1, "work"])
Postgrex.query(pid, query, [])
Check WAL size
# WAL size reports something for ~30 seconds, then reports nothing, while
# the WAL continues to come in.
# It seems like Postgres has sent all of its WAL while we are still processing it.
# We would like to put some backpressure on Postgres so that we can safely consume
# the WAL without dropping any records.
query = """
select
slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) as replicationSlotLag, active
from
pg_replication_slots;
"""
Postgrex.query(pid, query, [])
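It may also help to watch what the slot believes has been acknowledged: for a logical slot, confirmed_flush_lsn in pg_replication_slots is the last position the client has confirmed, so comparing it with restart_lsn and pg_current_wal_lsn() shows how far behind the consumer really is.
query = """
select slot_name, restart_lsn, confirmed_flush_lsn, active
from pg_replication_slots;
"""
Postgrex.query(pid, query, [])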