$node_A->safe_psql('postgres',
"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)");
+###############################################################################
+# Test that publisher's transactions marked with DELAY_CHKPT_IN_COMMIT prevent
+# concurrently deleted tuples on the subscriber from being removed. This test
+# also acts as a safeguard to prevent developers from moving the commit
+# timestamp acquisition before marking DELAY_CHKPT_IN_COMMIT in
+# RecordTransactionCommitPrepared.
+###############################################################################
+
+my $injection_points_supported = $node_B->check_extension('injection_points');
+
+# This test depends on an injection point to block the prepared transaction
+# commit after marking DELAY_CHKPT_IN_COMMIT flag.
+if ($injection_points_supported != 0)
+{
+ $node_B->append_conf('postgresql.conf',
+ "shared_preload_libraries = 'injection_points'
+ max_prepared_transactions = 1");
+ $node_B->restart;
+
+ # Disable the subscription on Node B for testing only one-way
+ # replication.
+ $node_B->psql('postgres', "ALTER SUBSCRIPTION $subname_BA DISABLE;");
+
+ # Wait for the apply worker to stop
+ $node_B->poll_query_until('postgres',
+ "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+ );
+
+ # Truncate the table to cleanup existing dead rows in the table. Then insert
+ # a new row.
+ $node_B->safe_psql(
+ 'postgres', qq(
+ TRUNCATE tab;
+ INSERT INTO tab VALUES(1, 1);
+ ));
+
+ $node_B->wait_for_catchup($subname_AB);
+
+ # Create the injection_points extension on the publisher node and attach to the
+ # commit-after-delay-checkpoint injection point.
+ $node_B->safe_psql(
+ 'postgres',
+ "CREATE EXTENSION injection_points;
+ SELECT injection_points_attach('commit-after-delay-checkpoint', 'wait');"
+ );
+
+ # Start a background session on the publisher node to perform an update and
+ # pause at the injection point.
+ my $pub_session = $node_B->background_psql('postgres');
+ $pub_session->query_until(
+ qr/starting_bg_psql/,
+ q{
+ \echo starting_bg_psql
+ BEGIN;
+ UPDATE tab SET b = 2 WHERE a = 1;
+ PREPARE TRANSACTION 'txn_with_later_commit_ts';
+ COMMIT PREPARED 'txn_with_later_commit_ts';
+ }
+ );
+
+ # Confirm the update is suspended
+ $result =
+ $node_B->safe_psql('postgres', 'SELECT * FROM tab WHERE a = 1');
+ is($result, qq(1|1), 'publisher sees the old row');
+
+ # Delete the row on the subscriber. The deleted row should be retained due to a
+ # transaction on the publisher, which is currently marked with the
+ # DELAY_CHKPT_IN_COMMIT flag.
+ $node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
+
+ # Get the commit timestamp for the delete
+ my $sub_ts = $node_A->safe_psql('postgres',
+ "SELECT timestamp FROM pg_last_committed_xact();");
+
+ $log_location = -s $node_A->logfile;
+
+ # Confirm that the apply worker keeps requesting publisher status, while
+ # awaiting the prepared transaction to commit. Thus, the request log should
+ # appear more than once.
+ $node_A->wait_for_log(
+ qr/sending publisher status request message/,
+ $log_location);
+
+ $log_location = -s $node_A->logfile;
+
+ $node_A->wait_for_log(
+ qr/sending publisher status request message/,
+ $log_location);
+
+ # Confirm that the dead tuple cannot be removed
+ ($cmdret, $stdout, $stderr) =
+ $node_A->psql('postgres', qq(VACUUM (verbose) public.tab;));
+
+ ok($stderr =~ qr/1 are dead but not yet removable/,
+ 'the deleted column is non-removable');
+
+ $log_location = -s $node_A->logfile;
+
+ # Wakeup and detach the injection point on the publisher node. The prepared
+ # transaction should now commit.
+ $node_B->safe_psql(
+ 'postgres',
+ "SELECT injection_points_wakeup('commit-after-delay-checkpoint');
+ SELECT injection_points_detach('commit-after-delay-checkpoint');"
+ );
+
+ # Close the background session on the publisher node
+ ok($pub_session->quit, "close publisher session");
+
+ # Confirm that the transaction committed
+ $result =
+ $node_B->safe_psql('postgres', 'SELECT * FROM tab WHERE a = 1');
+ is($result, qq(1|2), 'publisher sees the new row');
+
+ # Ensure the UPDATE is replayed on subscriber
+ $node_B->wait_for_catchup($subname_AB);
+
+ $logfile = slurp_file($node_A->logfile(), $log_location);
+ ok( $logfile =~
+ qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote row \(1, 2\); replica identity full \(1, 1\)/,
+ 'update target row was deleted in tab');
+
+ # Remember the next transaction ID to be assigned
+ $next_xid =
+ $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
+
+ # Confirm that the xmin value is advanced to the latest nextXid after the
+ # prepared transaction on the publisher has been committed.
+ ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is updated on subscriber"
+ );
+
+ # Confirm that the dead tuple can be removed now
+ ($cmdret, $stdout, $stderr) =
+ $node_A->psql('postgres', qq(VACUUM (verbose) public.tab;));
+
+ ok($stderr =~ qr/1 removed, 0 remain, 0 are dead but not yet removable/,
+ 'the deleted column is removed');
+
+ # Get the commit timestamp for the publisher's update
+ my $pub_ts = $node_B->safe_psql('postgres',
+ "SELECT pg_xact_commit_timestamp(xmin) from tab where a=1;");
+
+ # Check that the commit timestamp for the update on the publisher is later than
+ # or equal to the timestamp of the local deletion, as the commit timestamp
+ # should be assigned after marking the DELAY_CHKPT_IN_COMMIT flag.
+ $result = $node_B->safe_psql('postgres',
+ "SELECT '$pub_ts'::timestamp >= '$sub_ts'::timestamp");
+ is($result, qq(t),
+ "pub UPDATE's timestamp is later than that of sub's DELETE");
+
+ # Re-enable the subscription for further tests
+ $node_B->psql('postgres', "ALTER SUBSCRIPTION $subname_BA ENABLE;");
+}
+
###############################################################################
# Check that dead tuple retention stops due to the wait time surpassing
# max_retention_duration.