pgq.insert_event: support working with pgq 2.x schema
authorMarko Kreen <markokr@gmail.com>
Thu, 17 Nov 2011 18:03:06 +0000 (20:03 +0200)
committerMarko Kreen <markokr@gmail.com>
Thu, 17 Nov 2011 18:15:49 +0000 (20:15 +0200)
this allows to upgrade module without uprading schema.

sql/pgq/lowlevel/insert_event.c

index f63890d8112ec9beaf913e01101b13282f1a4e0c..b5e673be39d38dea2f76dff3e43098a78bae9d66 100644 (file)
@@ -63,6 +63,21 @@ PG_FUNCTION_INFO_V1(pgq_insert_event_raw);
 #define COL_DISABLED   5
 #define COL_LIMIT      6
 
+/*
+ * Support inserting into pgq 2 queues.
+ */
+#define QUEUE_SQL_OLD \
+       "select queue_id::int4, queue_data_pfx::text," \
+       " queue_cur_table::int4, nextval(queue_event_seq)::int8," \
+       " false::bool as queue_disable_insert," \
+       " null::int4 as queue_per_tx_limit" \
+       " from pgq.queue where queue_name = $1"
+
+#define QUEUE_CHECK_NEW \
+       "select 1 from pg_catalog.pg_attribute" \
+       " where attname = 'queue_per_tx_limit'" \
+       " and attrelid = 'pgq.queue'::regclass"
+
 /*
  * Plan cache entry in HTAB.
  */
@@ -91,6 +106,7 @@ struct QueueState {
 /*
  * Cached plans.
  */
+static void *queue_check_plan;
 static void *queue_plan;
 static HTAB *insert_cache;
 
@@ -103,15 +119,32 @@ static void init_cache(void)
        Oid types[1] = { TEXTOID };
        HASHCTL ctl;
        int flags;
+       int res;
        int max_queues = 128;
+       const char *sql;
 
        if (init_done)
                return;
 
+       /*
+        * Check if old (v2.x) or new (v3.x) queue table.
+        *
+        * Needed for upgrades.
+        */
+       res = SPI_execute(QUEUE_CHECK_NEW, 1, 0);
+       if (res < 0)
+               elog(ERROR, "pgq.insert_event: QUEUE_CHECK_NEW failed");
+
+       if (SPI_processed > 0) {
+               sql = QUEUE_SQL;
+       } else {
+               sql = QUEUE_SQL_OLD;
+       }
+
        /*
         * Init plans.
         */
-       queue_plan = SPI_saveplan(SPI_prepare(QUEUE_SQL, 1, types));
+       queue_plan = SPI_saveplan(SPI_prepare(sql, 1, types));
        if (queue_plan == NULL)
                elog(ERROR, "pgq_insert: SPI_prepare() failed");