pgq: rewrite insert_event() in C
authorMarko Kreen <markokr@gmail.com>
Tue, 17 Jul 2007 13:54:54 +0000 (13:54 +0000)
committerMarko Kreen <markokr@gmail.com>
Tue, 17 Jul 2007 13:54:54 +0000 (13:54 +0000)
sql/pgq/Makefile
sql/pgq/expected/pgq_core.out
sql/pgq/pgq_insert.c [new file with mode: 0644]
sql/pgq/pgq_insert.sql.in [new file with mode: 0644]
sql/pgq/sql/pgq_core.sql
sql/pgq/sql/pgq_init.sql
sql/pgq/structure/func_internal.sql

index 2ea1084d6a95ca8bea73419dc741c57e888f7c74..e99a90ce9508a3571f28579e2ba320be83d9419e 100644 (file)
@@ -1,11 +1,14 @@
 
+MODULES = pgq_insert
+
 DOCS = README.pgq
-DATA_built = pgq.sql pgq.upgrade.sql
+DATA_built = pgq.sql pgq.upgrade.sql pgq_insert.sql
 DATA = structure/uninstall_pgq.sql
 
 SRCS = $(wildcard structure/*.sql) \
        $(wildcard functions/*.sql) \
-       $(wildcard triggers/*.sql)
+       $(wildcard triggers/*.sql) \
+       pgq_insert.sql
 
 REGRESS = pgq_init pgq_core logutriga sqltriga
 REGRESS_OPTS = --load-language=plpythonu --load-language=plpgsql
@@ -37,12 +40,12 @@ cleandox:
 
 clean: cleandox
 
-test:
-       #-dropdb pgq
-       #createdb pgq
-       #psql -f structure/pgq.sql pgq
+test: pgq.sql
        $(MAKE) installcheck || { less regression.diffs; exit 1; }
 
+ack:
+       cp results/*.out expected/
+
 upload: dox
        rsync -az --delete docs/html/* data1:public_html/pgq-new/
 
index 733e97cbca99145227cff6e7bf7e1adee3fdf219..21baed32736c8e33aab22b95e73a80bdfaa8decb 100644 (file)
@@ -153,7 +153,7 @@ select pgq.insert_event('myqueue', 'r1', 'data');
             1
 (1 row)
 
-select pgq.insert_event('myqueue', 'r2', 'data');
+select pgq.insert_event('myqueue', 'r2', 'data', 'extra1', 'extra2', 'extra3', 'extra4');
  insert_event 
 --------------
             2
@@ -187,7 +187,7 @@ select ev_id,ev_retry,ev_type,ev_data,ev_extra1,ev_extra2,ev_extra3,ev_extra4 fr
  ev_id | ev_retry | ev_type | ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4 
 -------+----------+---------+---------+-----------+-----------+-----------+-----------
      1 |          | r1      | data    |           |           |           | 
-     2 |          | r2      | data    |           |           |           | 
+     2 |          | r2      | data    | extra1    | extra2    | extra3    | extra4
      3 |          | r3      | data    |           |           |           | 
 (3 rows)
 
diff --git a/sql/pgq/pgq_insert.c b/sql/pgq/pgq_insert.c
new file mode 100644 (file)
index 0000000..4d9e5b2
--- /dev/null
@@ -0,0 +1,337 @@
+/*
+ * pgq_insert_event_raw - PgQ insertion implemented in C.
+ *
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include "postgres.h"
+#include "funcapi.h"
+
+#include "catalog/pg_type.h"
+#include "executor/spi.h"
+#include "utils/builtins.h"
+#include "utils/hsearch.h"
+
+/*
+ * Module tag
+ */
+#ifdef PG_MODULE_MAGIC
+PG_MODULE_MAGIC;
+#endif
+
+/*
+ * Function tag
+ */
+Datum pgq_insert_event_raw(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(pgq_insert_event_raw);
+
+/*
+ * Queue info fetching
+ */
+#define QUEUE_SQL \
+       "select queue_id::int4, queue_data_pfx::text," \
+       " queue_cur_table::int4, queue_event_seq::text " \
+       " from pgq.queue where queue_name = $1"
+#define COL_QUEUE_ID   1
+#define COL_PREFIX             2
+#define COL_TBLNO              3
+#define COL_SEQNAME            4
+
+/*
+ * Fetch next value from sequence.
+ */
+#define SEQ_SQL                "select nextval($1)::int8"
+#define COL_SEQVAL     1
+
+/*
+ * Plan cache entry in HTAB.
+ */
+struct InsertCacheEntry {
+       Oid queue_id;   /* actually int32, but we want to use oid_hash */
+       int cur_table;
+       void *plan;
+};
+
+/*
+ * helper structure to pass values.
+ */
+struct QueueState {
+       int queue_id;
+       int cur_table;
+       char *table_prefix;
+       Datum seq_name;
+};
+
+/*
+ * Cached plans.
+ */
+static void *queue_plan;
+static void *seq_plan;
+static HTAB *insert_cache;
+
+/*
+ * Prepare utility plans and plan cache.
+ */
+static void
+init_cache(void)
+{
+       static int init_done = 0;
+       Oid types[1] = { TEXTOID };
+       HASHCTL     ctl;
+       int         flags;
+       int         max_queues = 128;
+
+       if (init_done)
+               return;
+
+       /*
+        * Init plans.
+        */
+       queue_plan = SPI_saveplan(SPI_prepare(QUEUE_SQL, 1, types));
+       if (queue_plan == NULL)
+               elog(ERROR, "pgq_insert: SPI_prepare() failed");
+       seq_plan = SPI_saveplan(SPI_prepare(SEQ_SQL, 1, types));
+       if (seq_plan == NULL)
+               elog(ERROR, "pgq_insert: SPI_prepare() failed");
+
+       /*
+        * init insert plan cache.
+        */
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(Oid);
+       ctl.entrysize = sizeof(struct InsertCacheEntry);
+       ctl.hash = oid_hash;
+       flags = HASH_ELEM | HASH_FUNCTION;
+       insert_cache = hash_create("pgq_insert_raw plans cache", max_queues, &ctl, flags);
+
+       init_done = 1;
+}
+
+/*
+ * Create new plan for insertion into current queue table.
+ */
+static void *make_plan(struct QueueState *state)
+{
+       void *plan;
+       StringInfo sql;
+       static Oid types[10] = {
+               INT8OID, TIMESTAMPTZOID, INT4OID, INT4OID, TEXTOID,
+               TEXTOID, TEXTOID, TEXTOID, TEXTOID, TEXTOID
+       };
+
+       /*
+        * create sql
+        */
+       sql = makeStringInfo();
+       appendStringInfo(sql, "insert into %s_%d (ev_id, ev_time, ev_owner, ev_retry,"
+                                        " ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4)"
+                                        " values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
+                                        state->table_prefix, state->cur_table);
+       /*
+        * create plan
+        */
+       plan = SPI_prepare(sql->data, 10, types);
+       return SPI_saveplan(plan);
+}
+
+/*
+ * fetch insert plan from cache.
+ */
+static void *load_insert_plan(struct QueueState *state)
+{
+        struct InsertCacheEntry  *entry;
+        Oid queue_id = state->queue_id;
+        bool did_exist = false;
+
+        entry = hash_search(insert_cache, &queue_id, HASH_ENTER, &did_exist);
+        if (did_exist)
+        {
+                if (state->cur_table == entry->cur_table)
+                        return entry->plan;
+                SPI_freeplan(entry->plan);
+        }
+        entry->cur_table = state->cur_table;
+        entry->plan = make_plan(state);
+        return entry->plan;
+}
+
+/*
+ * load queue info from pgq.queue table.
+ */
+static void load_queue_info(Datum queue_name, struct QueueState *state)
+{
+       Datum values[1];
+       int res;
+       TupleDesc   desc;
+       HeapTuple   row;
+       bool isnull;
+
+       values[0] = queue_name;
+       res = SPI_execute_plan(queue_plan, values, NULL, false, 0);
+       if (res != SPI_OK_SELECT)
+               elog(ERROR, "Queue fetch failed");
+       if (SPI_processed == 0)
+               elog(ERROR, "No such queue");
+
+       row = SPI_tuptable->vals[0];
+       desc = SPI_tuptable->tupdesc;
+       state->seq_name = SPI_getbinval(row, desc, COL_SEQNAME, &isnull);
+       if (isnull)
+               elog(ERROR, "Seq name NULL");
+       state->cur_table = DatumGetInt32(SPI_getbinval(row, desc, COL_TBLNO, &isnull));
+       if (isnull)
+               elog(ERROR, "table nr NULL");
+       state->queue_id = DatumGetInt32(SPI_getbinval(row, desc, COL_QUEUE_ID, &isnull));
+       if (isnull)
+               elog(ERROR, "queue id NULL");
+       state->table_prefix = SPI_getvalue(row, desc, COL_PREFIX);
+       if (state->table_prefix == NULL)
+               elog(ERROR, "table prefix NULL");
+}
+
+/*
+ * fetch next value from event_id sequence
+ */
+static Datum fetch_seq(struct QueueState *state)
+{
+       Datum values[1];
+       TupleDesc   desc;
+       HeapTuple   row;
+       int res;
+       Datum id;
+       bool isnull = false;
+
+       values[0] = state->seq_name;
+       res = SPI_execute_plan(seq_plan, values, NULL, false, 0);
+       if (res != SPI_OK_SELECT)
+               elog(ERROR, "Seq fetch failed");
+       if (SPI_processed == 0)
+               elog(ERROR, "Seq fetch crashed");
+
+       row = SPI_tuptable->vals[0];
+       desc = SPI_tuptable->tupdesc;
+       state->seq_name = SPI_getbinval(row, desc, COL_SEQVAL, &isnull);
+       id = SPI_getbinval(row, desc, COL_SEQVAL, &isnull);
+
+       return id;
+}
+
+/*
+ * Arguments:
+ * 0: queue_name  text          NOT NULL
+ * 1: ev_id       int8                 if NULL take from SEQ
+ * 2: ev_time     timestamptz   if NULL use now()
+ * 3: ev_owner    int4
+ * 4: ev_retry    int4
+ * 5: ev_type     text
+ * 6: ev_date     text
+ * 7: ev_extra1   text
+ * 8: ev_extra2   text
+ * 9: ev_extra3   text
+ * 10:ev_extra4   text
+ */
+Datum
+pgq_insert_event_raw(PG_FUNCTION_ARGS)
+{
+       Datum values[11];
+       char nulls[11];
+       struct QueueState state;
+       int64 ret_id;
+       void *ins_plan;
+       Datum ev_id, ev_time;
+       int i, res;
+
+       /*
+        * sanity check
+        */
+       if (PG_NARGS() < 6)
+               elog(ERROR, "pgq_insert_raw: too few args");
+       if (PG_ARGISNULL(0))
+               elog(ERROR, "Queue name must not be NULL");
+
+        /*
+         * Connect to the SPI manager
+         */
+       if (SPI_connect() < 0)
+               elog(ERROR, "logtriga: SPI_connect() failed");
+       
+       /*
+        * Prepare plans
+        */
+       init_cache();
+
+       /*
+        * load queue info
+        */
+       load_queue_info(PG_GETARG_DATUM(0), &state);
+
+       /*
+        * load event id.
+        *
+        * seq must be increased event if id is given,
+        * to notify ticker about new event.
+        */
+       ev_id = fetch_seq(&state);
+       if (!PG_ARGISNULL(1))
+               ev_id = PG_GETARG_DATUM(1);
+       ret_id = DatumGetInt64(ev_id);
+
+       /*
+        * load current time.
+        */
+       if (PG_ARGISNULL(2))
+               ev_time = DirectFunctionCall1(now, 0);
+       else
+               ev_time = PG_GETARG_DATUM(2);
+
+       /*
+        * load insert plan.
+        */
+       ins_plan = load_insert_plan(&state);
+
+       
+       /*
+        * Perform INSERT into queue table.
+        */
+       values[0] = ev_id;
+       nulls[0] = ' ';
+       values[1] = ev_time;
+       nulls[1] = ' ';
+       for (i = 3; i < 11; i++) {
+               int dst = i - 1;
+               if (i >= PG_NARGS() || PG_ARGISNULL(i)) {
+                       values[dst] = (Datum)NULL;
+                       nulls[dst] = 'n';
+               } else {
+                       values[dst] = PG_GETARG_DATUM(i);
+                       nulls[dst] = ' ';
+               }
+       }
+       res = SPI_execute_plan(ins_plan, values, nulls, false, 0);
+       if (res != SPI_OK_INSERT)
+               elog(ERROR, "Queue insert failed");
+
+       /*
+        * Done with SPI.
+        *
+        * Datum for ev_id wont pass further from here,
+        * thus conversion to int64.
+        */
+       if (SPI_finish() < 0)
+               elog(ERROR, "SPI_finish failed");
+
+       PG_RETURN_INT64(ret_id);
+}
+
diff --git a/sql/pgq/pgq_insert.sql.in b/sql/pgq/pgq_insert.sql.in
new file mode 100644 (file)
index 0000000..257e2f1
--- /dev/null
@@ -0,0 +1,29 @@
+
+-- ----------------------------------------------------------------------
+-- Function: pgq.insert_event_raw(11)
+--
+--      Actual event insertion.  Used also by retry queue maintenance.
+--
+-- Parameters:
+--      queue_name      - Name of the queue
+--      ev_id           - Event ID.  If NULL, will be taken from seq.
+--      ev_time         - Event creation time.
+--             ev_owner                - Subscription ID when retry event. If NULL, the event is for everybody.
+--             ev_retry                - Retry count. NULL for first-time events.
+--      ev_type         - user data
+--      ev_data         - user data
+--      ev_extra1       - user data
+--      ev_extra2       - user data
+--      ev_extra3       - user data
+--      ev_extra4       - user data
+--
+-- Returns:
+--      Event ID.
+-- ----------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION pgq.insert_event_raw(
+    queue_name text, ev_id bigint, ev_time timestamptz,
+    ev_owner integer, ev_retry integer, ev_type text, ev_data text,
+    ev_extra1 text, ev_extra2 text, ev_extra3 text, ev_extra4 text
+)
+RETURNS int8 AS 'MODULE_PATHNAME', 'pgq_insert_event_raw' LANGUAGE C;
+
index 82515d60df449c1094d2a275acb7f7eb141609c3..ca8de0cf43be49090c4b049977e185e89577da81 100644 (file)
@@ -37,7 +37,7 @@ select * from pgq.get_batch_events(2);
 select pgq.finish_batch(2);
 
 select pgq.insert_event('myqueue', 'r1', 'data');
-select pgq.insert_event('myqueue', 'r2', 'data');
+select pgq.insert_event('myqueue', 'r2', 'data', 'extra1', 'extra2', 'extra3', 'extra4');
 select pgq.insert_event('myqueue', 'r3', 'data');
 select pgq.current_event_table('myqueue');
 select pgq.ticker();
index 565659eabc77298b3de65bf3e7e4fed901b6cc31..38dd79183b33bb324585f4fa7595d2eab7ca4189 100644 (file)
@@ -1,7 +1,7 @@
 
 \set ECHO none
 \i ../txid/txid.sql
-\i structure/install.sql
+\i pgq.sql
 
 \set ECHO all
 
index 87ff4e6942353cb0c7d01418bb038797b4b617de..eb98fb356703bdbdb039b4a2c8c2cf1a1f1df8f1 100644 (file)
@@ -5,7 +5,9 @@
 \i functions/pgq.batch_event_sql.sql
 \i functions/pgq.batch_event_tables.sql
 \i functions/pgq.event_retry_raw.sql
-\i functions/pgq.insert_event_raw.sql
+
+-- \i functions/pgq.insert_event_raw.sql
+\i pgq_insert.sql
 
 -- Group: Ticker