--- /dev/null
+
+include ../../config.mak
+
+PROGRAM = pgqd
+
+SRCS = connection.c pgqd.c util.c maint.c ticker.c retry.c
+HDRS = connection.h pgqd.h util.h list.h
+
+OBJS = $(SRCS:.c=.o)
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport) -levent
+
+include $(PGXS)
+
+$(OBJS): $(HDRS)
+
--- /dev/null
+#include "connection.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <math.h>
+#include <event.h>
+
+#include "util.h"
+
+#define W_NONE 0
+#define W_SOCK 1
+#define W_TIME 2
+
+typedef void (*libev_cb)(int sock, short flags, void *arg);
+
+struct PgSocket {
+ struct event ev;
+
+ unsigned wait_type:4; // 0 - no wait, 1 - socket, 2 - timeout
+
+ PGconn *con;
+ db_handler_f handler_func;
+ void *handler_arg;
+};
+
+static void send_event(struct PgSocket *db, enum PgEvent ev)
+{
+ db->handler_func(db, db->handler_arg, ev, NULL);
+}
+
+static void wait_event(struct PgSocket *db, short ev, libev_cb fn)
+{
+ Assert(!db->wait_type);
+
+ event_set(&db->ev, PQsocket(db->con), ev, fn, db);
+ if (event_add(&db->ev, NULL) < 0)
+ fatal_perror("event_add");
+
+ db->wait_type = W_SOCK;
+}
+
+static void timeout_cb(int sock, short flags, void *arg)
+{
+ struct PgSocket *db = arg;
+
+ db->wait_type = 0;
+
+ send_event(db, DB_TIMEOUT);
+}
+
+void db_sleep(struct PgSocket *db, double timeout)
+{
+ struct timeval tv;
+
+ Assert(!db->wait_type);
+ Assert(!db->time_wait);
+
+ tv.tv_sec = floor(timeout);
+ tv.tv_usec = (timeout - tv.tv_sec) * USEC;
+
+ evtimer_set(&db->ev, timeout_cb, db);
+ if (evtimer_add(&db->ev, &tv) < 0)
+ fatal_perror("event_add");
+
+ db->wait_type = W_TIME;
+}
+
+
+/* some error happened */
+static void conn_error(struct PgSocket *db, enum PgEvent ev, const char *desc)
+{
+ log_error("connection error: %s", desc);
+ send_event(db, ev);
+}
+
+/*
+ * Called when select() told that conn is avail for reading/writing.
+ *
+ * It should call postgres handlers and then change state if needed.
+ */
+static void result_cb(int sock, short flags, void *arg)
+{
+ struct PgSocket *db = arg;
+ PGresult *res, *res_saved = NULL;
+
+ db->wait_type = 0;
+
+ if (!PQconsumeInput(db->con)) {
+ conn_error(db, DB_RESULT_BAD, "PQconsumeInput");
+ return;
+ }
+
+ /* loop until PQgetResult returns NULL */
+ while (1) {
+ /* incomplete result? */
+ if (PQisBusy(db->con)) {
+ wait_event(db, EV_READ, result_cb);
+ return;
+ }
+
+ /* next result */
+ res = PQgetResult(db->con);
+ if (!res)
+ break;
+
+ if (res_saved) {
+ printf("multiple results?\n");
+ PQclear(res_saved);
+ }
+ res_saved = res;
+ }
+
+ db->handler_func(db, db->handler_arg, DB_RESULT_OK, res_saved);
+}
+
+static void send_cb(int sock, short flags, void *arg)
+{
+ int res;
+ struct PgSocket *db = arg;
+
+ db->wait_type = 0;
+
+ res = PQflush(db->con);
+ if (res > 0) {
+ wait_event(db, EV_WRITE, send_cb);
+ } else if (res == 0) {
+ wait_event(db, EV_READ, result_cb);
+ } else
+ conn_error(db, DB_RESULT_BAD, "PQflush");
+}
+
+
+static void connect_cb(int sock, short flags, void *arg)
+{
+ struct PgSocket *db = arg;
+ PostgresPollingStatusType poll_res;
+
+ db->wait_type = 0;
+
+ poll_res = PQconnectPoll(db->con);
+ switch (poll_res) {
+ case PGRES_POLLING_WRITING:
+ wait_event(db, EV_WRITE, connect_cb);
+ break;
+ case PGRES_POLLING_READING:
+ wait_event(db, EV_READ, connect_cb);
+ break;
+ case PGRES_POLLING_OK:
+ //log_debug("login ok: fd=%d", PQsocket(db->con));
+ send_event(db, DB_CONNECT_OK);
+ break;
+ default:
+ conn_error(db, DB_CONNECT_FAILED, "PQconnectPoll");
+ }
+}
+
+/*
+ * Public API
+ */
+
+struct PgSocket *db_create(db_handler_f fn, void *handler_arg)
+{
+ struct PgSocket *db;
+
+ db = calloc(1, sizeof(*db));
+ if (!db)
+ return NULL;
+
+ db->handler_func = fn;
+ db->handler_arg = handler_arg;
+
+ return db;
+}
+
+void db_connect(struct PgSocket *db, const char *connstr)
+{
+ db->con = PQconnectStart(connstr);
+ if (db->con == NULL) {
+ conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
+ return;
+ }
+
+ if (PQstatus(db->con) == CONNECTION_BAD) {
+ conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
+ return;
+ }
+
+ wait_event(db, EV_WRITE, connect_cb);
+}
+
+
+void db_disconnect(struct PgSocket *db)
+{
+ if (db->con) {
+ PQfinish(db->con);
+ db->con = NULL;
+ }
+}
+
+void db_free(struct PgSocket *db)
+{
+ if (db) {
+ if (db->con)
+ db_disconnect(db);
+ free(db);
+ }
+}
+
+void db_send_query_simple(struct PgSocket *db, const char *q)
+{
+ int res;
+
+ log_debug("%s", q);
+ res = PQsendQuery(db->con, q);
+ if (!res) {
+ conn_error(db, DB_RESULT_BAD, "PQsendQueryParams");
+ return;
+ }
+
+ res = PQflush(db->con);
+ if (res > 0) {
+ wait_event(db, EV_WRITE, send_cb);
+ } else if (res == 0) {
+ wait_event(db, EV_READ, result_cb);
+ } else
+ conn_error(db, DB_RESULT_BAD, "PQflush");
+}
+
+void db_send_query_params(struct PgSocket *db, const char *q, int cnt, ...)
+{
+ int res, i;
+ va_list ap;
+ const char * args[10];
+
+ if (cnt > 10) cnt = 10;
+
+ va_start(ap, cnt);
+ for (i = 0; i < cnt; i++)
+ args[i] = va_arg(ap, char *);
+ va_end(ap);
+
+ res = PQsendQueryParams(db->con, q, cnt, NULL, args, NULL, NULL, 0);
+ if (!res) {
+ conn_error(db, DB_RESULT_BAD, "PQsendQueryParams");
+ return;
+ }
+
+ res = PQflush(db->con);
+ if (res > 0) {
+ wait_event(db, EV_WRITE, send_cb);
+ } else if (res == 0) {
+ wait_event(db, EV_READ, result_cb);
+ } else
+ conn_error(db, DB_RESULT_BAD, "PQflush");
+}
+
--- /dev/null
+
+#ifndef __CONNECTION_H__
+#define __CONNECTION_H__
+
+#include <libpq-fe.h>
+
+enum PgEvent {
+ DB_CONNECT_OK,
+ DB_CONNECT_FAILED,
+ DB_RESULT_OK,
+ DB_RESULT_BAD,
+ DB_TIMEOUT,
+};
+
+struct PgSocket;
+
+typedef void (*db_handler_f)(struct PgSocket *pgs, void *arg, enum PgEvent dbev, PGresult *res);
+
+struct PgSocket *db_create(db_handler_f fn, void *arg);
+void db_free(struct PgSocket *db);
+
+void db_connect(struct PgSocket *db, const char *cstr);
+void db_disconnect(struct PgSocket *db);
+
+void db_send_query_simple(struct PgSocket *db, const char *query);
+void db_send_query_params(struct PgSocket *db, const char *query, int args, ...);
+
+void db_sleep(struct PgSocket *db, double timeout);
+
+#endif
+
--- /dev/null
+/*
+ * PgBouncer - Lightweight connection pooler for PostgreSQL.
+ *
+ * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/*
+ * Circular doubly linked list implementation.
+ *
+ * Basic idea from <linux/list.h>.
+ *
+ * <sys/queue.h> seemed usable, but overcomplicated.
+ */
+
+#ifndef __LIST_H_
+#define __LIST_H_
+
+/* turn on slow checking */
+/* #define LIST_DEBUG */
+
+/* give offset of a field inside struct */
+#ifndef offsetof
+#define offsetof(type, field) ((unsigned long)&(((type *)0)->field))
+#endif
+
+/* given pointer to field inside struct, return pointer to struct */
+#ifndef container_of
+#define container_of(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))
+#endif
+
+/* list type */
+typedef struct List List;
+struct List {
+ List *next;
+ List *prev;
+};
+
+#define LIST(var) List var = { &var, &var }
+
+/* initialize struct */
+static inline void list_init(List *list)
+{
+ list->next = list->prev = list;
+}
+
+/* is list empty? */
+static inline bool list_empty(const List *list)
+{
+ return list->next == list;
+}
+
+/* add item to the start of the list */
+static inline List *list_prepend(List *item, List *list)
+{
+ Assert(list_empty(item));
+
+ item->next = list->next;
+ item->prev = list;
+ list->next->prev = item;
+ list->next = item;
+ return item;
+}
+
+/* add item to the end of the list */
+static inline List *list_append(List *item, List *list)
+{
+ Assert(list_empty(item));
+
+ item->next = list;
+ item->prev = list->prev;
+ list->prev->next = item;
+ list->prev = item;
+ return item;
+}
+
+/* remove item from list */
+static inline List *list_del(List *item)
+{
+ item->prev->next = item->next;
+ item->next->prev = item->prev;
+ item->next = item->prev = item;
+ return item;
+}
+
+/* remove first from list and return */
+static inline List *list_pop(List *list)
+{
+ if (list_empty(list))
+ return NULL;
+ return list_del(list->next);
+}
+
+/* remove first from list and return */
+static inline List *list_first(const List *list)
+{
+ if (list_empty(list))
+ return NULL;
+ return list->next;
+}
+
+/* put all elems in one list in the start of another list */
+static inline void list_prepend_list(List *src, List *dst)
+{
+ if (list_empty(src))
+ return;
+ src->next->prev = dst;
+ src->prev->next = dst->next;
+ dst->next->prev = src->prev;
+ dst->next = src->next;
+
+ src->next = src->prev = src;
+}
+
+/* put all elems in one list in the end of another list */
+static inline void list_append_list(List *src, List *dst)
+{
+ if (list_empty(src))
+ return;
+ src->next->prev = dst->prev;
+ src->prev->next = dst;
+ dst->prev->next = src->next;
+ dst->prev = src->prev;
+
+ src->next = src->prev = src;
+}
+
+/* remove first elem from list and return with casting */
+#define list_pop_type(list, typ, field) \
+ (list_empty(list) ? NULL \
+ : container_of(list_del((list)->next), typ, field))
+
+/* loop over list */
+#define list_for_each(item, list) \
+ for ((item) = (list)->next; \
+ (item) != (list); \
+ (item) = (item)->next)
+
+/* loop over list and allow removing item */
+#define list_for_each_safe(item, list, tmp) \
+ for ((item) = (list)->next, (tmp) = (list)->next->next; \
+ (item) != (list); \
+ (item) = (tmp), (tmp) = (tmp)->next)
+
+static inline bool item_in_list(const List *item, const List *list)
+{
+ List *tmp;
+ list_for_each(tmp, list)
+ if (tmp == item)
+ return 1;
+ return 0;
+}
+
+
+/*
+ * wrapper for List that keeps track of number of items
+ */
+
+typedef struct StatList StatList;
+struct StatList {
+ List head;
+ int cur_count;
+#ifdef LIST_DEBUG
+ const char *name;
+#endif
+};
+
+#ifdef LIST_DEBUG
+#define STATLIST(var) StatList var = { {&var.head, &var.head}, 0, #var }
+#else
+#define STATLIST(var) StatList var = { {&var.head, &var.head}, 0 }
+#endif
+
+static inline void statlist_inc_count(StatList *list, int val)
+{
+ list->cur_count += val;
+}
+
+static inline void statlist_prepend(List *item, StatList *list)
+{
+ list_prepend(item, &list->head);
+ statlist_inc_count(list, 1);
+}
+
+static inline void statlist_append(List *item, StatList *list)
+{
+ list_append(item, &list->head);
+ statlist_inc_count(list, 1);
+}
+
+static inline void statlist_put_before(List *item, StatList *list, List *pos)
+{
+ list_append(item, pos);
+ statlist_inc_count(list, 1);
+}
+
+static inline void statlist_remove(List *item, StatList *list)
+{
+#ifdef LIST_DEBUG
+ /* sanity check */
+ if (!item_in_list(item, &list->head))
+ fatal("item in wrong list, expected: %s", list->name);
+#endif
+
+ list_del(item);
+ list->cur_count--;
+
+ Assert(list->cur_count >= 0);
+}
+
+static inline void statlist_init(StatList *list, const char *name)
+{
+ list_init(&list->head);
+ list->cur_count = 0;
+#ifdef LIST_DEBUG
+ list->name = name;
+#endif
+}
+
+static inline int statlist_count(const StatList *list)
+{
+ Assert(list->cur_count > 0 || list_empty(&list->head));
+ return list->cur_count;
+}
+
+static inline List *statlist_pop(StatList *list)
+{
+ List *item = list_pop(&list->head);
+
+ if (item)
+ list->cur_count--;
+
+ Assert(list->cur_count >= 0);
+
+ return item;
+}
+
+static inline void statlist_prepend_list(StatList *src, StatList *dst)
+{
+ list_prepend_list(&src->head, &dst->head);
+ statlist_inc_count(dst, src->cur_count);
+ src->cur_count = 0;
+}
+
+static inline void statlist_append_list(StatList *src, StatList *dst)
+{
+ list_append_list(&src->head, &dst->head);
+ statlist_inc_count(dst, src->cur_count);
+ src->cur_count = 0;
+}
+
+static inline List *statlist_first(const StatList *list)
+{
+ return list_first(&list->head);
+}
+
+static inline bool statlist_empty(const StatList *list)
+{
+ return list_empty(&list->head);
+}
+
+#define statlist_for_each(item, list) list_for_each(item, &((list)->head))
+#define statlist_for_each_safe(item, list, tmp) list_for_each_safe(item, &((list)->head), tmp)
+
+#endif /* __LIST_H_ */
+
--- /dev/null
+
+#include "pgqd.h"
+
+struct MaintItem {
+ List head;
+ const char *name;
+};
+
+static void add_maint_item(struct PgDatabase *db, const char *name)
+{
+ struct MaintItem *item = calloc(1, sizeof(*item));
+ if (!item)
+ return;
+ list_init(&item->head);
+ item->name = strdup(name);
+ if (!item->name) {
+ free(item);
+ return;
+ }
+ statlist_append(&item->head, &db->maint_item_list);
+}
+
+static const char *pop_maint_item(struct PgDatabase *db)
+{
+ struct MaintItem *item;
+ struct List *el;
+ const char *name;
+
+ el = statlist_pop(&db->maint_item_list);
+ if (!el)
+ return NULL;
+
+ item = container_of(el, struct MaintItem, head);
+ name = item->name;
+ free(item);
+ return name;
+}
+
+static void free_maint_list(struct PgDatabase *db)
+{
+ const char *name;
+ while (1) {
+ name = pop_maint_item(db);
+ if (!name)
+ break;
+ free(name);
+ }
+}
+
+
+static void fill_items(struct PgDatabase *db, PGresult *res)
+{
+ int i;
+ for (i = 0; i < PQntuples(res); i++) {
+ const char *item = PQgetvalue(res, i, 0);
+ if (item)
+ add_maint_item(db, item);
+ }
+}
+
+static void run_queue_list(struct PgDatabase *db)
+{
+ const char *q = "select queue_name from pgq.get_queue_info()";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_maint, q);
+ db->maint_state = DB_MAINT_LOAD_QUEUES;
+}
+
+static void run_vacuum_list(struct PgDatabase *db)
+{
+ const char *q = "select * from pgq.maint_tables_to_vacuum()";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_maint, q);
+ db->maint_state = DB_MAINT_VACUUM_LIST;
+}
+
+static void run_rotate_part1(struct PgDatabase *db)
+{
+ const char *q;
+ const char *qname;
+ qname = pop_maint_item(db);
+ q = "select pgq.maint_rotate_part1($1)";
+ log_debug("%s: %s [%s]", db->name, q, qname);
+ db_send_query_params(db->c_maint, q, 1, qname);
+ free(qname);
+ db->maint_state = DB_MAINT_ROT1;
+}
+
+static void run_rotate_part2(struct PgDatabase *db)
+{
+ const char *q = "select pgq.maint_rotate_part2()";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_maint, q);
+ db->maint_state = DB_MAINT_ROT2;
+}
+
+static void run_vacuum(struct PgDatabase *db)
+{
+ char qbuf[256];
+ const char *table;
+ table = pop_maint_item(db);
+ snprintf(qbuf, sizeof(qbuf), "vacuum %s", table);
+ log_debug("%s: %s", db->name, qbuf);
+ db_send_query_simple(db->c_maint, qbuf);
+ free(table);
+ db->maint_state = DB_MAINT_DO_VACUUM;
+}
+
+static void close_maint(struct PgDatabase *db, int sleep_time)
+{
+ log_debug("%s: close_maint, %d", db->name, sleep_time);
+ db->maint_state = DB_CLOSED;
+ db_disconnect(db->c_maint);
+ db_sleep(db->c_maint, sleep_time);
+}
+
+static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
+{
+ struct PgDatabase *db = arg;
+
+ switch (ev) {
+ case DB_CONNECT_OK:
+ run_queue_list(db);
+ break;
+ case DB_RESULT_OK:
+ switch (db->maint_state) {
+ case DB_MAINT_LOAD_QUEUES:
+ fill_items(db, res);
+ case DB_MAINT_ROT1:
+ PQclear(res);
+ if (!statlist_empty(&db->maint_item_list)) {
+ run_rotate_part1(db);
+ } else {
+ run_rotate_part2(db);
+ }
+ break;
+ case DB_MAINT_ROT2:
+ PQclear(res);
+ run_vacuum_list(db);
+ break;
+ case DB_MAINT_VACUUM_LIST:
+ fill_items(db, res);
+ case DB_MAINT_DO_VACUUM:
+ PQclear(res);
+ if (!statlist_empty(&db->maint_item_list)) {
+ run_vacuum(db);
+ } else {
+ close_maint(db, 2*60);
+ }
+ break;
+ default:
+ printf("bad state\n");
+ exit(1);
+ }
+ break;
+ case DB_TIMEOUT:
+ log_debug("%s: maint timeout", db->name);
+ run_queue_list(db);
+ break;
+ default:
+ printf("failure\n");
+ exit(1);
+ }
+}
+
+void launch_maint(struct PgDatabase *db)
+{
+ const char *cstr = make_connstr(db->name);
+ if (db->c_maint) {
+ log_error("%s: maint already initialized", db->name);
+ return;
+ }
+ free_maint_list(db);
+
+ log_debug("%s: launch_maint", db->name);
+ db->c_maint = db_create(maint_handler, db);
+ db_connect(db->c_maint, cstr);
+ free(cstr);
+}
+
--- /dev/null
+#include "pgqd.h"
+
+#include <getopt.h>
+#include <event.h>
+
+static const char *usage_str =
+"usage: pgq-ticker [switches] [db ..]\n"
+"Switches:\n"
+" -T db Set initial db name to connect to (default: template1)\n"
+" -p port port\n"
+" -U user Username to use\n"
+" -h host Host to use\n"
+" -H Show help\n"
+"Not implemented:\n"
+" -v Increase verbosity\n"
+" -V Show version\n"
+" -d Daemonize\n"
+"";
+
+struct Config cf = {
+ .db_template = "template1",
+ .verbose = 1,
+};
+
+static struct PgSocket *db_template;
+
+static STATLIST(database_list);
+
+const char *make_connstr(const char *dbname)
+{
+ char buf[512];
+
+ snprintf(buf, sizeof(buf), "dbname=%s", dbname);
+ if (cf.db_host) {
+ strlcat(buf, " host=", sizeof(buf));
+ strlcat(buf, cf.db_host, sizeof(buf));
+ }
+ if (cf.db_port) {
+ strlcat(buf, " port=", sizeof(buf));
+ strlcat(buf, cf.db_port, sizeof(buf));
+ }
+ if (cf.db_username) {
+ strlcat(buf, " user=", sizeof(buf));
+ strlcat(buf, cf.db_username, sizeof(buf));
+ }
+ return strdup(buf);
+}
+
+static void launch_db(const char *dbname)
+{
+ struct PgDatabase *db;
+
+ db = calloc(1, sizeof(*db));
+ db->name = strdup(dbname);
+ statlist_init(&db->maint_item_list, "maint_item_list");
+ list_init(&db->head);
+ statlist_append(&db->head, &database_list);
+
+ launch_ticker(db);
+}
+
+static void detect_handler(struct PgSocket *db, void *arg, enum PgEvent ev, PGresult *res)
+{
+ int i;
+ const char *s;
+
+ switch (ev) {
+ case DB_CONNECT_OK:
+ db_send_query_simple(db, "select datname from pg_database"
+ " where not datistemplate and datallowconn");
+ break;
+ case DB_RESULT_OK:
+ for (i = 0; i < PQntuples(res); i++) {
+ s = PQgetvalue(res, i, 0);
+ launch_db(s);
+ }
+ PQclear(res);
+ db_free(db_template);
+ db_template = NULL;
+ break;
+ default:
+ printf("failure\n");
+ exit(1);
+ }
+}
+
+static void detect_dbs(void)
+{
+ const char *cstr = make_connstr(cf.db_template);
+ db_template = db_create(detect_handler, NULL);
+ db_connect(db_template, cstr);
+ free(cstr);
+}
+
+static void main_loop_once(void)
+{
+ reset_time_cache();
+ event_loop(EVLOOP_ONCE);
+}
+
+int main(int argc, char *argv[])
+{
+ int c, i;
+
+ while ((c = getopt(argc, argv, "T:p:U:h:HvVd") != -1)) {
+ switch (c) {
+ case 'T':
+ cf.db_template = optarg;
+ break;
+ case 'p':
+ cf.db_port = optarg;
+ break;
+ case 'U':
+ cf.db_username = optarg;
+ break;
+ case 'h':
+ cf.db_host = optarg;
+ break;
+ case 'H':
+ printf(usage_str);
+ return 0;
+ default:
+ printf(usage_str);
+ return 1;
+ }
+ }
+
+ event_init();
+
+ if (optind == argc) {
+ printf("auto-detecting dbs");
+ detect_dbs();
+ } else {
+ for (i = optind; i < argc; i++) {
+ launch_db(argv[i]);
+ }
+ }
+
+ while (1)
+ main_loop_once();
+
+ return 0;
+}
--- /dev/null
+
+#ifndef __PGQD_H__
+#define __PGQD_H__
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define Assert(x)
+
+#include "connection.h"
+#include "util.h"
+#include "list.h"
+
+enum DbState {
+ DB_CLOSED,
+ DB_TICKER_CHECK_PGQ,
+ DB_TICKER_CHECK_VERSION,
+ DB_TICKER_RUN,
+ DB_MAINT_LOAD_QUEUES,
+ DB_MAINT_ROT1,
+ DB_MAINT_ROT2,
+ DB_MAINT_VACUUM_LIST,
+ DB_MAINT_DO_VACUUM,
+};
+
+struct PgDatabase {
+ List head;
+ const char *name;
+ struct PgSocket *c_ticker;
+ struct PgSocket *c_maint;
+ struct PgSocket *c_retry;
+ bool has_pgq;
+ enum DbState state;
+ enum DbState maint_state;
+ StatList maint_item_list;
+};
+
+struct Config {
+ const char *logfile;
+ const char *pidfile;
+ const char *db_host;
+ const char *db_port;
+ const char *db_username;
+ const char *db_template;
+ int verbose;
+};
+
+extern struct Config cf;
+
+
+void launch_ticker(struct PgDatabase *db);
+void launch_maint(struct PgDatabase *db);
+void launch_retry(struct PgDatabase *db);
+
+const char *make_connstr(const char *dbname);
+
+#endif
+
--- /dev/null
+
+#include "pgqd.h"
+
+
+static void close_retry(struct PgDatabase *db, int sleep_time)
+{
+ log_debug("%s: close_retry, %d", db->name, sleep_time);
+ db_disconnect(db->c_retry);
+ db_sleep(db->c_retry, sleep_time);
+}
+
+static void run_retry(struct PgDatabase *db)
+{
+ const char *q = "select * from pgq.maint_retry_events()";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_retry, q);
+}
+
+static void parse_retry(struct PgDatabase *db, PGresult *res)
+{
+ if (PQntuples(res) == 1) {
+ char *val = PQgetvalue(res, 0, 0);
+ if (strcmp(val, "0") != 0) {
+ run_retry(db);
+ }
+ }
+ close_retry(db, 20);
+}
+
+static void retry_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
+{
+ struct PgDatabase *db = arg;
+
+ switch (ev) {
+ case DB_CONNECT_OK:
+ run_retry(db);
+ break;
+ case DB_RESULT_OK:
+ parse_retry(db, res);
+ break;
+ case DB_TIMEOUT:
+ log_debug("%s: retry timeout", db->name);
+ launch_retry(db);
+ break;
+ default:
+ printf("failure\n");
+ exit(1);
+ }
+}
+
+void launch_retry(struct PgDatabase *db)
+{
+ const char *cstr;
+ if (db->c_retry) {
+ log_error("%s: retry already initialized", db->name);
+ } else {
+ log_debug("%s: launch_retry", db->name);
+ db->c_retry = db_create(retry_handler, db);
+ }
+ cstr = make_connstr(db->name);
+ db_connect(db->c_retry, cstr);
+ free(cstr);
+}
+
--- /dev/null
+#include "pgqd.h"
+
+static void run_pgq_check(struct PgDatabase *db)
+{
+ const char *q = "select 1 from pg_catalog.pg_namespace where nspname='pgq'";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_ticker, q);
+ db->state = DB_TICKER_CHECK_PGQ;
+}
+
+static void run_version_check(struct PgDatabase *db)
+{
+ const char *q = "select pgq.version()";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_ticker, q);
+ db->state = DB_TICKER_CHECK_VERSION;
+}
+
+static void run_ticker(struct PgDatabase *db)
+{
+ const char *q = "select pgq.ticker()";
+ log_debug("%s: %s", db->name, q);
+ db_send_query_simple(db->c_ticker, q);
+ db->state = DB_TICKER_RUN;
+}
+
+static void close_ticker(struct PgDatabase *db, int sleep_time)
+{
+ log_debug("%s: close_ticker, %d", db->name, sleep_time);
+ db->state = DB_CLOSED;
+ db_disconnect(db->c_ticker);
+ db_sleep(db->c_ticker, sleep_time);
+}
+
+static void parse_pgq_check(struct PgDatabase *db, PGresult *res)
+{
+ db->has_pgq = PQntuples(res) == 1;
+ PQclear(res);
+
+ if (!db->has_pgq) {
+ log_debug("%s: no pgq", db->name);
+ close_ticker(db, 60);
+ } else {
+ run_version_check(db);
+ }
+}
+
+static void parse_version_check(struct PgDatabase *db, PGresult *res)
+{
+ char *ver;
+ if (PQntuples(res) != 1) {
+ log_debug("%s: calling pgq.version() failed", db->name);
+ goto badpgq;
+ }
+ ver = PQgetvalue(res, 0, 0);
+ if (ver[0] < '3') {
+ log_debug("%s: bad pgq version: %s", db->name, ver);
+ goto badpgq;
+ }
+ log_debug("%s: pgq version ok: %s", db->name, ver);
+ PQclear(res);
+
+ run_ticker(db);
+ if (!db->c_maint)
+ launch_maint(db);
+ if (!db->c_retry)
+ launch_retry(db);
+ return;
+
+badpgq:
+ PQclear(res);
+ db->has_pgq = false;
+ close_ticker(db, 60);
+}
+
+static void parse_ticker_result(struct PgDatabase *db, PGresult *res)
+{
+ if (PQntuples(res) != 1) {
+ log_debug("%s: calling pgq.ticker() failed", db->name);
+ }
+ PQclear(res);
+
+ db_sleep(db->c_ticker, 2);
+}
+
+static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
+{
+ struct PgDatabase *db = arg;
+
+ switch (ev) {
+ case DB_CONNECT_OK:
+ run_pgq_check(db);
+ break;
+ case DB_RESULT_OK:
+ switch (db->state) {
+ case DB_TICKER_CHECK_PGQ:
+ parse_pgq_check(db, res);
+ break;
+ case DB_TICKER_CHECK_VERSION:
+ parse_version_check(db, res);
+ break;
+ case DB_TICKER_RUN:
+ parse_ticker_result(db, res);
+ break;
+ default:
+ printf("bad state\n");
+ exit(1);
+ }
+ break;
+ case DB_TIMEOUT:
+ log_debug("%s: tick timeout", db->name);
+ run_ticker(db);
+ break;
+ default:
+ printf("failure\n");
+ exit(1);
+ }
+}
+
+void launch_ticker(struct PgDatabase *db)
+{
+ const char *cstr = make_connstr(db->name);
+ log_debug("%s: launch_ticker", db->name);
+ db->c_ticker = db_create(tick_handler, db);
+ db_connect(db->c_ticker, cstr);
+ free(cstr);
+}
+
--- /dev/null
+
+#include "util.h"
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string.h>
+#include <errno.h>
+#include <stdarg.h>
+
+#include "pgqd.h"
+
+/*
+ * Things to test:
+ * - Conn per query
+ * - show tx
+ * - long tx
+ * - variable-size query
+ */
+
+static usec_t _time_cache;
+
+/*
+ * utility functions
+ */
+
+static usec_t get_time_usec(void)
+{
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return (usec_t)tv.tv_sec * USEC + tv.tv_usec;
+}
+
+usec_t get_cached_time(void)
+{
+ if (!_time_cache)
+ _time_cache = get_time_usec();
+ return _time_cache;
+}
+void reset_time_cache(void)
+{
+ _time_cache = 0;
+}
+
+void fatal_perror(const char *err)
+{
+ log_error("%s: %s", err, strerror(errno));
+ exit(1);
+}
+
+void fatal_noexit(const char *fmt, ...)
+{
+ va_list ap;
+ char buf[1024];
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ printf("FATAL: %s\n", buf);
+}
+
+void fatal(const char *fmt, ...)
+{
+ va_list ap;
+ char buf[1024];
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ printf("FATAL: %s\n", buf);
+ exit(1);
+}
+
+void log_debug(const char *fmt, ...)
+{
+ va_list ap;
+ char buf[1024];
+ if (cf.verbose == 0)
+ return;
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ printf("dbg: %s\n", buf);
+}
+
+void log_error(const char *fmt, ...)
+{
+ va_list ap;
+ char buf[1024];
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+ printf("ERR: %s\n", buf);
+}
+
+
+/*
+ * Minimal spec-conforming implementations of strlcpy(), strlcat().
+ */
+
+size_t strlcpy(char *dst, const char *src, size_t n)
+{
+ size_t len = strlen(src);
+ if (len < n) {
+ memcpy(dst, src, len + 1);
+ } else if (n > 0) {
+ memcpy(dst, src, n - 1);
+ dst[n - 1] = 0;
+ }
+ return len;
+}
+
+size_t strlcat(char *dst, const char *src, size_t n)
+{
+ size_t pos = 0;
+ while (pos < n && dst[pos])
+ pos++;
+ return pos + strlcpy(dst + pos, src, n - pos);
+}
+
+/* SQL quote */
+size_t quote_literal(char *buf, int buflen, const char *src, bool std_quote)
+{
+ char *dst = buf;
+ char *end = buf + buflen - 2;
+
+ if (buflen < 3)
+ return 0;
+
+ *dst++ = '\'';
+ while (*src && dst < end) {
+ if (*src == '\'')
+ *dst++ = '\'';
+ else if (*src == '\\' && !std_quote)
+ *dst++ = '\\';
+ *dst++ = *src++;
+ }
+ if (*src || dst > end)
+ return 0;
+
+ *dst++ = '\'';
+ *dst = 0;
+
+ return dst - buf;
+}
+
+
--- /dev/null
+#ifndef __UTIL_H__
+#define __UTIL_H__
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+typedef uint64_t usec_t;
+#define USEC 1000000
+
+usec_t get_cached_time(void);
+void reset_time_cache(void);
+void fatal_perror(const char *err);
+
+void fatal_noexit(const char *fmt, ...);
+void fatal(const char *fmt, ...);
+void log_debug(const char *fmt, ...);
+void log_error(const char *fmt, ...);
+
+#define Assert(x)
+
+/* broken posix */
+static inline void sane_free(const void *p) { free((void*)p); }
+#define free sane_free
+
+/* braindead glibc */
+#define strlcpy my_strlcpy
+#define strlcat my_strlcat
+size_t strlcpy(char *dst, const char *src, size_t n);
+size_t strlcat(char *dst, const char *src, size_t n);
+
+size_t quote_literal(char *buf, int buflen, const char *src, bool std_quote);
+
+#endif
+