Assert(bdr_apply_con != NULL);
- origlsn = *(XLogRecPtr *) pq_getmsgbytes(s, 8);
- committime = *(TimestampTz *) pq_getmsgbytes(s, 8);
+ origlsn = pq_getmsgint64(s);
+ committime = pq_getmsgint64(s);
/* setup state for commit and conflict detection */
replication_origin_lsn = origlsn;
/* don't want the overhead otherwise */
if (bdr_apply_con->apply_delay > 0)
{
- current = GetCurrentTimestamp();
-#ifndef HAVE_INT64_TIMESTAMP
-#error "we require integer timestamps"
-#endif
+ current = GetCurrentIntegerTimestamp();
+
/* ensure no weirdness due to clock drift */
if (current > replication_origin_timestamp)
{
Assert(bdr_apply_con != NULL);
- origlsn = *(XLogRecPtr *) pq_getmsgbytes(s, 8);
- end_lsn = *(XLogRecPtr *) pq_getmsgbytes(s, 8);
- committime = *(TimestampTz *) pq_getmsgbytes(s, 8);
+ origlsn = pq_getmsgint64(s);
+ end_lsn = pq_getmsgint64(s);
+ committime = pq_getmsgint64(s);
elog(LOG, "COMMIT origin(lsn, end, timestamp): %X/%X, %X/%X, %s",
(uint32) (origlsn >> 32), (uint32) origlsn,
return;
OutputPluginPrepareWrite(ctx, true);
- appendStringInfoChar(ctx->out, 'B'); /* BEGIN */
- appendBinaryStringInfo(ctx->out, (char *) &txn->final_lsn, sizeof(XLogRecPtr));
- appendBinaryStringInfo(ctx->out, (char *) &txn->commit_time, sizeof(TimestampTz));
+ pq_sendbyte(ctx->out, 'B'); /* BEGIN */
+ pq_sendint64(ctx->out, txn->final_lsn);
+ pq_sendint64(ctx->out, txn->commit_time);
OutputPluginWrite(ctx, true);
return;
}
return;
OutputPluginPrepareWrite(ctx, true);
- appendStringInfoChar(ctx->out, 'C'); /* sending COMMIT */
- appendBinaryStringInfo(ctx->out, (char *) &commit_lsn, sizeof(XLogRecPtr));
- appendBinaryStringInfo(ctx->out, (char *) &txn->end_lsn, sizeof(XLogRecPtr));
- appendBinaryStringInfo(ctx->out, (char *) &txn->commit_time, sizeof(TimestampTz));
+ pq_sendbyte(ctx->out, 'C'); /* sending COMMIT */
+ pq_sendint64(ctx->out, commit_lsn);
+ pq_sendint64(ctx->out, txn->end_lsn);
+ pq_sendint64(ctx->out, txn->commit_time);
OutputPluginWrite(ctx, true);
}
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
- appendStringInfoChar(ctx->out, 'I'); /* action INSERT */
+ pq_sendbyte(ctx->out, 'I'); /* action INSERT */
write_rel(ctx->out, relation);
- appendStringInfoChar(ctx->out, 'N'); /* new tuple follows */
+ pq_sendbyte(ctx->out, 'N'); /* new tuple follows */
write_tuple(data, ctx->out, relation, &change->data.tp.newtuple->tuple);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
- appendStringInfoChar(ctx->out, 'U'); /* action UPDATE */
+ pq_sendbyte(ctx->out, 'U'); /* action UPDATE */
write_rel(ctx->out, relation);
if (change->data.tp.oldtuple != NULL)
{
- appendStringInfoChar(ctx->out, 'K'); /* old key follows */
+ pq_sendbyte(ctx->out, 'K'); /* old key follows */
write_tuple(data, ctx->out, relation,
&change->data.tp.oldtuple->tuple);
}
- appendStringInfoChar(ctx->out, 'N'); /* new tuple follows */
+ pq_sendbyte(ctx->out, 'N'); /* new tuple follows */
write_tuple(data, ctx->out, relation,
&change->data.tp.newtuple->tuple);
break;
case REORDER_BUFFER_CHANGE_DELETE:
- appendStringInfoChar(ctx->out, 'D'); /* action DELETE */
+ pq_sendbyte(ctx->out, 'D'); /* action DELETE */
write_rel(ctx->out, relation);
if (change->data.tp.oldtuple != NULL)
{
- appendStringInfoChar(ctx->out, 'K'); /* old key follows */
+ pq_sendbyte(ctx->out, 'K'); /* old key follows */
write_tuple(data, ctx->out, relation,
&change->data.tp.oldtuple->tuple);
}
else
- appendStringInfoChar(ctx->out, 'E'); /* empty */
+ pq_sendbyte(ctx->out, 'E'); /* empty */
break;
default:
Assert(false);
desc = RelationGetDescr(rel);
- appendStringInfoChar(out, 'T'); /* tuple follows */
+ pq_sendbyte(out, 'T'); /* tuple follows */
pq_sendint(out, desc->natts, 4); /* number of attributes */
if (isnull[i] || att->attisdropped)
{
- appendStringInfoChar(out, 'n'); /* null column */
+ pq_sendbyte(out, 'n'); /* null column */
continue;
}
else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
{
- appendStringInfoChar(out, 'u'); /* unchanged toast column */
+ pq_sendbyte(out, 'u'); /* unchanged toast column */
continue;
}
if (use_binary)
{
- appendStringInfoChar(out, 'b'); /* binary data follows */
+ pq_sendbyte(out, 'b'); /* binary data follows */
/* pass by value */
if (att->attbyval)
bytea *outputbytes;
int len;
- appendStringInfoChar(out, 's'); /* 'send' data follows */
+ pq_sendbyte(out, 's'); /* 'send' data follows */
outputbytes =
OidSendFunctionCall(typclass->typsend, values[i]);
char *outputstr;
int len;
- appendStringInfoChar(out, 's'); /* 'text' data follows */
+ pq_sendbyte(out, 's'); /* 'text' data follows */
outputstr =
OidOutputFunctionCall(typclass->typoutput, values[i]);