wal_decoding: physical/streaming replication walsender slot support
author: Andres Freund <andres@anarazel.de>
Mon, 27 Jan 2014 16:13:48 +0000 (17:13 +0100)
committer: Robert Haas <rhaas@postgresql.org>
Mon, 27 Jan 2014 22:15:25 +0000 (17:15 -0500)
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/walsender.c
src/include/nodes/nodes.h
src/include/nodes/replnodes.h
src/tools/pgindent/typedefs.list

index 015aa44d89c3be831ffb1c999fab41c65d43f65a..f2b334313c4a19720325cb592e78ead3d65cce8c 100644 (file)
@@ -65,7 +65,7 @@ Node *replication_parse_result;
 }
 
 /* Non-keyword tokens */
-%token <str> SCONST
+%token <str> SCONST IDENT
 %token <uintval> UCONST
 %token <recptr> RECPTR
 
@@ -73,6 +73,8 @@ Node *replication_parse_result;
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
 %token K_START_REPLICATION
+%token K_CREATE_REPLICATION_SLOT
+%token K_DROP_REPLICATION_SLOT
 %token K_TIMELINE_HISTORY
 %token K_LABEL
 %token K_PROGRESS
@@ -80,12 +82,16 @@ Node *replication_parse_result;
 %token K_NOWAIT
 %token K_WAL
 %token K_TIMELINE
+%token K_LOGICAL
+%token K_PHYSICAL
+%token K_SLOT
 
 %type <node>   command
-%type <node>   base_backup start_replication identify_system timeline_history
+%type <node>   base_backup start_replication create_replication_slot drop_replication_slot identify_system timeline_history
 %type <list>   base_backup_opt_list
 %type <defelt> base_backup_opt
 %type <uintval>        opt_timeline
+%type <str>            opt_slot
 %%
 
 firstcmd: command opt_semicolon
@@ -102,6 +108,8 @@ command:
                        identify_system
                        | base_backup
                        | start_replication
+                       | create_replication_slot
+                       | drop_replication_slot
                        | timeline_history
                        ;
 
@@ -158,18 +166,42 @@ base_backup_opt:
                                }
                        ;
 
+/* CREATE_REPLICATION_SLOT slot PHYSICAL */
+create_replication_slot:
+                       K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL
+                               {
+                                       CreateReplicationSlotCmd *cmd;
+                                       cmd = makeNode(CreateReplicationSlotCmd);
+                                       cmd->kind = REPLICATION_KIND_PHYSICAL;
+                                       cmd->slotname = $2;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
+/* DROP_REPLICATION_SLOT slot */
+drop_replication_slot:
+                       K_DROP_REPLICATION_SLOT IDENT
+                               {
+                                       DropReplicationSlotCmd *cmd;
+                                       cmd = makeNode(DropReplicationSlotCmd);
+                                       cmd->slotname = $2;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
 /*
- * START_REPLICATION %X/%X [TIMELINE %d]
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
  */
 start_replication:
-                       K_START_REPLICATION RECPTR opt_timeline
+                       K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
                                {
                                        StartReplicationCmd *cmd;
 
                                        cmd = makeNode(StartReplicationCmd);
-                                       cmd->startpoint = $2;
-                                       cmd->timeline = $3;
-
+                                       cmd->kind = REPLICATION_KIND_PHYSICAL;
+                                       cmd->slotname = $2;
+                                       cmd->startpoint = $4;
+                                       cmd->timeline = $5;
                                        $$ = (Node *) cmd;
                                }
                        ;
@@ -205,6 +237,15 @@ timeline_history:
                                        $$ = (Node *) cmd;
                                }
                        ;
+
+opt_physical : K_PHYSICAL | /* EMPTY: the keyword is optional */;
+
+
+opt_slot :     K_SLOT IDENT
+                               {
+                                       $$ = $2;
+                               }
+                               | /* nothing: no SLOT clause given */                 { $$ = NULL; }
 %%
 
 #include "repl_scanner.c"
index 01e5ac6efb03219eac7702627c0e2cced1bb3880..ea56a44b72950d6bc33678c81faf255d55316637 100644 (file)
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "utils/builtins.h"
+#include "parser/scansup.h"
 
 /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
 #undef fprintf
@@ -48,7 +49,7 @@ static void addlitchar(unsigned char ychar);
 %option warn
 %option prefix="replication_yy"
 
-%x xq
+%x xq xd
 
 /* Extended quote
  * xqdouble implements embedded quote, ''''
@@ -57,12 +58,26 @@ xqstart                     {quote}
 xqdouble               {quote}{quote}
 xqinside               [^']+
 
+/* Double quote
+ * Allows embedded spaces and other special characters into identifiers.
+ */
+dquote                 \"
+xdstart                        {dquote}
+xdstop                 {dquote}
+xddouble               {dquote}{dquote}
+xdinside               [^"]+
+
 digit                  [0-9]+
 hexdigit               [0-9A-Za-z]+
 
 quote                  '
 quotestop              {quote}
 
+ident_start            [A-Za-z\200-\377_]
+ident_cont             [A-Za-z\200-\377_0-9\$]
+
+identifier             {ident_start}{ident_cont}*
+
 %%
 
 BASE_BACKUP                    { return K_BASE_BACKUP; }
@@ -74,9 +89,17 @@ PROGRESS                     { return K_PROGRESS; }
 WAL                    { return K_WAL; }
 TIMELINE                       { return K_TIMELINE; }
 START_REPLICATION      { return K_START_REPLICATION; }
+CREATE_REPLICATION_SLOT                { return K_CREATE_REPLICATION_SLOT; }
+DROP_REPLICATION_SLOT          { return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY       { return K_TIMELINE_HISTORY; }
+LOGICAL                                { return K_LOGICAL; }
+PHYSICAL                       { return K_PHYSICAL; }
+SLOT                           { return K_SLOT; }
+
 ","                            { return ','; }
 ";"                            { return ';'; }
+"("                            { return '('; }
+")"                            { return ')'; }
 
 [\n]                   ;
 [\t]                   ;
@@ -100,20 +123,49 @@ TIMELINE_HISTORY  { return K_TIMELINE_HISTORY; }
                                        BEGIN(xq);
                                        startlit();
                                }
+
 <xq>{quotestop}        {
                                        yyless(1);
                                        BEGIN(INITIAL);
                                        yylval.str = litbufdup();
                                        return SCONST;
                                }
-<xq>{xqdouble} {
+
+<xq>{xqdouble} {
                                        addlitchar('\'');
                                }
+
 <xq>{xqinside}  {
                                        addlit(yytext, yyleng);
                                }
 
-<xq><<EOF>>            { yyerror("unterminated quoted string"); }
+{xdstart}              {
+                                       BEGIN(xd);
+                                       startlit();
+                               }
+
+<xd>{xdstop}   {
+                                       int len;
+                                       yyless(1);
+                                       BEGIN(INITIAL);
+                                       yylval.str = litbufdup();
+                                       len = strlen(yylval.str);
+                                       truncate_identifier(yylval.str, len, true);
+                                       return IDENT;
+                               }
+
+<xd>{xdinside}  {
+                                       addlit(yytext, yyleng);
+                               }
+
+{identifier}   {
+                                       int len = strlen(yytext);
+
+                                       yylval.str = downcase_truncate_identifier(yytext, len, true);
+                                       return IDENT;
+                               }
+
+<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
 
 
 <<EOF>>                        {
index 652487e3de776d1f0153d2da656fe1597267202b..a8f1504f2418cf378282d6716e4b1996e508557a 100644 (file)
@@ -53,6 +53,7 @@
 #include "miscadmin.h"
 #include "nodes/replnodes.h"
 #include "replication/basebackup.h"
+#include "replication/slot.h"
 #include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -224,6 +225,12 @@ WalSndErrorCleanup()
                sendFile = -1;
        }
 
+       if (MyReplicationSlot != NULL)
+               ReplicationSlotRelease();
+
+       /* do a minimal amount of cleanup for the !ready_to_stop cases */
+       LWLockReleaseAll();
+
        replication_active = false;
        if (walsender_ready_to_stop)
                proc_exit(0);
@@ -421,6 +428,15 @@ StartReplication(StartReplicationCmd *cmd)
         * written at wal_level='minimal'.
         */
 
+       if (cmd->slotname)
+       {
+               ReplicationSlotAcquire(cmd->slotname);
+               if (MyReplicationSlot->database != InvalidOid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        (errmsg("cannot use a replication slot created for changeset extraction for streaming replication"))));
+       }
+
        /*
         * Select the timeline. If it was given explicitly by the client, use
         * that. Otherwise use the timeline of the last replayed record, which is
@@ -565,6 +581,9 @@ StartReplication(StartReplicationCmd *cmd)
                Assert(streamingDoneSending && streamingDoneReceiving);
        }
 
+       if (cmd->slotname)
+               ReplicationSlotRelease();
+
        /*
         * Copy is finished now. Send a single-row result set indicating the next
         * timeline.
@@ -622,6 +641,122 @@ StartReplication(StartReplicationCmd *cmd)
        pq_puttextmessage('C', "START_STREAMING");
 }
 
+/*
+ * Create a replication slot and describe it to the client as a one-row,
+ * four-column result set; the slot is released again before returning.
+ */
+static void
+CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
+{
+       const char *slot_name;
+       StringInfoData buf;
+       char            xpos[MAXFNAMELEN];
+       const char *snapshot_name = NULL;
+
+       Assert(!MyReplicationSlot);
+
+       /* setup state for XLogReadPage */
+       sendTimeLineIsHistoric = false;
+       sendTimeLine = ThisTimeLineID;
+
+       ReplicationSlotCreate(cmd->slotname, cmd->kind == REPLICATION_KIND_LOGICAL);
+
+       initStringInfo(&output_message);
+
+       slot_name = NameStr(MyReplicationSlot->name);
+       snprintf(xpos, sizeof(xpos), "%X/%X",
+                        (uint32) (MyReplicationSlot->confirmed_flush >> 32),
+                        (uint32) MyReplicationSlot->confirmed_flush);
+
+       pq_beginmessage(&buf, 'T');
+       pq_sendint(&buf, 4, 2);         /* 4 fields */
+
+       /* first field: slot name */
+       pq_sendstring(&buf, "replication_id");  /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       /* second field: LSN at which we became consistent */
+       pq_sendstring(&buf, "consistent_point");        /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       /* third field: exported snapshot's name */
+       pq_sendstring(&buf, "snapshot_name");   /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       /* fourth field: output plugin */
+       pq_sendstring(&buf, "plugin");  /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       pq_endmessage(&buf);
+
+       /* Send a DataRow message */
+       pq_beginmessage(&buf, 'D');
+       pq_sendint(&buf, 4, 2);         /* # of columns */
+
+       /* slot_name */
+       pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */
+       pq_sendbytes(&buf, slot_name, strlen(slot_name));
+
+       /* consistent WAL location */
+       pq_sendint(&buf, strlen(xpos), 4); /* col2 len */
+       pq_sendbytes(&buf, xpos, strlen(xpos));
+
+       /* snapshot name; nothing above assigns it, so this column is always NULL */
+       if (snapshot_name != NULL)
+       {
+               pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */
+               pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name));
+       }
+       else
+               pq_sendint(&buf, -1, 4);    /* col3 len, NULL */
+
+       /* plugin; sent as NULL when the command carries none */
+       if (cmd->plugin != NULL)
+       {
+               pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */
+               pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin));
+       }
+       else
+               pq_sendint(&buf, -1, 4);    /* col4 len, NULL */
+
+       pq_endmessage(&buf);
+
+       /*
+        * release active status again, START_REPLICATION will reacquire it
+        */
+       ReplicationSlotRelease();
+}
+
+/*
+ * Drop the replication slot named in the command and report completion.
+ */
+static void
+DropReplicationSlot(DropReplicationSlotCmd *cmd)
+{
+       /* no need to check decoding requirements here */;
+       ReplicationSlotDrop(cmd->slotname);
+       EndCommand("DROP_REPLICATION_SLOT", DestRemote);
+}
 /*
  * Execute an incoming replication command.
  */
@@ -660,14 +795,28 @@ exec_replication_command(const char *cmd_string)
                        IdentifySystem();
                        break;
 
-               case T_StartReplicationCmd:
-                       StartReplication((StartReplicationCmd *) cmd_node);
-                       break;
-
                case T_BaseBackupCmd:
                        SendBaseBackup((BaseBackupCmd *) cmd_node);
                        break;
 
+               case T_CreateReplicationSlotCmd:
+                       CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+                       break;
+
+               case T_DropReplicationSlotCmd:
+                       DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+                       break;
+
+               case T_StartReplicationCmd:
+                       {
+                               StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+                               if (cmd->kind == REPLICATION_KIND_PHYSICAL)
+                                       StartReplication(cmd);
+                               else
+                                       elog(ERROR, "cannot handle changeset extraction yet");
+                               break;
+                       }
+
                case T_TimeLineHistoryCmd:
                        SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
                        break;
@@ -830,6 +979,36 @@ ProcessStandbyMessage(void)
        }
 }
 
+/*
+ * Record in our slot that a walreceiver confirmed receipt up to `lsn`.
+ */
+static void
+PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
+{
+       bool changed = false;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile ReplicationSlot *slot = MyReplicationSlot;
+
+       Assert(lsn != InvalidXLogRecPtr);
+       SpinLockAcquire(&slot->mutex);
+       if (slot->restart_decoding != lsn)
+       {
+               changed = true;
+               slot->restart_decoding = lsn;
+       }
+       SpinLockRelease(&slot->mutex);
+
+       if (changed)
+               ReplicationSlotsComputeRequiredLSN();
+
+       /*
+        * One could argue that the slot should be saved to disk now, but that'd
+        * be energy wasted - the worst lost information can do here is give us
+        * wrong information in a statistics view - we'll just potentially be more
+        * conservative in removing files.
+        */
+}
+
 /*
  * Regular reply from standby advising of WAL positions on standby server.
  */
@@ -875,6 +1054,49 @@ ProcessStandbyReplyMessage(void)
 
        if (!am_cascading_walsender)
                SyncRepReleaseWaiters();
+
+       /*
+        * Record the flush position the client confirmed in our slot, if any.
+        */
+       if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+       {
+               if (MyReplicationSlot->database != InvalidOid)
+                       elog(ERROR, "cannot handle changeset extraction yet");
+               else
+                       PhysicalConfirmReceivedLocation(flushPtr);
+       }
+}
+
+/* advance (or reset) our replication slot's xmin horizon from standby feedback */
+static void
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+{
+       bool changed = false;
+       volatile ReplicationSlot *slot = MyReplicationSlot;
+       SpinLockAcquire(&slot->mutex);
+
+       MyPgXact->xmin = InvalidTransactionId;
+
+       /*
+        * For physical replication we don't need the interlock provided
+        * by data_xmin and effective_data_xmin since the consequences of a
+        * missed increase aren't bad, so set both at once.
+        */
+       if (!TransactionIdIsNormal(slot->data_xmin) ||
+               !TransactionIdIsNormal(feedbackXmin) ||
+               TransactionIdPrecedes(slot->data_xmin, feedbackXmin))
+       {
+               changed = true;
+               slot->data_xmin = feedbackXmin;
+               slot->effective_data_xmin = feedbackXmin;
+       }
+       SpinLockRelease(&slot->mutex);
+
+       if (changed)
+       {
+               ReplicationSlotSave();
+               ReplicationSlotsComputeRequiredXmin(false);
+       }
 }
 
 /*
@@ -904,6 +1126,8 @@ ProcessStandbyHSFeedbackMessage(void)
        if (!TransactionIdIsNormal(feedbackXmin))
        {
                MyPgXact->xmin = InvalidTransactionId;
+               if (MyReplicationSlot != NULL)
+                       PhysicalReplicationSlotNewXmin(feedbackXmin);
                return;
        }
 
@@ -951,8 +1175,17 @@ ProcessStandbyHSFeedbackMessage(void)
         * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
         * safe, and if we're moving it backwards, well, the data is at risk
         * already since a VACUUM could have just finished calling GetOldestXmin.)
+        *
+        * If we're using a replication slot we reserve the xmin via that,
+        * otherwise via the walsender's PGXACT entry.
+        *
+        * XXX: It might make sense to introduce ephemeral slots and always use
+        * the slot mechanism.
         */
-       MyPgXact->xmin = feedbackXmin;
+       if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
+               PhysicalReplicationSlotNewXmin(feedbackXmin);
+       else
+               MyPgXact->xmin = feedbackXmin;
 }
 
 /* Main loop of walsender process that streams the WAL over Copy messages. */
index dfcc01344eaf9c7ee75154b429932c89f79f213a..5b8df59bc65d1b3b5e96f050ebe1aa8bb1ce3da2 100644 (file)
@@ -412,6 +412,8 @@ typedef enum NodeTag
         */
        T_IdentifySystemCmd,
        T_BaseBackupCmd,
+       T_CreateReplicationSlotCmd,
+       T_DropReplicationSlotCmd,
        T_StartReplicationCmd,
        T_TimeLineHistoryCmd,
 
index 2f57c7de4dc52ac1946a303bbe5f68d8b8e19c5c..aac75fd1024c551456750bacddc0129791279b6d 100644 (file)
@@ -44,6 +44,30 @@ typedef struct BaseBackupCmd
 } BaseBackupCmd;
 
 
+/* ----------------------
+ *             CREATE_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct CreateReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;   /* name of the slot to create */
+       ReplicationKind kind;   /* physical or logical slot */
+       char       *plugin;     /* output plugin name, or NULL if none */
+} CreateReplicationSlotCmd;
+
+
+/* ----------------------
+ *             DROP_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct DropReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;   /* name of the slot to drop */
+} DropReplicationSlotCmd;
+
+
 /* ----------------------
  *             START_REPLICATION command
  * ----------------------
@@ -51,8 +75,11 @@ typedef struct BaseBackupCmd
 typedef struct StartReplicationCmd
 {
        NodeTag         type;
+       ReplicationKind kind;
+       char       *slotname;
        TimeLineID      timeline;
        XLogRecPtr      startpoint;
+       List       *options;
 } StartReplicationCmd;
 
 
index ad40735333b178fff7463d9a61ec3dff5fc151a1..3b7f61ef20865bc55e734d942ba4d6d28c1f2520 100644 (file)
@@ -343,6 +343,7 @@ CreateOpClassItem
 CreateOpClassStmt
 CreateOpFamilyStmt
 CreatePLangStmt
+CreateReplicationSlotCmd
 CreateRangeStmt
 CreateRoleStmt
 CreateSchemaStmt
@@ -416,6 +417,7 @@ DomainConstraintType
 DomainIOData
 DropBehavior
 DropOwnedStmt
+DropReplicationSlotCmd
 DropRoleStmt
 DropStmt
 DropTableSpaceStmt