/*
- * pgq_insert_event_raw - PgQ insertion implemented in C.
+ * insert_event.c - C implementation of pgq.insert_event_raw().
  *
  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
  *
 PG_FUNCTION_INFO_V1(pgq_insert_event_raw);
 
 /*
- * Queue info fetching
+ * Queue info fetching.
+ *
+ * Always touch ev_id sequence, even if ev_id is given as arg,
+ * to notify ticker about new event.
  */
 #define QUEUE_SQL \
        "select queue_id::int4, queue_data_pfx::text," \
-       " queue_cur_table::int4, queue_event_seq::text " \
+       " queue_cur_table::int4, nextval(queue_event_seq)::int8 " \
        " from pgq.queue where queue_name = $1"
 #define COL_QUEUE_ID   1
 #define COL_PREFIX             2
 #define COL_TBLNO              3
-#define COL_SEQNAME            4
-
-/*
- * Fetch next value from sequence.
- */
-#define SEQ_SQL                "select nextval($1)::int8"
-#define COL_SEQVAL     1
+#define COL_EVENT_ID   4
 
 /*
  * Plan cache entry in HTAB.
        int queue_id;
        int cur_table;
        char *table_prefix;
-       Datum seq_name;
+       Datum next_event_id;
 };
 
 /*
  * Cached plans.
  */
 static void *queue_plan;
-static void *seq_plan;
 static HTAB *insert_cache;
 
 /*
        queue_plan = SPI_saveplan(SPI_prepare(QUEUE_SQL, 1, types));
        if (queue_plan == NULL)
                elog(ERROR, "pgq_insert: SPI_prepare() failed");
-       seq_plan = SPI_saveplan(SPI_prepare(SEQ_SQL, 1, types));
-       if (seq_plan == NULL)
-               elog(ERROR, "pgq_insert: SPI_prepare() failed");
 
        /*
         * init insert plan cache.
 
        row = SPI_tuptable->vals[0];
        desc = SPI_tuptable->tupdesc;
-       state->seq_name = SPI_getbinval(row, desc, COL_SEQNAME, &isnull);
+       state->queue_id = DatumGetInt32(SPI_getbinval(row, desc, COL_QUEUE_ID, &isnull));
        if (isnull)
-               elog(ERROR, "Seq name NULL");
+               elog(ERROR, "queue id NULL");
        state->cur_table = DatumGetInt32(SPI_getbinval(row, desc, COL_TBLNO, &isnull));
        if (isnull)
                elog(ERROR, "table nr NULL");
-       state->queue_id = DatumGetInt32(SPI_getbinval(row, desc, COL_QUEUE_ID, &isnull));
-       if (isnull)
-               elog(ERROR, "queue id NULL");
        state->table_prefix = SPI_getvalue(row, desc, COL_PREFIX);
        if (state->table_prefix == NULL)
                elog(ERROR, "table prefix NULL");
-}
-
-/*
- * fetch next value from event_id sequence
- */
-static Datum fetch_seq(struct QueueState *state)
-{
-       Datum values[1];
-       TupleDesc   desc;
-       HeapTuple   row;
-       int res;
-       Datum id;
-       bool isnull = false;
-
-       values[0] = state->seq_name;
-       res = SPI_execute_plan(seq_plan, values, NULL, false, 0);
-       if (res != SPI_OK_SELECT)
-               elog(ERROR, "Seq fetch failed");
-       if (SPI_processed == 0)
-               elog(ERROR, "Seq fetch crashed");
-
-       row = SPI_tuptable->vals[0];
-       desc = SPI_tuptable->tupdesc;
-       state->seq_name = SPI_getbinval(row, desc, COL_SEQVAL, &isnull);
-       id = SPI_getbinval(row, desc, COL_SEQVAL, &isnull);
-
-       return id;
+       state->next_event_id = SPI_getbinval(row, desc, COL_EVENT_ID, &isnull);
+       if (isnull)
+               elog(ERROR, "Seq name NULL");
 }
 
 /*
  * 3: ev_owner    int4
  * 4: ev_retry    int4
  * 5: ev_type     text
- * 6: ev_date     text
+ * 6: ev_data     text
  * 7: ev_extra1   text
  * 8: ev_extra2   text
  * 9: ev_extra3   text
        Datum ev_id, ev_time;
        int i, res;
 
-       /*
-        * sanity check
-        */
        if (PG_NARGS() < 6)
                elog(ERROR, "pgq_insert_raw: too few args");
        if (PG_ARGISNULL(0))
                elog(ERROR, "Queue name must not be NULL");
 
-        /*
-         * Connect to the SPI manager
-         */
        if (SPI_connect() < 0)
                elog(ERROR, "logtriga: SPI_connect() failed");
        
-       /*
-        * Prepare plans
-        */
        init_cache();
 
-       /*
-        * load queue info
-        */
        load_queue_info(PG_GETARG_DATUM(0), &state);
 
-       /*
-        * load event id.
-        *
-        * seq must be increased event if id is given,
-        * to notify ticker about new event.
-        */
-       ev_id = fetch_seq(&state);
-       if (!PG_ARGISNULL(1))
+       if (PG_ARGISNULL(1))
+               ev_id = state.next_event_id;
+       else
                ev_id = PG_GETARG_DATUM(1);
-       ret_id = DatumGetInt64(ev_id);
 
-       /*
-        * load current time.
-        */
        if (PG_ARGISNULL(2))
                ev_time = DirectFunctionCall1(now, 0);
        else
                ev_time = PG_GETARG_DATUM(2);
 
        /*
-        * load insert plan.
-        */
-       ins_plan = load_insert_plan(&state);
-
-       
-       /*
-        * Perform INSERT into queue table.
+        * Prepare arguments for INSERT
         */
        values[0] = ev_id;
        nulls[0] = ' ';
                        nulls[dst] = ' ';
                }
        }
+
+       /*
+        * Perform INSERT into queue table.
+        */
+       ins_plan = load_insert_plan(&state);
        res = SPI_execute_plan(ins_plan, values, nulls, false, 0);
        if (res != SPI_OK_INSERT)
                elog(ERROR, "Queue insert failed");
 
        /*
-        * Done with SPI.
-        *
-        * Datum for ev_id wont pass further from here,
-        * thus conversion to int64.
+        * ev_id cannot pass SPI_finish()
         */
+       ret_id = DatumGetInt64(ev_id);
+
        if (SPI_finish() < 0)
                elog(ERROR, "SPI_finish failed");