XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
{
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecord *record;
- XLogSubRecord *subrecord;
- XLogRecPtr RecPtr;
- uint32 len = hdrlen + buflen,
- freespace,
- wlen;
- uint16 curridx;
- bool updrqst = false;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogRecord *record;
+ XLogSubRecord *subrecord;
+ XLogRecPtr RecPtr;
+ uint32 len = hdrlen + buflen,
+ freespace,
+ wlen;
+ uint16 curridx;
+ bool updrqst = false;
+ bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
+
+ if (info & XLR_INFO_MASK)
+ {
+ if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
+ elog(STOP, "XLogInsert: invalid info mask %02X",
+ (info & XLR_INFO_MASK));
+ no_tran = true;
+ info &= ~XLR_INFO_MASK;
+ }
- Assert(!(info & XLR_INFO_MASK));
if (len == 0 || len > MAXLOGRECSZ)
elog(STOP, "XLogInsert: invalid record len %u", len);
freespace -= SizeOfXLogRecord;
record = (XLogRecord *) Insert->currpos;
record->xl_prev = Insert->PrevRecord;
- if (rmid != RM_XLOG_ID)
- record->xl_xact_prev = MyLastRecPtr;
- else
+ if (no_tran)
{
record->xl_xact_prev.xlogid = 0;
record->xl_xact_prev.xrecoff = 0;
}
+ else
+ record->xl_xact_prev = MyLastRecPtr;
+
record->xl_xid = GetCurrentTransactionId();
record->xl_len = (len > freespace) ? freespace : len;
record->xl_info = (len > freespace) ?
RecPtr.xrecoff =
XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
Insert->currpos - ((char *) Insert->currpage);
- if (MyLastRecPtr.xrecoff == 0 && rmid != RM_XLOG_ID)
+ if (MyLastRecPtr.xrecoff == 0 && !no_tran)
{
SpinAcquire(SInvalLock);
MyProc->logRec = RecPtr;
#define SEQ_MAXVALUE ((int4)0x7FFFFFFF)
#define SEQ_MINVALUE -(SEQ_MAXVALUE)
-typedef struct FormData_pg_sequence
-{
- NameData sequence_name;
- int4 last_value;
- int4 increment_by;
- int4 max_value;
- int4 min_value;
- int4 cache_value;
- char is_cycled;
- char is_called;
-} FormData_pg_sequence;
-
-typedef FormData_pg_sequence *Form_pg_sequence;
+/*
+ * We don't want to log each fetching values from sequences,
+ * so we pre-log a few fetches in advance. In the event of
+ * crash we can lose as much as we pre-logged.
+ */
+#define SEQ_LOG_VALS 32
typedef struct sequence_magic
{
coldef->colname = "cache_value";
value[i - 1] = Int32GetDatum(new.cache_value);
break;
+ case SEQ_COL_LOG:
+ typnam->name = "int4";
+ coldef->colname = "log_cnt";
+ value[i - 1] = Int32GetDatum((int32)1);
+ break;
case SEQ_COL_CYCLE:
typnam->name = "char";
coldef->colname = "is_cycled";
int32 incby,
maxv,
minv,
- cache;
+ cache,
+ log,
+ fetch,
+ last;
int32 result,
next,
rescnt = 0;
+ bool logit = false;
if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
seq = read_info("nextval", elm, &buf); /* lock page' buffer and
* read tuple */
- next = result = seq->last_value;
+ last = next = result = seq->last_value;
incby = seq->increment_by;
maxv = seq->max_value;
minv = seq->min_value;
- cache = seq->cache_value;
+ fetch = cache = seq->cache_value;
+ log = seq->log_cnt;
if (seq->is_called != 't')
+ {
rescnt++; /* last_value if not called */
+ fetch--;
+ log--;
+ }
- while (rescnt < cache) /* try to fetch cache numbers */
+ if (log < fetch)
+ {
+ fetch = log = fetch - log + SEQ_LOG_VALS;
+ logit = true;
+ }
+
+ while (fetch) /* try to fetch cache [+ log ] numbers */
{
/*
(maxv < 0 && next + incby > maxv))
{
if (rescnt > 0)
- break; /* stop caching */
+ break; /* stop fetching */
if (seq->is_cycled != 't')
elog(ERROR, "%s.nextval: got MAXVALUE (%d)",
elm->name, maxv);
(minv >= 0 && next + incby < minv))
{
if (rescnt > 0)
- break; /* stop caching */
+ break; /* stop fetching */
if (seq->is_cycled != 't')
elog(ERROR, "%s.nextval: got MINVALUE (%d)",
elm->name, minv);
else
next += incby;
}
- rescnt++; /* got result */
- if (rescnt == 1) /* if it's first one - */
- result = next; /* it's what to return */
+ fetch--;
+ if (rescnt < cache)
+ {
+ log--;
+ rescnt++;
+ last = next;
+ if (rescnt == 1) /* if it's first result - */
+ result = next; /* it's what to return */
+ }
}
/* save info in local cache */
elm->last = result; /* last returned number */
- elm->cached = next; /* last cached number */
+ elm->cached = last; /* last fetched number */
+
+ if (logit)
+ {
+ xl_seq_rec xlrec;
+ XLogRecPtr recptr;
+
+ if (fetch) /* not all numbers were fetched */
+ log -= fetch;
+
+ xlrec.node = elm->rel->rd_node;
+ xlrec.value = next;
+
+ recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN,
+ (char*) &xlrec, sizeof(xlrec), NULL, 0);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetSUI(BufferGetPage(buf), ThisStartUpID);
+ }
/* save info in sequence relation */
- seq->last_value = next; /* last fetched number */
+ seq->last_value = last; /* last fetched number */
+ Assert(log >= 0);
+ seq->log_cnt = log; /* how much is logged */
seq->is_called = 't';
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled ? 't' : 'f';
+ seq->log_cnt = (iscalled) ? 0 : 1;
+
+ {
+ xl_seq_rec xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.node = elm->rel->rd_node;
+ xlrec.value = next;
+
+ recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_SET|XLOG_NO_TRAN,
+ (char*) &xlrec, sizeof(xlrec), NULL, 0);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetSUI(BufferGetPage(buf), ThisStartUpID);
+ }
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
-
static int
get_param(DefElem *def)
{
elog(ERROR, "DefineSequence: \"%s\" is to be integer", def->defname);
return -1;
}
+
+void seq_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ Relation reln;
+ Buffer buffer;
+ Page page;
+ ItemId lp;
+ HeapTupleData tuple;
+ Form_pg_sequence seq;
+ xl_seq_rec *xlrec;
+
+ if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET)
+ elog(STOP, "seq_redo: unknown op code %u", info);
+
+ xlrec = (xl_seq_rec*) XLogRecGetData(record);
+
+ reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
+ if (!RelationIsValid(reln))
+ return;
+
+ buffer = XLogReadBuffer(false, reln, 0);
+ if (!BufferIsValid(buffer))
+ elog(STOP, "seq_redo: can't read block of %u/%u",
+ xlrec->node.tblNode, xlrec->node.relNode);
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page) ||
+ ((sequence_magic *) PageGetSpecialPointer(page))->magic != SEQ_MAGIC)
+ elog(STOP, "seq_redo: uninitialized page of %u/%u",
+ xlrec->node.tblNode, xlrec->node.relNode);
+
+ if (XLByteLE(lsn, PageGetLSN(page)))
+ {
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ lp = PageGetItemId(page, FirstOffsetNumber);
+ Assert(ItemIdIsUsed(lp));
+ tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
+
+ seq = (Form_pg_sequence) GETSTRUCT(&tuple);
+
+ seq->last_value = xlrec->value; /* last logged value */
+ seq->is_called = 't';
+ seq->log_cnt = 0;
+
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+
+ return;
+}
+
+void seq_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+}
+
+void seq_desc(char *buf, uint8 xl_info, char* rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+ xl_seq_rec *xlrec = (xl_seq_rec*) rec;
+
+ if (info == XLOG_SEQ_LOG)
+ strcat(buf, "log: ");
+ else if (info == XLOG_SEQ_SET)
+ strcat(buf, "set: ");
+ else
+ {
+ strcat(buf, "UNKNOWN");
+ return;
+ }
+
+ sprintf(buf + strlen(buf), "node %u/%u; value %d",
+ xlrec->node.tblNode, xlrec->node.relNode, xlrec->value);
+}
#define SEQUENCE_H
#include "nodes/parsenodes.h"
+#include "access/xlog.h"
+
+typedef struct FormData_pg_sequence
+{
+ NameData sequence_name;
+ int4 last_value;
+ int4 increment_by;
+ int4 max_value;
+ int4 min_value;
+ int4 cache_value;
+ int4 log_cnt;
+ char is_cycled;
+ char is_called;
+} FormData_pg_sequence;
+
+typedef FormData_pg_sequence *Form_pg_sequence;
/*
* Columns of a sequence relation
#define SEQ_COL_MAXVALUE 4
#define SEQ_COL_MINVALUE 5
#define SEQ_COL_CACHE 6
-#define SEQ_COL_CYCLE 7
-#define SEQ_COL_CALLED 8
+#define SEQ_COL_LOG 7
+#define SEQ_COL_CYCLE 8
+#define SEQ_COL_CALLED 9
#define SEQ_COL_FIRSTCOL SEQ_COL_NAME
#define SEQ_COL_LASTCOL SEQ_COL_CALLED
+/* XLOG stuff */
+#define XLOG_SEQ_LOG 0x00
+#define XLOG_SEQ_SET 0x10
+
+typedef struct xl_seq_rec
+{
+ RelFileNode node;
+ int4 value; /* last logged value */
+} xl_seq_rec;
+
extern Datum nextval(PG_FUNCTION_ARGS);
extern Datum currval(PG_FUNCTION_ARGS);
extern Datum setval(PG_FUNCTION_ARGS);
extern void DefineSequence(CreateSeqStmt *stmt);
extern void CloseSequences(void);
+extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr);
+extern void seq_undo(XLogRecPtr lsn, XLogRecord *rptr);
+extern void seq_desc(char *buf, uint8 xl_info, char* rec);
+
#endif /* SEQUENCE_H */