#define COL_DISABLED 5
#define COL_LIMIT 6
+/*
+ * Support inserting into pgq 2 queues.
+ */
+#define QUEUE_SQL_OLD \
+ "select queue_id::int4, queue_data_pfx::text," \
+ " queue_cur_table::int4, nextval(queue_event_seq)::int8," \
+ " false::bool as queue_disable_insert," \
+ " null::int4 as queue_per_tx_limit" \
+ " from pgq.queue where queue_name = $1"
+
+#define QUEUE_CHECK_NEW \
+ "select 1 from pg_catalog.pg_attribute" \
+ " where attname = 'queue_per_tx_limit'" \
+ " and attrelid = 'pgq.queue'::regclass"
+
/*
* Plan cache entry in HTAB.
*/
/*
* Cached plans.
*/
+static void *queue_check_plan;
static void *queue_plan;
static HTAB *insert_cache;
Oid types[1] = { TEXTOID };
HASHCTL ctl;
int flags;
+ int res;
int max_queues = 128;
+ const char *sql;
if (init_done)
return;
+ /*
+ * Check if old (v2.x) or new (v3.x) queue table.
+ *
+ * Needed for upgrades.
+ */
+ res = SPI_execute(QUEUE_CHECK_NEW, 1, 0);
+ if (res < 0)
+ elog(ERROR, "pgq.insert_event: QUEUE_CHECK_NEW failed");
+
+ if (SPI_processed > 0) {
+ sql = QUEUE_SQL;
+ } else {
+ sql = QUEUE_SQL_OLD;
+ }
+
/*
* Init plans.
*/
- queue_plan = SPI_saveplan(SPI_prepare(QUEUE_SQL, 1, types));
+ queue_plan = SPI_saveplan(SPI_prepare(sql, 1, types));
if (queue_plan == NULL)
elog(ERROR, "pgq_insert: SPI_prepare() failed");