Using COPY on Postgrex Replication Start

I am playing around with Postgrex.ReplicationConnection and was wondering if anyone knew how to do an initial COPY of the table data on initialization. I have a small table that I’m replicating and wanted the ability to copy over existing data. I am using a modified version of the Repl Module and can’t quite get it figured out.

My handle_connect function looks like the following. Note the change from the original example’s NOEXPORT_SNAPSHOT to EXPORT_SNAPSHOT.

def handle_connect(state) do
  query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput EXPORT_SNAPSHOT"
  {:query, query, %{state | step: :create_slot}}
end

Then in my handle_result function I have the following:

def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
  [%{rows: [row | _], columns: columns} | _] = results
  res = Enum.zip(columns, row) |> Enum.into(%{})
  query = """
  BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
  SET TRANSACTION SNAPSHOT '#{res["snapshot_name"]}';
  COPY contacts TO STDOUT WITH HEADER;
  COMMIT;
  """
  {:stream, query, [], %{state | step: :streaming}}
end

Without any changes to the Postgrex code, I get the following error:

[error] :gen_statem #PID<0.1300.0> terminating
** (CaseClauseError) no case clause matching: {:ok, {:msg_command_complete, "BEGIN"}, <<69, 0, 0, 0, 111, 83, 69, 82, 82, 79, 82, 0, 86, 69, 82, 82, 79, 82, 0, 67, 50, 50, 48, 50, 51, 0, 77, 105, 110, 118, 97, 108, 105, 100, 32, 115, 110, 97, 112, 115, 104, 111, 116, 32, 105, 100, 101, ...>>}
    (postgrex 0.18.0) lib/postgrex/protocol.ex:1325: Postgrex.Protocol.recv_streaming/2
    (postgrex 0.18.0) lib/postgrex/replication_connection.ex:566: Postgrex.ReplicationConnection.handle/5
    (postgrex 0.18.0) lib/postgrex/replication_connection.ex:474: Postgrex.ReplicationConnection.init/1
    (stdlib 6.0) gen_statem.erl:2695: :gen_statem.init_it/6
    (stdlib 6.0) proc_lib.erl:329: :proc_lib.init_p_do_apply/3
Queue: []
Postponed: []
State: {:undefined, :undefined}
Callback mode: :state_functions, state_enter: false

If I modify Postgrex.Protocol#L1331 with the following:

      {:ok, msg_command_complete(tag: "BEGIN"), buffer} ->
        {:ok, %{s | buffer: buffer}}

The app starts up, but I don’t see any streaming response. I must admit, I’m a bit over my head with the edit to Postgrex.Protocol without spending some more time with it.

Does anyone have any pointers or other ways I can do the initial sync of the table? Thanks in advance!

– Dan