I am playing around with Postgrex.ReplicationConnection and was wondering if anyone knew how to do an initial COPY of the table data on initialization. I have a small table that I’m replicating and wanted the ability to copy over existing data. I am using a modified version of the Repl Module and can’t quite get it figured out.
My handle_connect function looks like the following. Note the change from the original example’s NOEXPORT_SNAPSHOT to EXPORT_SNAPSHOT.
def handle_connect(state) do
  query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput EXPORT_SNAPSHOT"
  {:query, query, %{state | step: :create_slot}}
end
Then in my handle_result function I have the following:
def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
  [%{rows: [row | _], columns: columns} | _] = results
  res = Enum.zip(columns, row) |> Enum.into(%{})
  query = """
  BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
  SET TRANSACTION SNAPSHOT '#{res["snapshot_name"]}';
  COPY contacts TO STDOUT WITH HEADER;
  COMMIT;
  """
  {:stream, query, [], %{state | step: :streaming}}
end
Without any changes to the Postgrex code, I get the following error:
[error] :gen_statem #PID<0.1300.0> terminating
** (CaseClauseError) no case clause matching: {:ok, {:msg_command_complete, "BEGIN"}, <<69, 0, 0, 0, 111, 83, 69, 82, 82, 79, 82, 0, 86, 69, 82, 82, 79, 82, 0, 67, 50, 50, 48, 50, 51, 0, 77, 105, 110, 118, 97, 108, 105, 100, 32, 115, 110, 97, 112, 115, 104, 111, 116, 32, 105, 100, 101, ...>>}
    (postgrex 0.18.0) lib/postgrex/protocol.ex:1325: Postgrex.Protocol.recv_streaming/2
    (postgrex 0.18.0) lib/postgrex/replication_connection.ex:566: Postgrex.ReplicationConnection.handle/5
    (postgrex 0.18.0) lib/postgrex/replication_connection.ex:474: Postgrex.ReplicationConnection.init/1
    (stdlib 6.0) gen_statem.erl:2695: :gen_statem.init_it/6
    (stdlib 6.0) proc_lib.erl:329: :proc_lib.init_p_do_apply/3
Queue: []
Postponed: []
State: {:undefined, :undefined}
Callback mode: :state_functions, state_enter: false
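For what it’s worth, the binary at the end of that CaseClauseError looks like a raw PostgreSQL ErrorResponse message that was left unparsed in the buffer. Here is a minimal sketch that decodes the bytes visible in the log. It assumes the standard wire layout (1-byte type tag, 4-byte length, then NUL-terminated fields), and it only covers the bytes shown, since the log truncates the rest with "...".

```elixir
# Leading bytes of the unparsed buffer from the error above.
# The log elides the remainder, so the last field is cut short.
buffer =
  <<69, 0, 0, 0, 111, 83, 69, 82, 82, 79, 82, 0, 86, 69, 82, 82, 79, 82, 0, 67, 50, 50, 48,
    50, 51, 0, 77, 105, 110, 118, 97, 108, 105, 100, 32, 115, 110, 97, 112, 115, 104, 111,
    116, 32, 105, 100, 101>>

# A backend message is a 1-byte type tag, a 4-byte big-endian length, then the body.
<<type, length::32, body::binary>> = buffer

# ErrorResponse fields are NUL-terminated strings, each prefixed by a 1-byte field code.
fields =
  body
  |> :binary.split(<<0>>, [:global])
  |> Enum.reject(&(&1 == ""))
  |> Enum.map(fn <<code, value::binary>> -> {<<code>>, value} end)

IO.inspect({<<type>>, length, fields})
# {"E", 111, [{"S", "ERROR"}, {"V", "ERROR"}, {"C", "22023"}, {"M", "invalid snapshot ide"}]}
```

So the server appears to accept the BEGIN and then reject the SET TRANSACTION SNAPSHOT with SQLSTATE 22023 and a message starting "invalid snapshot ide" (presumably "invalid snapshot identifier"). If I am reading the PostgreSQL replication protocol docs correctly, the exported snapshot is only valid until the next command is executed on the replication connection, which might explain why it is already gone by the time my query runs there.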
If I modify Postgrex.Protocol#L1331 with the following:
{:ok, msg_command_complete(tag: "BEGIN"), buffer} ->
  {:ok, %{s | buffer: buffer}}
The app starts up, but I don’t see any streaming response. I must admit I’m in a bit over my head editing Postgrex.Protocol and would need to spend more time with it before changing anything else there.
Does anyone have any pointers or other ways I can do the initial sync of the table? Thanks in advance!
– Dan