sql/ticker: experimental multi-database ticker written in C
authorMarko Kreen <markokr@gmail.com>
Wed, 18 Mar 2009 16:26:02 +0000 (18:26 +0200)
committerMarko Kreen <markokr@gmail.com>
Thu, 2 Apr 2009 14:00:13 +0000 (17:00 +0300)
The new ticker is based on libevent, uses single-threaded
async  architecture.

It can handle all databases in one Postgres instance, so you
need only one ticker instance per Postgres instance.

By default it autodetects all accessible databases and
whether they have PGQ installed.

Missing features ATM:
- logging
- config files
- daemonizing
- error handling

So it's not production quality yet..

sql/ticker/Makefile [new file with mode: 0644]
sql/ticker/connection.c [new file with mode: 0644]
sql/ticker/connection.h [new file with mode: 0644]
sql/ticker/list.h [new file with mode: 0644]
sql/ticker/maint.c [new file with mode: 0644]
sql/ticker/pgqd.c [new file with mode: 0644]
sql/ticker/pgqd.h [new file with mode: 0644]
sql/ticker/retry.c [new file with mode: 0644]
sql/ticker/ticker.c [new file with mode: 0644]
sql/ticker/util.c [new file with mode: 0644]
sql/ticker/util.h [new file with mode: 0644]

diff --git a/sql/ticker/Makefile b/sql/ticker/Makefile
new file mode 100644 (file)
index 0000000..31afc2d
--- /dev/null
@@ -0,0 +1,16 @@
+
+include ../../config.mak
+
+PROGRAM = pgqd
+
+SRCS = connection.c pgqd.c util.c maint.c ticker.c retry.c
+HDRS = connection.h pgqd.h util.h list.h
+
+OBJS = $(SRCS:.c=.o)
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport) -levent
+
+include $(PGXS)
+
+$(OBJS): $(HDRS)
+
diff --git a/sql/ticker/connection.c b/sql/ticker/connection.c
new file mode 100644 (file)
index 0000000..c4f4b54
--- /dev/null
@@ -0,0 +1,257 @@
+#include "connection.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <math.h>
+#include <event.h>
+
+#include "util.h"
+
+#define W_NONE 0
+#define W_SOCK 1
+#define W_TIME 2
+
+typedef void (*libev_cb)(int sock, short flags, void *arg);
+
+struct PgSocket {
+       struct event ev;
+
+       unsigned wait_type:4; // 0 - no wait, 1 - socket, 2 - timeout
+
+       PGconn *con;
+       db_handler_f handler_func;
+       void *handler_arg;
+};
+
+static void send_event(struct PgSocket *db, enum PgEvent ev)
+{
+       db->handler_func(db, db->handler_arg, ev, NULL);
+}
+
+static void wait_event(struct PgSocket *db, short ev, libev_cb fn)
+{
+       Assert(!db->wait_type);
+
+       event_set(&db->ev, PQsocket(db->con), ev, fn, db);
+       if (event_add(&db->ev, NULL) < 0)
+               fatal_perror("event_add");
+
+       db->wait_type = W_SOCK;
+}
+
+static void timeout_cb(int sock, short flags, void *arg)
+{
+       struct PgSocket *db = arg;
+
+       db->wait_type = 0;
+
+       send_event(db, DB_TIMEOUT);
+}
+
+void db_sleep(struct PgSocket *db, double timeout)
+{
+       struct timeval tv;
+
+       Assert(!db->wait_type);
+       Assert(!db->time_wait);
+
+       tv.tv_sec = floor(timeout);
+       tv.tv_usec = (timeout - tv.tv_sec) * USEC;
+
+       evtimer_set(&db->ev, timeout_cb, db);
+       if (evtimer_add(&db->ev, &tv) < 0)
+               fatal_perror("event_add");
+
+       db->wait_type = W_TIME;
+}
+
+
+/* some error happened */
+static void conn_error(struct PgSocket *db, enum PgEvent ev, const char *desc)
+{
+       log_error("connection error: %s", desc);
+       send_event(db, ev);
+}
+
+/*
+ * Called when select() told that conn is avail for reading/writing.
+ *
+ * It should call postgres handlers and then change state if needed.
+ */
+static void result_cb(int sock, short flags, void *arg)
+{
+       struct PgSocket *db = arg;
+       PGresult *res, *res_saved = NULL;
+
+       db->wait_type = 0;
+
+       if (!PQconsumeInput(db->con)) {
+               conn_error(db, DB_RESULT_BAD, "PQconsumeInput");
+               return;
+       }
+
+       /* loop until PQgetResult returns NULL */
+       while (1) {
+               /* incomplete result? */
+               if (PQisBusy(db->con)) {
+                       wait_event(db, EV_READ, result_cb);
+                       return;
+               }
+
+               /* next result */
+               res = PQgetResult(db->con);
+               if (!res)
+                       break;
+
+               if (res_saved) {
+                       printf("multiple results?\n");
+                       PQclear(res_saved);
+               }
+               res_saved = res;
+       }
+
+       db->handler_func(db, db->handler_arg, DB_RESULT_OK, res_saved);
+}
+
+static void send_cb(int sock, short flags, void *arg)
+{
+       int res;
+       struct PgSocket *db = arg;
+
+       db->wait_type = 0;
+
+       res = PQflush(db->con);
+       if (res > 0) {
+               wait_event(db, EV_WRITE, send_cb);
+       } else if (res == 0) {
+               wait_event(db, EV_READ, result_cb);
+       } else
+               conn_error(db, DB_RESULT_BAD, "PQflush");
+}
+
+
+static void connect_cb(int sock, short flags, void *arg)
+{
+       struct PgSocket *db = arg;
+       PostgresPollingStatusType poll_res;
+
+       db->wait_type = 0;
+
+       poll_res = PQconnectPoll(db->con);
+       switch (poll_res) {
+       case PGRES_POLLING_WRITING:
+               wait_event(db, EV_WRITE, connect_cb);
+               break;
+       case PGRES_POLLING_READING:
+               wait_event(db, EV_READ, connect_cb);
+               break;
+       case PGRES_POLLING_OK:
+               //log_debug("login ok: fd=%d", PQsocket(db->con));
+               send_event(db, DB_CONNECT_OK);
+               break;
+       default:
+               conn_error(db, DB_CONNECT_FAILED, "PQconnectPoll");
+       }
+}
+
+/*
+ * Public API
+ */
+
+struct PgSocket *db_create(db_handler_f fn, void *handler_arg)
+{
+       struct PgSocket *db;
+       
+       db = calloc(1, sizeof(*db));
+       if (!db)
+               return NULL;
+
+       db->handler_func = fn;
+       db->handler_arg = handler_arg;
+
+       return db;
+}
+
+void db_connect(struct PgSocket *db, const char *connstr)
+{
+       db->con = PQconnectStart(connstr);
+       if (db->con == NULL) {
+               conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
+               return;
+       }
+
+       if (PQstatus(db->con) == CONNECTION_BAD) {
+               conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
+               return;
+       }
+
+       wait_event(db, EV_WRITE, connect_cb);
+}
+
+
+void db_disconnect(struct PgSocket *db)
+{
+       if (db->con) {
+               PQfinish(db->con);
+               db->con = NULL;
+       }
+}
+
+void db_free(struct PgSocket *db)
+{
+       if (db) {
+               if (db->con)
+                       db_disconnect(db);
+               free(db);
+       }
+}
+
+void db_send_query_simple(struct PgSocket *db, const char *q)
+{
+       int res;
+
+       log_debug("%s", q);
+       res = PQsendQuery(db->con, q);
+       if (!res) {
+               conn_error(db, DB_RESULT_BAD, "PQsendQueryParams");
+               return;
+       }
+
+       res = PQflush(db->con);
+       if (res > 0) {
+               wait_event(db, EV_WRITE, send_cb);
+       } else if (res == 0) {
+               wait_event(db, EV_READ, result_cb);
+       } else
+               conn_error(db, DB_RESULT_BAD, "PQflush");
+}
+
+void db_send_query_params(struct PgSocket *db, const char *q, int cnt, ...)
+{
+       int res, i;
+       va_list ap;
+       const char * args[10];
+
+       if (cnt > 10) cnt = 10;
+
+       va_start(ap, cnt);
+       for (i = 0; i < cnt; i++)
+               args[i] = va_arg(ap, char *);
+       va_end(ap);
+
+       res = PQsendQueryParams(db->con, q, cnt, NULL, args, NULL, NULL, 0);
+       if (!res) {
+               conn_error(db, DB_RESULT_BAD, "PQsendQueryParams");
+               return;
+       }
+
+       res = PQflush(db->con);
+       if (res > 0) {
+               wait_event(db, EV_WRITE, send_cb);
+       } else if (res == 0) {
+               wait_event(db, EV_READ, result_cb);
+       } else
+               conn_error(db, DB_RESULT_BAD, "PQflush");
+}
+
diff --git a/sql/ticker/connection.h b/sql/ticker/connection.h
new file mode 100644 (file)
index 0000000..bbdfd5a
--- /dev/null
@@ -0,0 +1,31 @@
+
+#ifndef __CONNECTION_H__
+#define __CONNECTION_H__
+
+#include <libpq-fe.h>
+
+enum PgEvent {
+       DB_CONNECT_OK,
+       DB_CONNECT_FAILED,
+       DB_RESULT_OK,
+       DB_RESULT_BAD,
+       DB_TIMEOUT,
+};
+
+struct PgSocket;
+
+typedef void (*db_handler_f)(struct PgSocket *pgs, void *arg, enum PgEvent dbev, PGresult *res);
+
+struct PgSocket *db_create(db_handler_f fn, void *arg);
+void db_free(struct PgSocket *db);
+
+void db_connect(struct PgSocket *db, const char *cstr);
+void db_disconnect(struct PgSocket *db);
+
+void db_send_query_simple(struct PgSocket *db, const char *query);
+void db_send_query_params(struct PgSocket *db, const char *query, int args, ...);
+
+void db_sleep(struct PgSocket *db, double timeout);
+
+#endif
+
diff --git a/sql/ticker/list.h b/sql/ticker/list.h
new file mode 100644 (file)
index 0000000..32e70d5
--- /dev/null
@@ -0,0 +1,277 @@
+/*
+ * PgBouncer - Lightweight connection pooler for PostgreSQL.
+ * 
+ * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
+ * 
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ * 
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/*
+ * Circular doubly linked list implementation.
+ *
+ * Basic idea from <linux/list.h>.
+ *
+ * <sys/queue.h> seemed usable, but overcomplicated.
+ */
+
+#ifndef __LIST_H_
+#define __LIST_H_
+
+/* turn on slow checking */
+/* #define LIST_DEBUG */
+
+/* give offset of a field inside struct */
+#ifndef offsetof
+#define offsetof(type, field) ((unsigned long)&(((type *)0)->field))
+#endif
+
+/* given pointer to field inside struct, return pointer to struct */
+#ifndef container_of
+#define container_of(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))
+#endif
+
+/* list type */
+typedef struct List List;
+struct List {
+       List *next;
+       List *prev;
+};
+
+#define LIST(var) List var = { &var, &var }
+
+/* initialize struct */
+static inline void list_init(List *list)
+{
+       list->next = list->prev = list;
+}
+
+/* is list empty? */
+static inline bool list_empty(const List *list)
+{
+       return list->next == list;
+}
+
+/* add item to the start of the list */
+static inline List *list_prepend(List *item, List *list)
+{
+       Assert(list_empty(item));
+
+       item->next = list->next;
+       item->prev = list;
+       list->next->prev = item;
+       list->next = item;
+       return item;
+}
+
+/* add item to the end of the list */
+static inline List *list_append(List *item, List *list)
+{
+       Assert(list_empty(item));
+
+       item->next = list;
+       item->prev = list->prev;
+       list->prev->next = item;
+       list->prev = item;
+       return item;
+}
+
+/* remove item from list */
+static inline List *list_del(List *item)
+{
+       item->prev->next = item->next;
+       item->next->prev = item->prev;
+       item->next = item->prev = item;
+       return item;
+}
+
+/* remove first from list and return */
+static inline List *list_pop(List *list)
+{
+       if (list_empty(list))
+               return NULL;
+       return list_del(list->next);
+}
+
+/* remove first from list and return */
+static inline List *list_first(const List *list)
+{
+       if (list_empty(list))
+               return NULL;
+       return list->next;
+}
+
+/* put all elems in one list in the start of another list */
+static inline void list_prepend_list(List *src, List *dst)
+{
+       if (list_empty(src))
+               return;
+       src->next->prev = dst;
+       src->prev->next = dst->next;
+       dst->next->prev = src->prev;
+       dst->next = src->next;
+
+       src->next = src->prev = src;
+}
+
+/* put all elems in one list in the end of another list */
+static inline void list_append_list(List *src, List *dst)
+{
+       if (list_empty(src))
+               return;
+       src->next->prev = dst->prev;
+       src->prev->next = dst;
+       dst->prev->next = src->next;
+       dst->prev = src->prev;
+
+       src->next = src->prev = src;
+}
+
+/* remove first elem from list and return with casting */
+#define list_pop_type(list, typ, field) \
+       (list_empty(list) ? NULL \
+        : container_of(list_del((list)->next), typ, field))
+
+/* loop over list */
+#define list_for_each(item, list) \
+       for ((item) = (list)->next; \
+            (item) != (list); \
+            (item) = (item)->next)
+
+/* loop over list and allow removing item */
+#define list_for_each_safe(item, list, tmp) \
+       for ((item) = (list)->next, (tmp) = (list)->next->next; \
+            (item) != (list); \
+            (item) = (tmp), (tmp) = (tmp)->next)
+
+static inline bool item_in_list(const List *item, const List *list)
+{
+       List *tmp;
+       list_for_each(tmp, list)
+               if (tmp == item)
+                       return 1;
+       return 0;
+}
+
+
+/*
+ * wrapper for List that keeps track of number of items
+ */
+
+typedef struct StatList StatList;
+struct StatList {
+       List head;
+       int cur_count;
+#ifdef LIST_DEBUG
+       const char *name;
+#endif
+};
+
+#ifdef LIST_DEBUG
+#define STATLIST(var) StatList var = { {&var.head, &var.head}, 0, #var }
+#else
+#define STATLIST(var) StatList var = { {&var.head, &var.head}, 0 }
+#endif
+
+static inline void statlist_inc_count(StatList *list, int val)
+{
+       list->cur_count += val;
+}
+
+static inline void statlist_prepend(List *item, StatList *list)
+{
+       list_prepend(item, &list->head);
+       statlist_inc_count(list, 1);
+}
+
+static inline void statlist_append(List *item, StatList *list)
+{
+       list_append(item, &list->head);
+       statlist_inc_count(list, 1);
+}
+
+static inline void statlist_put_before(List *item, StatList *list, List *pos)
+{
+       list_append(item, pos);
+       statlist_inc_count(list, 1);
+}
+
+static inline void statlist_remove(List *item, StatList *list)
+{
+#ifdef LIST_DEBUG
+       /* sanity check */
+       if (!item_in_list(item, &list->head))
+               fatal("item in wrong list, expected: %s", list->name);
+#endif
+
+       list_del(item);
+       list->cur_count--;
+
+       Assert(list->cur_count >= 0);
+}
+
+static inline void statlist_init(StatList *list, const char *name)
+{
+       list_init(&list->head);
+       list->cur_count = 0;
+#ifdef LIST_DEBUG
+       list->name = name;
+#endif
+}
+
+static inline int statlist_count(const StatList *list)
+{
+       Assert(list->cur_count > 0 || list_empty(&list->head));
+       return list->cur_count;
+}
+
+static inline List *statlist_pop(StatList *list)
+{
+       List *item = list_pop(&list->head);
+
+       if (item)
+               list->cur_count--;
+
+       Assert(list->cur_count >= 0);
+
+       return item;
+}
+
+static inline void statlist_prepend_list(StatList *src, StatList *dst)
+{
+       list_prepend_list(&src->head, &dst->head);
+       statlist_inc_count(dst, src->cur_count);
+       src->cur_count = 0;
+}
+
+static inline void statlist_append_list(StatList *src, StatList *dst)
+{
+       list_append_list(&src->head, &dst->head);
+       statlist_inc_count(dst, src->cur_count);
+       src->cur_count = 0;
+}
+
+static inline List *statlist_first(const StatList *list)
+{
+       return list_first(&list->head);
+}
+
+static inline bool statlist_empty(const StatList *list)
+{
+       return list_empty(&list->head);
+}
+
+#define statlist_for_each(item, list) list_for_each(item, &((list)->head))
+#define statlist_for_each_safe(item, list, tmp) list_for_each_safe(item, &((list)->head), tmp)
+
+#endif /* __LIST_H_ */
+
diff --git a/sql/ticker/maint.c b/sql/ticker/maint.c
new file mode 100644 (file)
index 0000000..b59b4ff
--- /dev/null
@@ -0,0 +1,180 @@
+
+#include "pgqd.h"
+
+struct MaintItem {
+       List head;
+       const char *name;
+};
+
+static void add_maint_item(struct PgDatabase *db, const char *name)
+{
+       struct MaintItem *item = calloc(1, sizeof(*item));
+       if (!item)
+               return;
+       list_init(&item->head);
+       item->name = strdup(name);
+       if (!item->name) {
+               free(item);
+               return;
+       }
+       statlist_append(&item->head, &db->maint_item_list);
+}
+
+static const char *pop_maint_item(struct PgDatabase *db)
+{
+       struct MaintItem *item;
+       struct List *el;
+       const char *name;
+
+       el = statlist_pop(&db->maint_item_list);
+       if (!el)
+               return NULL;
+
+       item = container_of(el, struct MaintItem, head);
+       name = item->name;
+       free(item);
+       return name;
+}
+
+static void free_maint_list(struct PgDatabase *db)
+{
+       const char *name;
+       while (1) {
+               name = pop_maint_item(db);
+               if (!name)
+                       break;
+               free(name);
+       }
+}
+
+
+static void fill_items(struct PgDatabase *db, PGresult *res)
+{
+       int i;
+       for (i = 0; i < PQntuples(res); i++) {
+               const char *item = PQgetvalue(res, i, 0);
+               if (item)
+                       add_maint_item(db, item);
+       }
+}
+
+static void run_queue_list(struct PgDatabase *db)
+{
+       const char *q = "select queue_name from pgq.get_queue_info()";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_maint, q);
+       db->maint_state = DB_MAINT_LOAD_QUEUES;
+}
+
+static void run_vacuum_list(struct PgDatabase *db)
+{
+       const char *q = "select * from pgq.maint_tables_to_vacuum()";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_maint, q);
+       db->maint_state = DB_MAINT_VACUUM_LIST;
+}
+
+static void run_rotate_part1(struct PgDatabase *db)
+{
+       const char *q;
+       const char *qname;
+       qname = pop_maint_item(db);
+       q = "select pgq.maint_rotate_part1($1)";
+       log_debug("%s: %s [%s]", db->name, q, qname);
+       db_send_query_params(db->c_maint, q, 1, qname);
+       free(qname);
+       db->maint_state = DB_MAINT_ROT1;
+}
+
+static void run_rotate_part2(struct PgDatabase *db)
+{
+       const char *q = "select pgq.maint_rotate_part2()";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_maint, q);
+       db->maint_state = DB_MAINT_ROT2;
+}
+
+static void run_vacuum(struct PgDatabase *db)
+{
+       char qbuf[256];
+       const char *table;
+       table = pop_maint_item(db);
+       snprintf(qbuf, sizeof(qbuf), "vacuum %s", table);
+       log_debug("%s: %s", db->name, qbuf);
+       db_send_query_simple(db->c_maint, qbuf);
+       free(table);
+       db->maint_state = DB_MAINT_DO_VACUUM;
+}
+
+static void close_maint(struct PgDatabase *db, int sleep_time)
+{
+       log_debug("%s: close_maint, %d", db->name, sleep_time);
+       db->maint_state = DB_CLOSED;
+       db_disconnect(db->c_maint);
+       db_sleep(db->c_maint, sleep_time);
+}
+
+static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
+{
+       struct PgDatabase *db = arg;
+
+       switch (ev) {
+       case DB_CONNECT_OK:
+               run_queue_list(db);
+               break;
+       case DB_RESULT_OK:
+               switch (db->maint_state) {
+               case DB_MAINT_LOAD_QUEUES:
+                       fill_items(db, res);
+               case DB_MAINT_ROT1:
+                       PQclear(res);
+                       if (!statlist_empty(&db->maint_item_list)) {
+                               run_rotate_part1(db);
+                       } else {
+                               run_rotate_part2(db);
+                       }
+                       break;
+               case DB_MAINT_ROT2:
+                       PQclear(res);
+                       run_vacuum_list(db);
+                       break;
+               case DB_MAINT_VACUUM_LIST:
+                       fill_items(db, res);
+               case DB_MAINT_DO_VACUUM:
+                       PQclear(res);
+                       if (!statlist_empty(&db->maint_item_list)) {
+                               run_vacuum(db);
+                       } else {
+                               close_maint(db, 2*60);
+                       }
+                       break;
+               default:
+                       printf("bad state\n");
+                       exit(1);
+               }
+               break;
+       case DB_TIMEOUT:
+               log_debug("%s: maint timeout", db->name);
+               run_queue_list(db);
+               break;
+       default:
+               printf("failure\n");
+               exit(1);
+       }
+}
+
+void launch_maint(struct PgDatabase *db)
+{
+       const char *cstr = make_connstr(db->name);
+       if (db->c_maint) {
+               log_error("%s: maint already initialized", db->name);
+               return;
+       }
+       free_maint_list(db);
+
+       log_debug("%s: launch_maint", db->name);
+       db->c_maint = db_create(maint_handler, db);
+       db_connect(db->c_maint, cstr);
+       free(cstr);
+}
+
diff --git a/sql/ticker/pgqd.c b/sql/ticker/pgqd.c
new file mode 100644 (file)
index 0000000..16943b0
--- /dev/null
@@ -0,0 +1,143 @@
+#include "pgqd.h"
+
+#include <getopt.h>
+#include <event.h>
+
+static const char *usage_str =
+"usage: pgq-ticker [switches] [db ..]\n"
+"Switches:\n"
+"  -T db     Set initial db name to connect to (default: template1)\n"
+"  -p port   port\n"
+"  -U user   Username to use\n"
+"  -h host   Host to use\n"
+"  -H        Show help\n"
+"Not implemented:\n"
+"  -v        Increase verbosity\n"
+"  -V        Show version\n"
+"  -d        Daemonize\n"
+"";
+
+struct Config cf = {
+       .db_template = "template1",
+       .verbose = 1,
+};
+
+static struct PgSocket *db_template;
+
+static STATLIST(database_list);
+
+const char *make_connstr(const char *dbname)
+{
+       char buf[512];
+
+       snprintf(buf, sizeof(buf), "dbname=%s", dbname);
+       if (cf.db_host) {
+               strlcat(buf, " host=", sizeof(buf));
+               strlcat(buf, cf.db_host, sizeof(buf));
+       }
+       if (cf.db_port) {
+               strlcat(buf, " port=", sizeof(buf));
+               strlcat(buf, cf.db_port, sizeof(buf));
+       }
+       if (cf.db_username) {
+               strlcat(buf, " user=", sizeof(buf));
+               strlcat(buf, cf.db_username, sizeof(buf));
+       }
+       return strdup(buf);
+}
+
+static void launch_db(const char *dbname)
+{
+       struct PgDatabase *db;
+
+       db = calloc(1, sizeof(*db));
+       db->name = strdup(dbname);
+       statlist_init(&db->maint_item_list, "maint_item_list");
+       list_init(&db->head);
+       statlist_append(&db->head, &database_list);
+
+       launch_ticker(db);
+}
+
+static void detect_handler(struct PgSocket *db, void *arg, enum PgEvent ev, PGresult *res)
+{
+       int i;
+       const char *s;
+
+       switch (ev) {
+       case DB_CONNECT_OK:
+               db_send_query_simple(db, "select datname from pg_database"
+                                        " where not datistemplate and datallowconn");
+               break;
+       case DB_RESULT_OK:
+               for (i = 0; i < PQntuples(res); i++) {
+                       s = PQgetvalue(res, i, 0);
+                       launch_db(s);
+               }
+               PQclear(res);
+               db_free(db_template);
+               db_template = NULL;
+               break;
+       default:
+               printf("failure\n");
+               exit(1);
+       }
+}
+
+static void detect_dbs(void)
+{
+       const char *cstr = make_connstr(cf.db_template);
+       db_template = db_create(detect_handler, NULL);
+       db_connect(db_template, cstr);
+       free(cstr);
+}
+
+static void main_loop_once(void)
+{
+       reset_time_cache();
+       event_loop(EVLOOP_ONCE);
+}
+
+int main(int argc, char *argv[])
+{
+       int c, i;
+
+       while ((c = getopt(argc, argv, "T:p:U:h:HvVd") != -1)) {
+               switch (c) {
+               case 'T':
+                       cf.db_template = optarg;
+                       break;
+               case 'p':
+                       cf.db_port = optarg;
+                       break;
+               case 'U':
+                       cf.db_username = optarg;
+                       break;
+               case 'h':
+                       cf.db_host = optarg;
+                       break;
+               case 'H':
+                       printf(usage_str);
+                       return 0;
+               default:
+                       printf(usage_str);
+                       return 1;
+               }
+       }
+
+       event_init();
+
+       if (optind == argc) {
+               printf("auto-detecting dbs");
+               detect_dbs();
+       } else {
+               for (i = optind; i < argc; i++) {
+                       launch_db(argv[i]);
+               }
+       }
+
+       while (1)
+               main_loop_once();
+
+       return 0;
+}
diff --git a/sql/ticker/pgqd.h b/sql/ticker/pgqd.h
new file mode 100644 (file)
index 0000000..0a5b85d
--- /dev/null
@@ -0,0 +1,60 @@
+
+#ifndef __PGQD_H__
+#define __PGQD_H__
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define Assert(x)
+
+#include "connection.h"
+#include "util.h"
+#include "list.h"
+
+enum DbState {
+       DB_CLOSED,
+       DB_TICKER_CHECK_PGQ,
+       DB_TICKER_CHECK_VERSION,
+       DB_TICKER_RUN,
+       DB_MAINT_LOAD_QUEUES,
+       DB_MAINT_ROT1,
+       DB_MAINT_ROT2,
+       DB_MAINT_VACUUM_LIST,
+       DB_MAINT_DO_VACUUM,
+};
+
+struct PgDatabase {
+       List head;
+       const char *name;
+       struct PgSocket *c_ticker;
+       struct PgSocket *c_maint;
+       struct PgSocket *c_retry;
+       bool has_pgq;
+       enum DbState state;
+       enum DbState maint_state;
+       StatList maint_item_list;
+};
+
+struct Config {
+       const char *logfile;
+       const char *pidfile;
+       const char *db_host;
+       const char *db_port;
+       const char *db_username;
+       const char *db_template;
+       int verbose;
+};
+
+extern struct Config cf;
+
+
+void launch_ticker(struct PgDatabase *db);
+void launch_maint(struct PgDatabase *db);
+void launch_retry(struct PgDatabase *db);
+
+const char *make_connstr(const char *dbname);
+
+#endif
+
diff --git a/sql/ticker/retry.c b/sql/ticker/retry.c
new file mode 100644 (file)
index 0000000..4f14087
--- /dev/null
@@ -0,0 +1,64 @@
+
+#include "pgqd.h"
+
+
+static void close_retry(struct PgDatabase *db, int sleep_time)
+{
+       log_debug("%s: close_retry, %d", db->name, sleep_time);
+       db_disconnect(db->c_retry);
+       db_sleep(db->c_retry, sleep_time);
+}
+
+static void run_retry(struct PgDatabase *db)
+{
+       const char *q = "select * from pgq.maint_retry_events()";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_retry, q);
+}
+
+static void parse_retry(struct PgDatabase *db, PGresult *res)
+{
+       if (PQntuples(res) == 1) {
+               char *val = PQgetvalue(res, 0, 0);
+               if (strcmp(val, "0") != 0) {
+                       run_retry(db);
+               }
+       }
+       close_retry(db, 20);
+}
+
+static void retry_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
+{
+       struct PgDatabase *db = arg;
+
+       switch (ev) {
+       case DB_CONNECT_OK:
+               run_retry(db);
+               break;
+       case DB_RESULT_OK:
+               parse_retry(db, res);
+               break;
+       case DB_TIMEOUT:
+               log_debug("%s: retry timeout", db->name);
+               launch_retry(db);
+               break;
+       default:
+               printf("failure\n");
+               exit(1);
+       }
+}
+
+void launch_retry(struct PgDatabase *db)
+{
+       const char *cstr;
+       if (db->c_retry) {
+               log_error("%s: retry already initialized", db->name);
+       } else {
+               log_debug("%s: launch_retry", db->name);
+               db->c_retry = db_create(retry_handler, db);
+       }
+       cstr = make_connstr(db->name);
+       db_connect(db->c_retry, cstr);
+       free(cstr);
+}
+
diff --git a/sql/ticker/ticker.c b/sql/ticker/ticker.c
new file mode 100644 (file)
index 0000000..77eb4aa
--- /dev/null
@@ -0,0 +1,128 @@
+#include "pgqd.h"
+
+static void run_pgq_check(struct PgDatabase *db)
+{
+       const char *q = "select 1 from pg_catalog.pg_namespace where nspname='pgq'";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_ticker, q);
+       db->state = DB_TICKER_CHECK_PGQ;
+}
+
+static void run_version_check(struct PgDatabase *db)
+{
+       const char *q = "select pgq.version()";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_ticker, q);
+       db->state = DB_TICKER_CHECK_VERSION;
+}
+
+static void run_ticker(struct PgDatabase *db)
+{
+       const char *q = "select pgq.ticker()";
+       log_debug("%s: %s", db->name, q);
+       db_send_query_simple(db->c_ticker, q);
+       db->state = DB_TICKER_RUN;
+}
+
+static void close_ticker(struct PgDatabase *db, int sleep_time)
+{
+       log_debug("%s: close_ticker, %d", db->name, sleep_time);
+       db->state = DB_CLOSED;
+       db_disconnect(db->c_ticker);
+       db_sleep(db->c_ticker, sleep_time);
+}
+
+static void parse_pgq_check(struct PgDatabase *db, PGresult *res)
+{
+       db->has_pgq = PQntuples(res) == 1;
+       PQclear(res);
+
+       if (!db->has_pgq) {
+               log_debug("%s: no pgq", db->name);
+               close_ticker(db, 60);
+       } else {
+               run_version_check(db);
+       }
+}
+
+static void parse_version_check(struct PgDatabase *db, PGresult *res)
+{
+       char *ver;
+       if (PQntuples(res) != 1) {
+               log_debug("%s: calling pgq.version() failed", db->name);
+               goto badpgq;
+       }
+       ver = PQgetvalue(res, 0, 0);
+       if (ver[0] < '3') {
+               log_debug("%s: bad pgq version: %s", db->name, ver);
+               goto badpgq;
+       }
+       log_debug("%s: pgq version ok: %s", db->name, ver);
+       PQclear(res);
+
+       run_ticker(db);
+       if (!db->c_maint)
+               launch_maint(db);
+       if (!db->c_retry)
+               launch_retry(db);
+       return;
+
+badpgq:
+       PQclear(res);
+       db->has_pgq = false;
+       close_ticker(db, 60);
+}
+
+static void parse_ticker_result(struct PgDatabase *db, PGresult *res)
+{
+       if (PQntuples(res) != 1) {
+               log_debug("%s: calling pgq.ticker() failed", db->name);
+       }
+       PQclear(res);
+
+       db_sleep(db->c_ticker, 2);
+}
+
+static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
+{
+       struct PgDatabase *db = arg;
+
+       switch (ev) {
+       case DB_CONNECT_OK:
+               run_pgq_check(db);
+               break;
+       case DB_RESULT_OK:
+               switch (db->state) {
+               case DB_TICKER_CHECK_PGQ:
+                       parse_pgq_check(db, res);
+                       break;
+               case DB_TICKER_CHECK_VERSION:
+                       parse_version_check(db, res);
+                       break;
+               case DB_TICKER_RUN:
+                       parse_ticker_result(db, res);
+                       break;
+               default:
+                       printf("bad state\n");
+                       exit(1);
+               }
+               break;
+       case DB_TIMEOUT:
+               log_debug("%s: tick timeout", db->name);
+               run_ticker(db);
+               break;
+       default:
+               printf("failure\n");
+               exit(1);
+       }
+}
+
+void launch_ticker(struct PgDatabase *db)
+{
+       const char *cstr = make_connstr(db->name);
+       log_debug("%s: launch_ticker", db->name);
+       db->c_ticker = db_create(tick_handler, db);
+       db_connect(db->c_ticker, cstr);
+       free(cstr);
+}
+
diff --git a/sql/ticker/util.c b/sql/ticker/util.c
new file mode 100644 (file)
index 0000000..c15d174
--- /dev/null
@@ -0,0 +1,145 @@
+
+#include "util.h"
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string.h>
+#include <errno.h>
+#include <stdarg.h>
+
+#include "pgqd.h"
+
+/*
+ * Things to test:
+ * - Conn per query
+ * - show tx
+ * - long tx
+ * - variable-size query
+ */
+
+static usec_t _time_cache;
+
+/*
+ * utility functions
+ */
+
+static usec_t get_time_usec(void)
+{
+       struct timeval tv;
+       gettimeofday(&tv, NULL);
+       return (usec_t)tv.tv_sec * USEC + tv.tv_usec;
+}
+
+usec_t get_cached_time(void)
+{
+       if (!_time_cache)
+               _time_cache = get_time_usec();
+       return _time_cache;
+}
+void reset_time_cache(void)
+{
+       _time_cache = 0;
+}
+
+void fatal_perror(const char *err)
+{
+       log_error("%s: %s", err, strerror(errno));
+       exit(1);
+}
+
+void fatal_noexit(const char *fmt, ...)
+{
+       va_list ap;
+       char buf[1024];
+       va_start(ap, fmt);
+       vsnprintf(buf, sizeof(buf), fmt, ap);
+       va_end(ap);
+       printf("FATAL: %s\n", buf);
+}
+
+void fatal(const char *fmt, ...)
+{
+       va_list ap;
+       char buf[1024];
+       va_start(ap, fmt);
+       vsnprintf(buf, sizeof(buf), fmt, ap);
+       va_end(ap);
+       printf("FATAL: %s\n", buf);
+       exit(1);
+}
+
+void log_debug(const char *fmt, ...)
+{
+       va_list ap;
+       char buf[1024];
+       if (cf.verbose == 0)
+               return;
+       va_start(ap, fmt);
+       vsnprintf(buf, sizeof(buf), fmt, ap);
+       va_end(ap);
+       printf("dbg: %s\n", buf);
+}
+
+void log_error(const char *fmt, ...)
+{
+       va_list ap;
+       char buf[1024];
+       va_start(ap, fmt);
+       vsnprintf(buf, sizeof(buf), fmt, ap);
+       va_end(ap);
+       printf("ERR: %s\n", buf);
+}
+
+
+/*
+ * Minimal spec-conforming implementations of strlcpy(), strlcat().
+ */
+
+size_t strlcpy(char *dst, const char *src, size_t n)
+{
+       size_t len = strlen(src);
+       if (len < n) {
+               memcpy(dst, src, len + 1);
+       } else if (n > 0) {
+               memcpy(dst, src, n - 1);
+               dst[n - 1] = 0;
+       }
+       return len;
+}
+
+size_t strlcat(char *dst, const char *src, size_t n)
+{
+       size_t pos = 0;
+       while (pos < n && dst[pos])
+               pos++;
+       return pos + strlcpy(dst + pos, src, n - pos);
+}
+
+/* SQL quote */
+size_t quote_literal(char *buf, int buflen, const char *src, bool std_quote)
+{
+       char *dst = buf;
+       char *end = buf + buflen - 2;
+
+       if (buflen < 3)
+               return 0;
+
+       *dst++ = '\'';
+       while (*src && dst < end) {
+               if (*src == '\'')
+                       *dst++ = '\'';
+               else if (*src == '\\' && !std_quote)
+                       *dst++ = '\\';
+               *dst++ = *src++;
+       }
+       if (*src || dst > end)
+               return 0;
+
+       *dst++ = '\'';
+       *dst = 0;
+
+       return dst - buf;
+}
+
+
diff --git a/sql/ticker/util.h b/sql/ticker/util.h
new file mode 100644 (file)
index 0000000..63230ba
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef __UTIL_H__
+#define __UTIL_H__
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+typedef uint64_t usec_t;
+#define USEC 1000000
+
+usec_t get_cached_time(void);
+void reset_time_cache(void);
+void fatal_perror(const char *err);
+
+void fatal_noexit(const char *fmt, ...);
+void fatal(const char *fmt, ...);
+void log_debug(const char *fmt, ...);
+void log_error(const char *fmt, ...);
+
+#define Assert(x)
+
+/* broken posix */
+static inline void sane_free(const void *p) { free((void*)p); }
+#define free sane_free
+
+/* braindead glibc */
+#define strlcpy my_strlcpy
+#define strlcat my_strlcat
+size_t strlcpy(char *dst, const char *src, size_t n);
+size_t strlcat(char *dst, const char *src, size_t n);
+
+size_t quote_literal(char *buf, int buflen, const char *src, bool std_quote);
+
+#endif
+