- binary search in long snapshots
authorMarko Kreen <markokr@gmail.com>
Mon, 17 Sep 2007 09:09:35 +0000 (09:09 +0000)
committerMarko Kreen <markokr@gmail.com>
Mon, 17 Sep 2007 09:09:35 +0000 (09:09 +0000)
- cleanup of text buffer handling

sql/txid/txid.c

index eb0e619a6b72b0d1e7e8ee651c5732e3a0ae5f7b..8b2544768d240aea517026960afd731da1f0936e 100644 (file)
@@ -31,6 +31,12 @@ PG_MODULE_MAGIC;
 #define SET_VARSIZE(x, len) VARATT_SIZEP(x) = len
 #endif
 
+/*
+ * If defined, use bsearch() function for searching
+ * txid's inside snapshots that have more than given values.
+ */
+#define USE_BSEARCH_FOR 100
+
 
 /*
  * public functions
@@ -66,89 +72,93 @@ static void sort_snapshot(TxidSnapshot *snap)
        qsort(snap->xip, snap->nxip, sizeof(txid), _cmp_txid);
 }
 
+static StringInfo
+buf_init(txid xmin, txid xmax)
+{
+       TxidSnapshot snap;
+       StringInfo buf;
+
+       snap.xmin = xmin;
+       snap.xmax = xmax;
+       snap.nxip = 0;
+
+       buf = makeStringInfo();
+       appendBinaryStringInfo(buf, (char *)&snap, offsetof(TxidSnapshot, xip));
+       return buf;
+}
+
+static void
+buf_add_txid(StringInfo buf, txid xid)
+{
+       TxidSnapshot *snap = (TxidSnapshot *)buf->data;
+       appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid));
+       snap->nxip++;
+}
+
 static TxidSnapshot *
-parse_snapshot(const char *str)
+buf_finalize(StringInfo buf)
 {
-       int     a_size;
-       txid *xip;
+       TxidSnapshot *snap = (TxidSnapshot *)buf->data;
+       SET_VARSIZE(snap, buf->len);
 
-       int                     a_used = 0;
+       /* buf is not needed anymore */
+       buf->data = NULL;
+       pfree(buf);
+
+       return snap;
+}
+
+static TxidSnapshot *
+parse_snapshot(const char *str)
+{
        txid            xmin;
        txid            xmax;
        txid            last_val = 0, val;
-       TxidSnapshot *snap;
-       int                     size;
-
        char       *endp;
-
-       a_size = 1024;
-       xip = (txid *) palloc(sizeof(txid) * a_size);
+       StringInfo  buf;
 
        xmin = (txid) strtoull(str, &endp, 0);
        if (*endp != ':')
-               elog(ERROR, "illegal txid_snapshot input format");
+               goto bad_format;
        str = endp + 1;
 
        xmax = (txid) strtoull(str, &endp, 0);
        if (*endp != ':')
-               elog(ERROR, "illegal txid_snapshot input format");
+               goto bad_format;
        str = endp + 1;
 
        /* it should look sane */
-       if (xmin >= xmax || xmin > MAX_INT64 || xmax > MAX_INT64
-                       || xmin == 0 || xmax == 0)
-               elog(ERROR, "illegal txid_snapshot input format");
+       if (xmin >= xmax || xmin == 0 || xmax > MAX_INT64)
+               goto bad_format;
+
+       /* allocate buffer */
+       buf = buf_init(xmin, xmax);
 
+       /* loop over values */
        while (*str != '\0')
        {
-               if (a_used >= a_size)
-               {
-                       a_size *= 2;
-                       xip = (txid *) repalloc(xip, sizeof(txid) * a_size);
-               }
-
                /* read next value */
-               if (*str == '\'')
-               {
-                       str++;
-                       val = (txid) strtoull(str, &endp, 0);
-                       if (*endp != '\'')
-                               elog(ERROR, "illegal txid_snapshot input format");
-                       str = endp + 1;
-               }
-               else
-               {
-                       val = (txid) strtoull(str, &endp, 0);
-                       str = endp;
-               }
+               val = (txid) strtoull(str, &endp, 0);
+               str = endp;
 
                /* require the input to be in order */
                if (val < xmin || val <= last_val || val >= xmax)
-                       elog(ERROR, "illegal txid_snapshot input format");
+                       goto bad_format;
                
-               xip[a_used++] = val;
+               buf_add_txid(buf, val);
                last_val = val;
 
                if (*str == ',')
                        str++;
-               else
-               {
-                       if (*str != '\0')
-                               elog(ERROR, "illegal txid_snapshot input format");
-               }
+               else if (*str != '\0')
+                       goto bad_format;
        }
 
-       size = offsetof(TxidSnapshot, xip) + sizeof(txid) * a_used;
-       snap = (TxidSnapshot *) palloc(size);
-       SET_VARSIZE(snap, size);
-       snap->xmin = xmin;
-       snap->xmax = xmax;
-       snap->nxip = a_used;
-       if (a_used > 0)
-               memcpy(&(snap->xip[0]), xip, sizeof(txid) * a_used);
-       pfree(xip);
+       return buf_finalize(buf);
 
-       return snap;
+bad_format:
+       elog(ERROR, "illegal txid_snapshot input format");
+       return NULL;
 }
 
 /*
@@ -222,28 +232,51 @@ Datum
 txid_snapshot_out(PG_FUNCTION_ARGS)
 {
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
-
-       char       *str = palloc(60 + snap->nxip * 30);
-       char       *cp = str;
+       StringInfo      str;
        int                     i;
 
-       snprintf(str, 60, "%llu:%llu:",
-                       (unsigned long long)snap->xmin,
-                       (unsigned long long)snap->xmax);
-       cp = str + strlen(str);
+       str = makeStringInfo();
+
+       appendStringInfo(str, "%llu:", (unsigned long long)snap->xmin);
+       appendStringInfo(str, "%llu:", (unsigned long long)snap->xmax);
 
        for (i = 0; i < snap->nxip; i++)
        {
-               snprintf(cp, 30, "%llu%s",
-                               (unsigned long long)snap->xip[i],
-                                (i < snap->nxip - 1) ? "," : "");
-               cp += strlen(cp);
+               appendStringInfo(str, "%s%llu", ((i > 0) ? "," : ""),
+                                                (unsigned long long)snap->xip[i]);
        }
 
-       PG_RETURN_CSTRING(str);
+       PG_RETURN_CSTRING(str->data);
 }
 
 
+static int
+_txid_in_snapshot(txid value, const TxidSnapshot *snap)
+{
+       if (value < snap->xmin)
+               return true;
+       else if (value >= snap->xmax)
+               return false;
+#ifdef USE_BSEARCH_FOR
+       else if (snap->nxip >= USE_BSEARCH_FOR)
+       {
+               void *res;
+               res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), _cmp_txid);
+               return (res) ? false : true;
+       }
+#endif
+       else
+       {
+               int                     i;
+               for (i = 0; i < snap->nxip; i++)
+               {
+                       if (value == snap->xip[i])
+                               return false;
+               }
+               return true;
+       }
+}
+
 /*
  * txid_in_snapshot    - is txid visible in snapshot ?
  */
@@ -252,22 +285,10 @@ txid_in_snapshot(PG_FUNCTION_ARGS)
 {
        txid value = PG_GETARG_INT64(0);
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
-       int                     i;
-       int                     res = true;
+       int                     res;
+       
+       res = _txid_in_snapshot(value, snap) ? true : false;
 
-       if (value < snap->xmin)
-               res = true;
-       else if (value >= snap->xmax)
-               res = false;
-       else
-       {
-               for (i = 0; i < snap->nxip; i++)
-                       if (value == snap->xip[i])
-                       {
-                               res = false;
-                               break;
-                       }
-       }
        PG_FREE_IF_COPY(snap, 1);
        PG_RETURN_BOOL(res);
 }
@@ -281,22 +302,10 @@ txid_not_in_snapshot(PG_FUNCTION_ARGS)
 {
        txid            value = PG_GETARG_INT64(0);
        TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
-       int                     i;
-       int                     res = false;
+       int                     res;
+
+       res = _txid_in_snapshot(value, snap) ? false : true;
 
-       if (value < snap->xmin)
-               res = false;
-       else if (value >= snap->xmax)
-               res = true;
-       else
-       {
-               for (i = 0; i < snap->nxip; i++)
-                       if (value == snap->xip[i])
-                       {
-                               res = true;
-                               break;
-                       }
-       }
        PG_FREE_IF_COPY(snap, 1);
        PG_RETURN_BOOL(res);
 }