Using COPY on Postgrex Replication Start

I am playing around with Postgrex.ReplicationConnection and was wondering if anyone knew how to do an initial COPY of the table data on initialization. I have a small table that I’m replicating and wanted the ability to copy over existing data. I am using a modified version of the example Repl module and can’t quite get it figured out.

My handle_connect function looks like the following. Note the change from the original example’s NOEXPORT_SNAPSHOT to EXPORT_SNAPSHOT.

```elixir
def handle_connect(state) do
  query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput EXPORT_SNAPSHOT"
  {:query, query, %{state | step: :create_slot}}
end
```

Then in my handle_result function I have the following:

```elixir
def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
  [%{rows: [row | _], columns: columns} | _] = results
  res = Enum.zip(columns, row) |> Enum.into(%{})
  query = """
  BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
  SET TRANSACTION SNAPSHOT '#{res["snapshot_name"]}';
  COPY contacts TO STDOUT WITH HEADER;
  COMMIT;
  """
  {:stream, query, [], %{state | step: :streaming}}
end
```

Without any changes to the Postgrex code, I get the following error:

```
[error] :gen_statem #PID<0.1300.0> terminating
** (CaseClauseError) no case clause matching: {:ok, {:msg_command_complete, "BEGIN"}, <<69, 0, 0, 0, 111, 83, 69, 82, 82, 79, 82, 0, 86, 69, 82, 82, 79, 82, 0, 67, 50, 50, 48, 50, 51, 0, 77, 105, 110, 118, 97, 108, 105, 100, 32, 115, 110, 97, 112, 115, 104, 111, 116, 32, 105, 100, 101, ...>>}
    (postgrex 0.18.0) lib/postgrex/protocol.ex:1325: Postgrex.Protocol.recv_streaming/2
    (postgrex 0.18.0) lib/postgrex/replication_connection.ex:566: Postgrex.ReplicationConnection.handle/5
    (postgrex 0.18.0) lib/postgrex/replication_connection.ex:474: Postgrex.ReplicationConnection.init/1
    (stdlib 6.0) gen_statem.erl:2695: :gen_statem.init_it/6
    (stdlib 6.0) proc_lib.erl:329: :proc_lib.init_p_do_apply/3
Queue: []
Postponed: []
State: {:undefined, :undefined}
Callback mode: :state_functions, state_enter: false
```
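The binary at the end of the CaseClauseError tuple is what was left in the buffer after the CommandComplete for BEGIN, and its first byte, 69 (?E), marks a PostgreSQL ErrorResponse message. A minimal decoder written against the ErrorResponse layout in the PostgreSQL protocol documentation makes it readable. PgErrorResponse is a hypothetical helper for illustration, not part of Postgrex:

```elixir
defmodule PgErrorResponse do
  @moduledoc false

  # Hypothetical helper, not a Postgrex API.
  # Decodes a PostgreSQL ErrorResponse message: the byte ?E, an Int32 length,
  # then fields made of one code byte plus a NUL-terminated string, with the
  # list ended by a single 0 byte. Returns e.g. %{"S" => "ERROR", "C" => "22023"}.
  def decode(<<?E, _length::32, fields::binary>>), do: decode_fields(fields, %{})

  # A lone 0 byte ends the field list; anything after it (such as the next
  # protocol message in the buffer) is ignored.
  defp decode_fields(<<0, _rest::binary>>, acc), do: acc
  defp decode_fields(<<>>, acc), do: acc

  defp decode_fields(<<code, rest::binary>>, acc) do
    case :binary.split(rest, <<0>>) do
      [value, tail] -> decode_fields(tail, Map.put(acc, <<code>>, value))
      # Truncated input (as in a log excerpt): keep the partial value.
      [partial] -> Map.put(acc, <<code>>, partial)
    end
  end
end
```

Applied to the bytes shown in the log, it gives severity ERROR, SQLSTATE 22023 and a message beginning "invalid snapshot ide" (the log truncates the rest). So the server rejected the SET TRANSACTION SNAPSHOT statement, and recv_streaming/2 crashed on the BEGIN completion before it got to that error. The PostgreSQL documentation for CREATE_REPLICATION_SLOT also notes that an exported snapshot is only valid until a new command is executed on that connection.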

If I add the following case clause at Postgrex.Protocol#L1331, in recv_streaming/2 where the error is raised:

```elixir
{:ok, msg_command_complete(tag: "BEGIN"), buffer} ->
  {:ok, %{s | buffer: buffer}}
```

The app starts up, but I don’t see any streaming response. I must admit I’m a bit out of my depth editing Postgrex.Protocol without spending more time with it.

Does anyone have any pointers or other ways I can do the initial sync of the table? Thanks in advance!

– Dan

I finally got a working solution, but it still required a change to the private handle_data/2 function in Postgrex.ReplicationConnection: the state passed to the :done callback now has streaming: nil, so the COPY stream is treated as ended and the callback can start another stream. I still need to open an issue with Postgrex to check whether this change is sound, but here it is if anyone wants to play around with it.

```diff
diff --git a/lib/postgrex/replication_connection.ex b/lib/postgrex/replication_connection.ex
index 8fcde20..a841703 100644
--- a/lib/postgrex/replication_connection.ex
+++ b/lib/postgrex/replication_connection.ex
@@ -527,7 +527,7 @@ def handle_event(:info, msg, @state, %{protocol: protocol, streaming: streaming}
   defp handle_data([], s), do: {:keep_state, s}
 
   defp handle_data([:copy_done | copies], %{state: {mod, mod_state}} = s) do
-    with {:keep_state, s} <- handle(mod, :handle_data, [:done, mod_state], nil, s) do
+    with {:keep_state, s} <- handle(mod, :handle_data, [:done, mod_state], nil, %{s | streaming: nil}) do
       handle_data(copies, %{s | streaming: nil})
     end
   end
```

Here are the relevant functions needed to kick off the COPY + replication.

```elixir
defmodule WalexFun.Repl do
  use Postgrex.ReplicationConnection

  # start_link/1, init/1 and the other handle_data/2 clauses are omitted here.

  # Start a REPEATABLE READ, read-only transaction to host the USE_SNAPSHOT
  # option of CREATE_REPLICATION_SLOT, which must be the first command in it.
  @impl true
  def handle_connect(state) do
    query = "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY"

    {:query, query, %{state | step: :start_transaction}}
  end

  # Create the replication slot using the transaction's snapshot, so the COPY
  # in the same transaction sees the data as of the slot's starting point.
  @impl true
  def handle_result([%{command: :begin} | _], %{step: :start_transaction} = state) do
    query = """
    CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput USE_SNAPSHOT;
    """

    {:query, query, %{state | step: :create_slot}}
  end

  # Do the COPY and COMMIT
  def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
    query = """
    COPY contacts TO STDOUT;
    COMMIT;
    """

    {:stream, query, [], %{state | step: :commit_slot}}
  end

  # Once the COPY is done and the other stream is closed, start replication
  # from the slot and begin streaming changes.
  @impl true
  def handle_data(:done, state) do
    IO.inspect("-------- IN DONE HANDLER --------")

    query = "START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'events')"

    {:stream, query, [], %{state | step: :streaming}}
  end
end
```
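The snippet covers only the callbacks that move between steps. While the step is :commit_slot, the COPY output should reach handle_data/2 as binaries before the :done call; the protocol sends one CopyData message per row, in COPY's default text format. A minimal sketch of turning one such row into a list of fields, assuming text format (not CSV, no HEADER). CopyText and the clause shown in its comment are hypothetical:

```elixir
defmodule CopyText do
  @moduledoc false

  # Hypothetical wiring inside WalexFun.Repl (an assumption, not shown in the post):
  #
  #   def handle_data(row, %{step: :commit_slot} = state) when is_binary(row) do
  #     IO.inspect(CopyText.parse_row(row))
  #     {:noreply, state}
  #   end

  # Splits one COPY text-format row on tabs and decodes each field.
  # A field consisting of backslash-N is NULL and becomes nil.
  def parse_row(row) when is_binary(row) do
    row
    |> String.trim_trailing("\n")
    |> String.split("\t")
    |> Enum.map(&decode_field/1)
  end

  defp decode_field("\\N"), do: nil
  defp decode_field(field), do: unescape(field, "")

  # Tabs and newlines inside data are always backslash-escaped in text
  # format, so splitting on raw tabs before unescaping is safe.
  defp unescape(<<>>, acc), do: acc
  defp unescape(<<?\\, c, rest::binary>>, acc), do: unescape(rest, acc <> escape(c))
  defp unescape(<<c, rest::binary>>, acc), do: unescape(rest, acc <> <<c>>)

  # Single-character escapes; octal and hex escapes are not handled here.
  # Any other backslashed character stands for itself.
  defp escape(?t), do: "\t"
  defp escape(?n), do: "\n"
  defp escape(?r), do: "\r"
  defp escape(?b), do: "\b"
  defp escape(?f), do: "\f"
  defp escape(?v), do: "\v"
  defp escape(c), do: <<c>>
end
```

Dropping WITH HEADER, as the final version does, keeps every CopyData payload a data row. CSV or binary COPY output would need a proper decoder instead.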

This is a great reference. Logical replication is one of those things that is very powerful but there’s definitely a dearth of more complex examples.