sql/ticker: use usual/pgsocket for connection handling
authorMarko Kreen <markokr@gmail.com>
Sat, 5 Dec 2009 16:20:06 +0000 (18:20 +0200)
committerMarko Kreen <markokr@gmail.com>
Sat, 5 Dec 2009 16:20:06 +0000 (18:20 +0200)
the connection.c was already generalized and moved to libusual.
make the ticker use it now.

lib
sql/ticker/Makefile
sql/ticker/connection.c [deleted file]
sql/ticker/connection.h [deleted file]
sql/ticker/maint.c
sql/ticker/pgqd.c
sql/ticker/pgqd.h
sql/ticker/retry.c
sql/ticker/ticker.c

diff --git a/lib b/lib
index 916d31264cafcb016324d9639a3ad742764dc3e1..9f71a708ab258e680b1b2f001d4138834042c7f4 160000 (submodule)
--- a/lib
+++ b/lib
@@ -1 +1 @@
-Subproject commit 916d31264cafcb016324d9639a3ad742764dc3e1
+Subproject commit 9f71a708ab258e680b1b2f001d4138834042c7f4
index 57876a4d78884600ad3368b6f843c798b05d6994..9b9591bf2c040151eb9d395b8a1272d3b908e8ab 100644 (file)
@@ -1,12 +1,12 @@
 
 include ../../config.mak
 
-SRCS = connection.c pgqd.c maint.c ticker.c retry.c
-LOCAL_HDRS = connection.h pgqd.h
+SRCS = pgqd.c maint.c ticker.c retry.c
+LOCAL_HDRS = pgqd.h
 
 USUAL_DIR = ../../lib
 USUAL_OBJDIR = .
-USUAL_LOCAL_SRCS = $(SRCS) connection.h pgqd.h
+USUAL_LOCAL_SRCS = $(SRCS) $(LOCAL_HDRS)
 include $(USUAL_DIR)/Setup.mk
 
 PROGRAM = pgqd
@@ -44,6 +44,9 @@ install: all
 clean:
        rm -f $(PROGRAM) $(OBJS)
 
+tags:
+       ctags *.[ch] ../../lib/usual/*.[ch]
+
 distclean: clean
 installcheck:
 
diff --git a/sql/ticker/connection.c b/sql/ticker/connection.c
deleted file mode 100644 (file)
index 659f9a1..0000000
+++ /dev/null
@@ -1,279 +0,0 @@
-#include "connection.h"
-
-#include <sys/types.h>
-#include <stdarg.h>
-#include <math.h>
-
-#include <usual/event.h>
-#include <usual/logging.h>
-#include <usual/time.h>
-
-#define W_NONE 0
-#define W_SOCK 1
-#define W_TIME 2
-
-typedef void (*libev_cb)(int sock, short flags, void *arg);
-
-struct PgSocket {
-       struct event ev;
-
-       unsigned wait_type:4; // 0 - no wait, 1 - socket, 2 - timeout
-
-       PGconn *con;
-       db_handler_f handler_func;
-       void *handler_arg;
-
-       PGresult *last_result;
-};
-
-static void send_event(struct PgSocket *db, enum PgEvent ev)
-{
-       db->handler_func(db, db->handler_arg, ev, NULL);
-}
-
-static void wait_event(struct PgSocket *db, short ev, libev_cb fn)
-{
-       Assert(!db->wait_type);
-
-       event_set(&db->ev, PQsocket(db->con), ev, fn, db);
-       if (event_add(&db->ev, NULL) < 0)
-               fatal_perror("event_add");
-
-       db->wait_type = W_SOCK;
-}
-
-static void timeout_cb(int sock, short flags, void *arg)
-{
-       struct PgSocket *db = arg;
-
-       db->wait_type = 0;
-
-       send_event(db, DB_TIMEOUT);
-}
-
-void db_sleep(struct PgSocket *db, double timeout)
-{
-       struct timeval tv;
-
-       Assert(!db->wait_type);
-
-       tv.tv_sec = floor(timeout);
-       tv.tv_usec = (timeout - tv.tv_sec) * USEC;
-
-       evtimer_set(&db->ev, timeout_cb, db);
-       if (evtimer_add(&db->ev, &tv) < 0)
-               fatal_perror("event_add");
-
-       db->wait_type = W_TIME;
-}
-
-
-/* some error happened */
-static void conn_error(struct PgSocket *db, enum PgEvent ev, const char *desc)
-{
-       log_error("connection error: %s", desc);
-       log_error("libpq: %s", PQerrorMessage(db->con));
-       send_event(db, ev);
-}
-
-/*
- * Called when select() told that conn is avail for reading/writing.
- *
- * It should call postgres handlers and then change state if needed.
- */
-static void result_cb(int sock, short flags, void *arg)
-{
-       struct PgSocket *db = arg;
-       PGresult *res;
-
-       db->wait_type = 0;
-
-       if (!PQconsumeInput(db->con)) {
-               conn_error(db, DB_RESULT_BAD, "PQconsumeInput");
-               return;
-       }
-
-       /* loop until PQgetResult returns NULL */
-       while (1) {
-               /* incomplete result? */
-               if (PQisBusy(db->con)) {
-                       wait_event(db, EV_READ, result_cb);
-                       return;
-               }
-
-               /* next result */
-               res = PQgetResult(db->con);
-               if (!res)
-                       break;
-
-               if (db->last_result) {
-                       log_warning("multiple results?");
-                       PQclear(db->last_result);
-               }
-               db->last_result = res;
-       }
-
-       res = db->last_result;
-       db->last_result = NULL;
-
-       db->handler_func(db, db->handler_arg, DB_RESULT_OK, res);
-       PQclear(res);
-}
-
-static void send_cb(int sock, short flags, void *arg)
-{
-       int res;
-       struct PgSocket *db = arg;
-
-       db->wait_type = 0;
-
-       res = PQflush(db->con);
-       if (res > 0) {
-               wait_event(db, EV_WRITE, send_cb);
-       } else if (res == 0) {
-               wait_event(db, EV_READ, result_cb);
-       } else
-               conn_error(db, DB_RESULT_BAD, "PQflush");
-}
-
-
-static void connect_cb(int sock, short flags, void *arg)
-{
-       struct PgSocket *db = arg;
-       PostgresPollingStatusType poll_res;
-
-       db->wait_type = 0;
-
-       poll_res = PQconnectPoll(db->con);
-       switch (poll_res) {
-       case PGRES_POLLING_WRITING:
-               wait_event(db, EV_WRITE, connect_cb);
-               break;
-       case PGRES_POLLING_READING:
-               wait_event(db, EV_READ, connect_cb);
-               break;
-       case PGRES_POLLING_OK:
-               //log_debug("login ok: fd=%d", PQsocket(db->con));
-               send_event(db, DB_CONNECT_OK);
-               break;
-       default:
-               conn_error(db, DB_CONNECT_FAILED, "PQconnectPoll");
-       }
-}
-
-/*
- * Public API
- */
-
-struct PgSocket *db_create(db_handler_f fn, void *handler_arg)
-{
-       struct PgSocket *db;
-       
-       db = calloc(1, sizeof(*db));
-       if (!db)
-               return NULL;
-
-       db->handler_func = fn;
-       db->handler_arg = handler_arg;
-
-       return db;
-}
-
-void db_connect(struct PgSocket *db, const char *connstr)
-{
-       db->con = PQconnectStart(connstr);
-       if (db->con == NULL) {
-               conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
-               return;
-       }
-
-       if (PQstatus(db->con) == CONNECTION_BAD) {
-               conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
-               return;
-       }
-
-       wait_event(db, EV_WRITE, connect_cb);
-}
-
-
-void db_disconnect(struct PgSocket *db)
-{
-       if (db->con) {
-               PQfinish(db->con);
-               db->con = NULL;
-               if (db->last_result)
-                       PQclear(db->last_result);
-               db->last_result = NULL;
-       }
-}
-
-void db_reconnect(struct PgSocket *db)
-{
-       db_disconnect(db);
-       db_sleep(db, 60);
-}
-
-void db_free(struct PgSocket *db)
-{
-       if (db) {
-               if (db->con)
-                       db_disconnect(db);
-               if (db->last_result)
-                       PQclear(db->last_result);
-               free(db);
-       }
-}
-
-void db_send_query_simple(struct PgSocket *db, const char *q)
-{
-       int res;
-
-       log_debug("%s", q);
-       res = PQsendQuery(db->con, q);
-       if (!res) {
-               conn_error(db, DB_RESULT_BAD, "PQsendQuery");
-               return;
-       }
-
-       res = PQflush(db->con);
-       if (res > 0) {
-               wait_event(db, EV_WRITE, send_cb);
-       } else if (res == 0) {
-               wait_event(db, EV_READ, result_cb);
-       } else
-               conn_error(db, DB_RESULT_BAD, "PQflush");
-}
-
-void db_send_query_params(struct PgSocket *db, const char *q, int cnt, ...)
-{
-       int res, i;
-       va_list ap;
-       const char * args[10];
-
-       if (cnt > 10) cnt = 10;
-
-       va_start(ap, cnt);
-       for (i = 0; i < cnt; i++)
-               args[i] = va_arg(ap, char *);
-       va_end(ap);
-
-       res = PQsendQueryParams(db->con, q, cnt, NULL, args, NULL, NULL, 0);
-       if (!res) {
-               conn_error(db, DB_RESULT_BAD, "PQsendQueryParams");
-               return;
-       }
-
-       res = PQflush(db->con);
-       if (res > 0) {
-               wait_event(db, EV_WRITE, send_cb);
-       } else if (res == 0) {
-               wait_event(db, EV_READ, result_cb);
-       } else
-               conn_error(db, DB_RESULT_BAD, "PQflush");
-}
-
-int db_connection_valid(struct PgSocket *db)
-{
-       return (db->con != NULL);
-}
-
diff --git a/sql/ticker/connection.h b/sql/ticker/connection.h
deleted file mode 100644 (file)
index c8d071f..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-
-#ifndef __CONNECTION_H__
-#define __CONNECTION_H__
-
-#include <libpq-fe.h>
-
-enum PgEvent {
-       DB_CONNECT_OK,
-       DB_CONNECT_FAILED,
-       DB_RESULT_OK,
-       DB_RESULT_BAD,
-       DB_TIMEOUT,
-};
-
-struct PgSocket;
-
-typedef void (*db_handler_f)(struct PgSocket *pgs, void *arg, enum PgEvent dbev, PGresult *res);
-
-struct PgSocket *db_create(db_handler_f fn, void *arg);
-void db_free(struct PgSocket *db);
-
-void db_connect(struct PgSocket *db, const char *cstr);
-void db_disconnect(struct PgSocket *db);
-void db_reconnect(struct PgSocket *db);
-
-void db_send_query_simple(struct PgSocket *db, const char *query);
-void db_send_query_params(struct PgSocket *db, const char *query, int args, ...);
-
-void db_sleep(struct PgSocket *db, double timeout);
-
-int db_connection_valid(struct PgSocket *db);
-
-#endif
-
index c402bef9f05f5f905c97de09679a6833f05fedf0..237ad200a7d3fc857bffb949eb6b6bff14ab6684 100644 (file)
@@ -23,7 +23,7 @@ static void run_queue_list(struct PgDatabase *db)
 {
        const char *q = "select queue_name from pgq.get_queue_info()";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_maint, q);
+       pgs_send_query_simple(db->c_maint, q);
        db->maint_state = DB_MAINT_LOAD_QUEUES;
 }
 
@@ -31,7 +31,7 @@ static void run_vacuum_list(struct PgDatabase *db)
 {
        const char *q = "select * from pgq.maint_tables_to_vacuum()";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_maint, q);
+       pgs_send_query_simple(db->c_maint, q);
        db->maint_state = DB_MAINT_VACUUM_LIST;
 }
 
@@ -42,7 +42,7 @@ static void run_rotate_part1(struct PgDatabase *db)
        qname = strlist_pop(db->maint_item_list);
        q = "select pgq.maint_rotate_part1($1)";
        log_debug("%s: %s [%s]", db->name, q, qname);
-       db_send_query_params(db->c_maint, q, 1, qname);
+       pgs_send_query_params(db->c_maint, q, 1, qname);
        free(qname);
        db->maint_state = DB_MAINT_ROT1;
 }
@@ -51,7 +51,7 @@ static void run_rotate_part2(struct PgDatabase *db)
 {
        const char *q = "select pgq.maint_rotate_part2()";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_maint, q);
+       pgs_send_query_simple(db->c_maint, q);
        db->maint_state = DB_MAINT_ROT2;
 }
 
@@ -62,7 +62,7 @@ static void run_vacuum(struct PgDatabase *db)
        table = strlist_pop(db->maint_item_list);
        snprintf(qbuf, sizeof(qbuf), "vacuum %s", table);
        log_debug("%s: %s", db->name, qbuf);
-       db_send_query_simple(db->c_maint, qbuf);
+       pgs_send_query_simple(db->c_maint, qbuf);
        free(table);
        db->maint_state = DB_MAINT_DO_VACUUM;
 }
@@ -71,8 +71,8 @@ static void close_maint(struct PgDatabase *db, double sleep_time)
 {
        log_debug("%s: close_maint, %f", db->name, sleep_time);
        db->maint_state = DB_CLOSED;
-       db_disconnect(db->c_maint);
-       db_sleep(db->c_maint, sleep_time);
+       pgs_disconnect(db->c_maint);
+       pgs_sleep(db->c_maint, sleep_time);
 }
 
 static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
@@ -80,11 +80,11 @@ static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresu
        struct PgDatabase *db = arg;
 
        switch (ev) {
-       case DB_CONNECT_OK:
+       case PGS_CONNECT_OK:
                log_debug("%s: starting maintenance", db->name);
                run_queue_list(db);
                break;
-       case DB_RESULT_OK:
+       case PGS_RESULT_OK:
                switch (db->maint_state) {
                case DB_MAINT_LOAD_QUEUES:
                        if (!fill_items(db, res))
@@ -113,15 +113,15 @@ static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresu
                        fatal("bad state");
                }
                break;
-       case DB_TIMEOUT:
+       case PGS_TIMEOUT:
                log_debug("%s: maint timeout", db->name);
-               if (!db_connection_valid(db->c_maint))
+               if (!pgs_connection_valid(db->c_maint))
                        launch_maint(db);
                else
                        run_queue_list(db);
                break;
        default:
-               db_reconnect(db->c_maint);
+               pgs_reconnect(db->c_maint);
        }
        return;
 mem_err:
@@ -129,12 +129,14 @@ mem_err:
                strlist_free(db->maint_item_list);
                db->maint_item_list = NULL;
        }
-       db_disconnect(db->c_maint);
-       db_sleep(db->c_maint, 20);
+       pgs_disconnect(db->c_maint);
+       pgs_sleep(db->c_maint, 20);
 }
 
 void launch_maint(struct PgDatabase *db)
 {
+       const char *cstr;
+
        log_debug("%s: launch_maint", db->name);
 
        if (!db->c_maint) {
@@ -142,14 +144,12 @@ void launch_maint(struct PgDatabase *db)
                        strlist_free(db->maint_item_list);
                        db->maint_item_list = NULL;
                }
-               db->c_maint = db_create(maint_handler, db);
+               cstr = make_connstr(db->name);
+               db->c_maint = pgs_create(cstr, maint_handler, db);
        }
 
-       if (!db_connection_valid(db->c_maint)) {
-               const char *cstr = make_connstr(db->name);
-
-               db_connect(db->c_maint, cstr);
-               free(cstr);
+       if (!pgs_connection_valid(db->c_maint)) {
+               pgs_connect(db->c_maint);
        } else {
                /* Already have a connection, what are we doing here */
                log_error("%s: maint already initialized", db->name);
index b6a600e711d99af782b0c038f2aacfb02d2a1dde..ab1ae83cd33b8cc2e1f8268880e7a52ff8a7ea4e 100644 (file)
@@ -78,7 +78,7 @@ static void handle_sigterm(int sock, short flags, void *arg)
 static void handle_sigint(int sock, short flags, void *arg)
 {
        log_info("Got SIGINT, shutting down");
-       /* pidfile cleanup happens via atexit() */
+       /* notify main loop to exit */
        got_sigint = 1;
 }
 
@@ -129,9 +129,9 @@ static void signal_setup(void)
 
 const char *make_connstr(const char *dbname)
 {
-       char buf[512];
+       static char buf[512];
        snprintf(buf, sizeof(buf), "%s dbname=%s ", cf.base_connstr, dbname);
-       return strdup(buf);
+       return buf;
 }
 
 static void launch_db(const char *dbname)
@@ -161,9 +161,9 @@ static void launch_db(const char *dbname)
 static void drop_db(struct PgDatabase *db)
 {
        statlist_remove(&database_list, &db->head);
-       db_free(db->c_ticker);
-       db_free(db->c_maint);
-       db_free(db->c_retry);
+       pgs_free(db->c_ticker);
+       pgs_free(db->c_maint);
+       pgs_free(db->c_retry);
        strlist_free(db->maint_item_list);
        free(db->name);
        free(db);
@@ -175,34 +175,34 @@ static void detect_handler(struct PgSocket *db, void *arg, enum PgEvent ev, PGre
        const char *s;
 
        switch (ev) {
-       case DB_CONNECT_OK:
-               db_send_query_simple(db, "select datname from pg_database"
+       case PGS_CONNECT_OK:
+               pgs_send_query_simple(db, "select datname from pg_database"
                                         " where not datistemplate and datallowconn");
                break;
-       case DB_RESULT_OK:
+       case PGS_RESULT_OK:
                for (i = 0; i < PQntuples(res); i++) {
                        s = PQgetvalue(res, i, 0);
                        launch_db(s);
                }
-               db_disconnect(db);
-               db_sleep(db, cf.check_period);
+               pgs_disconnect(db);
+               pgs_sleep(db, cf.check_period);
                break;
-       case DB_TIMEOUT:
+       case PGS_TIMEOUT:
                detect_dbs();
                break;
        default:
-               db_disconnect(db);
-               db_sleep(db, cf.check_period);
+               pgs_disconnect(db);
+               pgs_sleep(db, cf.check_period);
        }
 }
 
 static void detect_dbs(void)
 {
-       const char *cstr = make_connstr(cf.initial_database);
-       if (!db_template)
-               db_template = db_create(detect_handler, NULL);
-       db_connect(db_template, cstr);
-       free(cstr);
+       if (!db_template) {
+               const char *cstr = make_connstr(cf.initial_database);
+               db_template = pgs_create(cstr, detect_handler, NULL);
+       }
+       pgs_connect(db_template);
 }
 
 static bool launch_db_cb(void *arg, const char *db)
@@ -230,7 +230,7 @@ static void recheck_dbs(void)
                                drop_db(db);
                }
                if (db_template) {
-                       db_free(db_template);
+                       pgs_free(db_template);
                        db_template = NULL;
                }
        } else if (!db_template) {
@@ -248,7 +248,7 @@ static void cleanup(void)
                db = container_of(elem, struct PgDatabase, head);
                drop_db(db);
        }
-       db_free(db_template);
+       pgs_free(db_template);
 
        event_base_free(NULL);
 }
index 3e5a43fa9efa88fca25ce4dfe747b36485de8f7c..b92ed59c58c965028950bd22810660e9c35dae12 100644 (file)
@@ -9,8 +9,7 @@
 #include <usual/list.h>
 #include <usual/statlist.h>
 #include <usual/logging.h>
-
-#include "connection.h"
+#include <usual/pgsocket.h>
 
 enum DbState {
        DB_CLOSED,
index 49cb5232f4565f5e38d936d6878329b238d33e4e..f107cc475dbed73487925963ff15d2a15612b63d 100644 (file)
@@ -5,15 +5,15 @@
 static void close_retry(struct PgDatabase *db, double sleep_time)
 {
        log_debug("%s: close_retry, %f", db->name, sleep_time);
-       db_disconnect(db->c_retry);
-       db_sleep(db->c_retry, sleep_time);
+       pgs_disconnect(db->c_retry);
+       pgs_sleep(db->c_retry, sleep_time);
 }
 
 static void run_retry(struct PgDatabase *db)
 {
        const char *q = "select * from pgq.maint_retry_events()";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_retry, q);
+       pgs_send_query_simple(db->c_retry, q);
 }
 
 static void parse_retry(struct PgDatabase *db, PGresult *res)
@@ -33,19 +33,19 @@ static void retry_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresu
        struct PgDatabase *db = arg;
 
        switch (ev) {
-       case DB_CONNECT_OK:
+       case PGS_CONNECT_OK:
                log_debug("%s: starting retry event processing", db->name);
                run_retry(db);
                break;
-       case DB_RESULT_OK:
+       case PGS_RESULT_OK:
                parse_retry(db, res);
                break;
-       case DB_TIMEOUT:
+       case PGS_TIMEOUT:
                log_debug("%s: retry timeout", db->name);
                launch_retry(db);
                break;
        default:
-               db_reconnect(db->c_retry);
+               pgs_reconnect(db->c_retry);
        }
 }
 
@@ -56,10 +56,9 @@ void launch_retry(struct PgDatabase *db)
                log_debug("%s: retry already initialized", db->name);
        } else {
                log_debug("%s: launch_retry", db->name);
-               db->c_retry = db_create(retry_handler, db);
+               cstr = make_connstr(db->name);
+               db->c_retry = pgs_create(cstr, retry_handler, db);
        }
-       cstr = make_connstr(db->name);
-       db_connect(db->c_retry, cstr);
-       free(cstr);
+       pgs_connect(db->c_retry);
 }
 
index 2094bba3223e6cfed2a5ce3ec4e89c9517fd026a..77566647b27c15a6290cc58f718ddc8abe65c8ff 100644 (file)
@@ -4,7 +4,7 @@ static void run_pgq_check(struct PgDatabase *db)
 {
        const char *q = "select 1 from pg_catalog.pg_namespace where nspname='pgq'";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_ticker, q);
+       pgs_send_query_simple(db->c_ticker, q);
        db->state = DB_TICKER_CHECK_PGQ;
 }
 
@@ -12,7 +12,7 @@ static void run_version_check(struct PgDatabase *db)
 {
        const char *q = "select pgq.version()";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_ticker, q);
+       pgs_send_query_simple(db->c_ticker, q);
        db->state = DB_TICKER_CHECK_VERSION;
 }
 
@@ -20,7 +20,7 @@ static void run_ticker(struct PgDatabase *db)
 {
        const char *q = "select pgq.ticker()";
        log_debug("%s: %s", db->name, q);
-       db_send_query_simple(db->c_ticker, q);
+       pgs_send_query_simple(db->c_ticker, q);
        db->state = DB_TICKER_RUN;
 }
 
@@ -28,8 +28,8 @@ static void close_ticker(struct PgDatabase *db, double sleep_time)
 {
        log_debug("%s: close_ticker, %f", db->name, sleep_time);
        db->state = DB_CLOSED;
-       db_disconnect(db->c_ticker);
-       db_sleep(db->c_ticker, sleep_time);
+       pgs_disconnect(db->c_ticker);
+       pgs_sleep(db->c_ticker, sleep_time);
 }
 
 static void parse_pgq_check(struct PgDatabase *db, PGresult *res)
@@ -77,7 +77,7 @@ static void parse_ticker_result(struct PgDatabase *db, PGresult *res)
                log_debug("%s: calling pgq.ticker() failed", db->name);
        }
 
-       db_sleep(db->c_ticker, cf.ticker_period);
+       pgs_sleep(db->c_ticker, cf.ticker_period);
 }
 
 static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
@@ -85,10 +85,10 @@ static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresul
        struct PgDatabase *db = arg;
 
        switch (ev) {
-       case DB_CONNECT_OK:
+       case PGS_CONNECT_OK:
                run_pgq_check(db);
                break;
-       case DB_RESULT_OK:
+       case PGS_RESULT_OK:
                switch (db->state) {
                case DB_TICKER_CHECK_PGQ:
                        parse_pgq_check(db, res);
@@ -103,25 +103,25 @@ static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresul
                        fatal("bad state");
                }
                break;
-       case DB_TIMEOUT:
+       case PGS_TIMEOUT:
                log_debug("%s: tick timeout", db->name);
-               if (!db_connection_valid(db->c_ticker))
+               if (!pgs_connection_valid(db->c_ticker))
                        launch_ticker(db);
                else
                        run_ticker(db);
                break;
        default:
-               db_reconnect(db->c_ticker);
+               pgs_reconnect(db->c_ticker);
        }
 }
 
 void launch_ticker(struct PgDatabase *db)
 {
-       const char *cstr = make_connstr(db->name);
        log_debug("%s: launch_ticker", db->name);
-       if (!db->c_ticker)
-               db->c_ticker = db_create(tick_handler, db);
-       db_connect(db->c_ticker, cstr);
-       free(cstr);
+       if (!db->c_ticker) {
+               const char *cstr = make_connstr(db->name);
+               db->c_ticker = pgs_create(cstr, tick_handler, db);
+       }
+       pgs_connect(db->c_ticker);
 }