bdr: Properly send timestamps, xlogrecptrs as int64s
authorAndres Freund <andres@anarazel.de>
Sun, 16 Mar 2014 22:22:14 +0000 (23:22 +0100)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Jul 2014 15:55:21 +0000 (17:55 +0200)
contrib/bdr/bdr_apply.c
contrib/bdr/bdr_output.c

index 0178bbe1fd3b9c3be4a51764c70224358e2f2624..1ff969d0bdb4a71a25b59661b557b88aa21a2627 100644 (file)
@@ -82,8 +82,8 @@ process_remote_begin(StringInfo s)
 
    Assert(bdr_apply_con != NULL);
 
-   origlsn = *(XLogRecPtr *) pq_getmsgbytes(s, 8);
-   committime = *(TimestampTz *) pq_getmsgbytes(s, 8);
+   origlsn = pq_getmsgint64(s);
+   committime = pq_getmsgint64(s);
 
    /* setup state for commit and conflict detection */
    replication_origin_lsn = origlsn;
@@ -102,10 +102,8 @@ process_remote_begin(StringInfo s)
    /* don't want the overhead otherwise */
    if (bdr_apply_con->apply_delay > 0)
    {
-       current = GetCurrentTimestamp();
-#ifndef HAVE_INT64_TIMESTAMP
-#error "we require integer timestamps"
-#endif
+       current = GetCurrentIntegerTimestamp();
+
        /* ensure no weirdness due to clock drift */
        if (current > replication_origin_timestamp)
        {
@@ -136,9 +134,9 @@ process_remote_commit(StringInfo s)
 
    Assert(bdr_apply_con != NULL);
 
-   origlsn = *(XLogRecPtr *) pq_getmsgbytes(s, 8);
-   end_lsn = *(XLogRecPtr *) pq_getmsgbytes(s, 8);
-   committime = *(TimestampTz *) pq_getmsgbytes(s, 8);
+   origlsn = pq_getmsgint64(s);
+   end_lsn = pq_getmsgint64(s);
+   committime = pq_getmsgint64(s);
 
    elog(LOG, "COMMIT origin(lsn, end, timestamp): %X/%X, %X/%X, %s",
         (uint32) (origlsn >> 32), (uint32) origlsn,
index ab40ed757c5701148271ca78845b1dda894bf7f7..12f5ef9fa5ebb41b4798f76771e8a5a87a8ed587 100644 (file)
@@ -304,9 +304,9 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
        return;
 
    OutputPluginPrepareWrite(ctx, true);
-   appendStringInfoChar(ctx->out, 'B');        /* BEGIN */
-   appendBinaryStringInfo(ctx->out, (char *) &txn->final_lsn, sizeof(XLogRecPtr));
-   appendBinaryStringInfo(ctx->out, (char *) &txn->commit_time, sizeof(TimestampTz));
+   pq_sendbyte(ctx->out, 'B');     /* BEGIN */
+   pq_sendint64(ctx->out, txn->final_lsn);
+   pq_sendint64(ctx->out, txn->commit_time);
    OutputPluginWrite(ctx, true);
    return;
 }
@@ -324,10 +324,10 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        return;
 
    OutputPluginPrepareWrite(ctx, true);
-   appendStringInfoChar(ctx->out, 'C');        /* sending COMMIT */
-   appendBinaryStringInfo(ctx->out, (char *) &commit_lsn, sizeof(XLogRecPtr));
-   appendBinaryStringInfo(ctx->out, (char *) &txn->end_lsn, sizeof(XLogRecPtr));
-   appendBinaryStringInfo(ctx->out, (char *) &txn->commit_time, sizeof(TimestampTz));
+   pq_sendbyte(ctx->out, 'C');     /* sending COMMIT */
+   pq_sendint64(ctx->out, commit_lsn);
+   pq_sendint64(ctx->out, txn->end_lsn);
+   pq_sendint64(ctx->out, txn->commit_time);
    OutputPluginWrite(ctx, true);
 }
 
@@ -352,35 +352,35 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    switch (change->action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
-           appendStringInfoChar(ctx->out, 'I');        /* action INSERT */
+           pq_sendbyte(ctx->out, 'I');     /* action INSERT */
            write_rel(ctx->out, relation);
-           appendStringInfoChar(ctx->out, 'N');        /* new tuple follows */
+           pq_sendbyte(ctx->out, 'N');     /* new tuple follows */
            write_tuple(data, ctx->out, relation, &change->data.tp.newtuple->tuple);
            break;
        case REORDER_BUFFER_CHANGE_UPDATE:
-           appendStringInfoChar(ctx->out, 'U');        /* action UPDATE */
+           pq_sendbyte(ctx->out, 'U');     /* action UPDATE */
            write_rel(ctx->out, relation);
            if (change->data.tp.oldtuple != NULL)
            {
-               appendStringInfoChar(ctx->out, 'K');    /* old key follows */
+               pq_sendbyte(ctx->out, 'K'); /* old key follows */
                write_tuple(data, ctx->out, relation,
                            &change->data.tp.oldtuple->tuple);
            }
-           appendStringInfoChar(ctx->out, 'N');        /* new tuple follows */
+           pq_sendbyte(ctx->out, 'N');     /* new tuple follows */
            write_tuple(data, ctx->out, relation,
                        &change->data.tp.newtuple->tuple);
            break;
        case REORDER_BUFFER_CHANGE_DELETE:
-           appendStringInfoChar(ctx->out, 'D');        /* action DELETE */
+           pq_sendbyte(ctx->out, 'D');     /* action DELETE */
            write_rel(ctx->out, relation);
            if (change->data.tp.oldtuple != NULL)
            {
-               appendStringInfoChar(ctx->out, 'K');    /* old key follows */
+               pq_sendbyte(ctx->out, 'K'); /* old key follows */
                write_tuple(data, ctx->out, relation,
                            &change->data.tp.oldtuple->tuple);
            }
            else
-               appendStringInfoChar(ctx->out, 'E');    /* empty */
+               pq_sendbyte(ctx->out, 'E'); /* empty */
            break;
        default:
            Assert(false);
@@ -462,7 +462,7 @@ write_tuple(BdrOutputData *data, StringInfo out, Relation rel,
 
    desc = RelationGetDescr(rel);
 
-   appendStringInfoChar(out, 'T');         /* tuple follows */
+   pq_sendbyte(out, 'T');          /* tuple follows */
 
    pq_sendint(out, desc->natts, 4);        /* number of attributes */
 
@@ -489,12 +489,12 @@ write_tuple(BdrOutputData *data, StringInfo out, Relation rel,
 
        if (isnull[i] || att->attisdropped)
        {
-           appendStringInfoChar(out, 'n'); /* null column */
+           pq_sendbyte(out, 'n');  /* null column */
            continue;
        }
        else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
        {
-           appendStringInfoChar(out, 'u'); /* unchanged toast column */
+           pq_sendbyte(out, 'u');  /* unchanged toast column */
            continue;
        }
 
@@ -507,7 +507,7 @@ write_tuple(BdrOutputData *data, StringInfo out, Relation rel,
 
        if (use_binary)
        {
-           appendStringInfoChar(out, 'b'); /* binary data follows */
+           pq_sendbyte(out, 'b');  /* binary data follows */
 
            /* pass by value */
            if (att->attbyval)
@@ -558,7 +558,7 @@ write_tuple(BdrOutputData *data, StringInfo out, Relation rel,
            bytea      *outputbytes;
            int         len;
 
-           appendStringInfoChar(out, 's'); /* 'send' data follows */
+           pq_sendbyte(out, 's');  /* 'send' data follows */
 
            outputbytes =
                OidSendFunctionCall(typclass->typsend, values[i]);
@@ -573,7 +573,7 @@ write_tuple(BdrOutputData *data, StringInfo out, Relation rel,
            char       *outputstr;
            int         len;
 
-           appendStringInfoChar(out, 's'); /* 'text' data follows */
+           pq_sendbyte(out, 's');  /* 'text' data follows */
 
            outputstr =
                OidOutputFunctionCall(typclass->typoutput, values[i]);