</para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>two_phase_at</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       The address (<literal>LSN</literal>) from which the decoding of prepared
+       transactions is enabled. <literal>NULL</literal> for logical slots
+       where <structfield>two_phase</structfield> is false and for physical slots.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>inactive_since</structfield> <type>timestamptz</type>
 
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
+            L.two_phase_at,
             L.inactive_since,
             L.conflicting,
             L.invalidation_reason,
 
    bool        failover;
    XLogRecPtr  restart_lsn;
    XLogRecPtr  confirmed_lsn;
+   XLogRecPtr  two_phase_at;
    TransactionId catalog_xmin;
 
    /* RS_INVAL_NONE if valid, or the reason of invalidation */
    if (remote_dbid != slot->data.database ||
        remote_slot->two_phase != slot->data.two_phase ||
        remote_slot->failover != slot->data.failover ||
-       strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+       strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
+       remote_slot->two_phase_at != slot->data.two_phase_at)
    {
        NameData    plugin_name;
 
        slot->data.plugin = plugin_name;
        slot->data.database = remote_dbid;
        slot->data.two_phase = remote_slot->two_phase;
+       slot->data.two_phase_at = remote_slot->two_phase_at;
        slot->data.failover = remote_slot->failover;
        SpinLockRelease(&slot->mutex);
 
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
    Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-   LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+   LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
 
    WalRcvExecResult *res;
    TupleTableSlot *tupslot;
    bool        some_slot_updated = false;
    bool        started_tx = false;
    const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
-       " restart_lsn, catalog_xmin, two_phase, failover,"
+       " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
        " database, invalidation_reason"
        " FROM pg_catalog.pg_replication_slots"
        " WHERE failover and NOT temporary";
                                                           &isnull));
        Assert(!isnull);
 
+       d = slot_getattr(tupslot, ++col, &isnull);
+       remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
        remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
                                                          &isnull));
        Assert(!isnull);
 
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 19
+#define PG_GET_REPLICATION_SLOTS_COLS 20
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    XLogRecPtr  currlsn;
    int         slotno;
 
        values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+       if (slot_contents.data.two_phase &&
+           !XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
+           values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
+       else
+           nulls[i++] = true;
+
        if (slot_contents.inactive_since > 0)
            values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
        else
 
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202504021
+#define CATALOG_VERSION_NO 202504031
 
 #endif
 
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
 
 # Disable autovacuum to avoid generating xid during stats update as otherwise
 # the new XID could then be replicated to standby at some random point making
 # slots at primary lag behind standby during slot sync.
-$publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$publisher->append_conf(
+   'postgresql.conf', qq{
+autovacuum = off
+max_prepared_transactions = 1
+});
 $publisher->start;
 
 $publisher->safe_psql('postgres',
 # Create a subscriber node, wait for sync to complete
 my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
+$subscriber1->append_conf('postgresql.conf', 'max_prepared_transactions = 1');
 $subscriber1->start;
 
 # Capture the time before the logical failover slot is created on the
    "'sb1_slot'");
 $primary->reload;
 
+##################################################
+# Test the synchronization of the two_phase setting for a subscription with the
+# standby. Additionally, prepare a transaction before enabling the two_phase
+# option; subsequent tests will verify if it can be correctly replicated to the
+# subscriber after committing it on the promoted standby.
+##################################################
+
+$standby1->start;
+
+# Prepare a transaction
+$primary->safe_psql(
+   'postgres', qq[
+   BEGIN;
+   INSERT INTO tab_int values(0);
+   PREPARE TRANSACTION 'test_twophase_slotsync';
+]);
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_catchup('regress_mysub1');
+
+# Disable the subscription to allow changing the two_phase option.
+$subscriber1->safe_psql('postgres',
+   "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+   'postgres',
+   "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
+   1);
+
+# Set two_phase to true and enable the subscription
+$subscriber1->safe_psql(
+   'postgres', qq[
+   ALTER SUBSCRIPTION regress_mysub1 SET (two_phase = true);
+   ALTER SUBSCRIPTION regress_mysub1 ENABLE;
+]);
+
+$primary->wait_for_catchup('regress_mysub1');
+
+my $two_phase_at = $primary->safe_psql('postgres',
+   "SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
+);
+
+# Confirm that two_phase setting of lsub1_slot slot is synced to the standby
+ok( $standby1->poll_query_until(
+       'postgres',
+       "SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
+   ),
+   'two_phase setting of slot lsub1_slot synced to standby');
+
+# Confirm that the prepared transaction is not yet replicated to the
+# subscriber.
+$result = $subscriber1->safe_psql('postgres',
+   "SELECT count(*) = 0 FROM pg_prepared_xacts;");
+is($result, 't',
+   "the prepared transaction is not replicated to the subscriber");
+
 ##################################################
 # Promote the standby1 to primary. Confirm that:
 # a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
 # b) logical replication for regress_mysub1 is resumed successfully after failover
-# c) changes can be consumed from the synced slot 'snap_test_slot'
+# c) changes from the transaction prepared 'test_twophase_slotsync' can be
+#    consumed from the synced slot 'snap_test_slot' once committed on the new
+#    primary.
+# d) changes can be consumed from the synced slot 'snap_test_slot'
 ##################################################
-$standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
 # Capture the time before the standby is promoted
    't',
    'synced slot retained on the new primary');
 
+# Commit the prepared transaction
+$standby1->safe_psql('postgres',
+   "COMMIT PREPARED 'test_twophase_slotsync';");
+$standby1->wait_for_catchup('regress_mysub1');
+
+# Confirm that the prepared transaction is replicated to the subscriber
+is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
+   "11", 'prepared data replicated from the new primary');
+
 # Insert data on the new primary
 $standby1->safe_psql('postgres',
    "INSERT INTO tab_int SELECT generate_series(11, 20);");
 
 # Confirm that data in tab_int replicated on the subscriber
 is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
-   "20", 'data replicated from the new primary');
+   "21", 'data replicated from the new primary');
 
 # Consume the data from the snap_test_slot. The synced slot should reach a
 # consistent point by restoring the snapshot at the restart_lsn serialized
 
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
+    l.two_phase_at,
     l.inactive_since,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,