the connection.c was already generalized and moved to libusual.
make the ticker use it now.
-Subproject commit 916d31264cafcb016324d9639a3ad742764dc3e1
+Subproject commit 9f71a708ab258e680b1b2f001d4138834042c7f4
include ../../config.mak
-SRCS = connection.c pgqd.c maint.c ticker.c retry.c
-LOCAL_HDRS = connection.h pgqd.h
+SRCS = pgqd.c maint.c ticker.c retry.c
+LOCAL_HDRS = pgqd.h
USUAL_DIR = ../../lib
USUAL_OBJDIR = .
-USUAL_LOCAL_SRCS = $(SRCS) connection.h pgqd.h
+USUAL_LOCAL_SRCS = $(SRCS) $(LOCAL_HDRS)
include $(USUAL_DIR)/Setup.mk
PROGRAM = pgqd
clean:
rm -f $(PROGRAM) $(OBJS)
+tags:
+ ctags *.[ch] ../../lib/usual/*.[ch]
+
distclean: clean
installcheck:
+++ /dev/null
-#include "connection.h"
-
-#include <sys/types.h>
-#include <stdarg.h>
-#include <math.h>
-
-#include <usual/event.h>
-#include <usual/logging.h>
-#include <usual/time.h>
-
-#define W_NONE 0
-#define W_SOCK 1
-#define W_TIME 2
-
-typedef void (*libev_cb)(int sock, short flags, void *arg);
-
-struct PgSocket {
- struct event ev;
-
- unsigned wait_type:4; // 0 - no wait, 1 - socket, 2 - timeout
-
- PGconn *con;
- db_handler_f handler_func;
- void *handler_arg;
-
- PGresult *last_result;
-};
-
-static void send_event(struct PgSocket *db, enum PgEvent ev)
-{
- db->handler_func(db, db->handler_arg, ev, NULL);
-}
-
-static void wait_event(struct PgSocket *db, short ev, libev_cb fn)
-{
- Assert(!db->wait_type);
-
- event_set(&db->ev, PQsocket(db->con), ev, fn, db);
- if (event_add(&db->ev, NULL) < 0)
- fatal_perror("event_add");
-
- db->wait_type = W_SOCK;
-}
-
-static void timeout_cb(int sock, short flags, void *arg)
-{
- struct PgSocket *db = arg;
-
- db->wait_type = 0;
-
- send_event(db, DB_TIMEOUT);
-}
-
-void db_sleep(struct PgSocket *db, double timeout)
-{
- struct timeval tv;
-
- Assert(!db->wait_type);
-
- tv.tv_sec = floor(timeout);
- tv.tv_usec = (timeout - tv.tv_sec) * USEC;
-
- evtimer_set(&db->ev, timeout_cb, db);
- if (evtimer_add(&db->ev, &tv) < 0)
- fatal_perror("event_add");
-
- db->wait_type = W_TIME;
-}
-
-
-/* some error happened */
-static void conn_error(struct PgSocket *db, enum PgEvent ev, const char *desc)
-{
- log_error("connection error: %s", desc);
- log_error("libpq: %s", PQerrorMessage(db->con));
- send_event(db, ev);
-}
-
-/*
- * Called when select() told that conn is avail for reading/writing.
- *
- * It should call postgres handlers and then change state if needed.
- */
-static void result_cb(int sock, short flags, void *arg)
-{
- struct PgSocket *db = arg;
- PGresult *res;
-
- db->wait_type = 0;
-
- if (!PQconsumeInput(db->con)) {
- conn_error(db, DB_RESULT_BAD, "PQconsumeInput");
- return;
- }
-
- /* loop until PQgetResult returns NULL */
- while (1) {
- /* incomplete result? */
- if (PQisBusy(db->con)) {
- wait_event(db, EV_READ, result_cb);
- return;
- }
-
- /* next result */
- res = PQgetResult(db->con);
- if (!res)
- break;
-
- if (db->last_result) {
- log_warning("multiple results?");
- PQclear(db->last_result);
- }
- db->last_result = res;
- }
-
- res = db->last_result;
- db->last_result = NULL;
-
- db->handler_func(db, db->handler_arg, DB_RESULT_OK, res);
- PQclear(res);
-}
-
-static void send_cb(int sock, short flags, void *arg)
-{
- int res;
- struct PgSocket *db = arg;
-
- db->wait_type = 0;
-
- res = PQflush(db->con);
- if (res > 0) {
- wait_event(db, EV_WRITE, send_cb);
- } else if (res == 0) {
- wait_event(db, EV_READ, result_cb);
- } else
- conn_error(db, DB_RESULT_BAD, "PQflush");
-}
-
-
-static void connect_cb(int sock, short flags, void *arg)
-{
- struct PgSocket *db = arg;
- PostgresPollingStatusType poll_res;
-
- db->wait_type = 0;
-
- poll_res = PQconnectPoll(db->con);
- switch (poll_res) {
- case PGRES_POLLING_WRITING:
- wait_event(db, EV_WRITE, connect_cb);
- break;
- case PGRES_POLLING_READING:
- wait_event(db, EV_READ, connect_cb);
- break;
- case PGRES_POLLING_OK:
- //log_debug("login ok: fd=%d", PQsocket(db->con));
- send_event(db, DB_CONNECT_OK);
- break;
- default:
- conn_error(db, DB_CONNECT_FAILED, "PQconnectPoll");
- }
-}
-
-/*
- * Public API
- */
-
-struct PgSocket *db_create(db_handler_f fn, void *handler_arg)
-{
- struct PgSocket *db;
-
- db = calloc(1, sizeof(*db));
- if (!db)
- return NULL;
-
- db->handler_func = fn;
- db->handler_arg = handler_arg;
-
- return db;
-}
-
-void db_connect(struct PgSocket *db, const char *connstr)
-{
- db->con = PQconnectStart(connstr);
- if (db->con == NULL) {
- conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
- return;
- }
-
- if (PQstatus(db->con) == CONNECTION_BAD) {
- conn_error(db, DB_CONNECT_FAILED, "PQconnectStart");
- return;
- }
-
- wait_event(db, EV_WRITE, connect_cb);
-}
-
-
-void db_disconnect(struct PgSocket *db)
-{
- if (db->con) {
- PQfinish(db->con);
- db->con = NULL;
- if (db->last_result)
- PQclear(db->last_result);
- db->last_result = NULL;
- }
-}
-
-void db_reconnect(struct PgSocket *db)
-{
- db_disconnect(db);
- db_sleep(db, 60);
-}
-
-void db_free(struct PgSocket *db)
-{
- if (db) {
- if (db->con)
- db_disconnect(db);
- if (db->last_result)
- PQclear(db->last_result);
- free(db);
- }
-}
-
-void db_send_query_simple(struct PgSocket *db, const char *q)
-{
- int res;
-
- log_debug("%s", q);
- res = PQsendQuery(db->con, q);
- if (!res) {
- conn_error(db, DB_RESULT_BAD, "PQsendQuery");
- return;
- }
-
- res = PQflush(db->con);
- if (res > 0) {
- wait_event(db, EV_WRITE, send_cb);
- } else if (res == 0) {
- wait_event(db, EV_READ, result_cb);
- } else
- conn_error(db, DB_RESULT_BAD, "PQflush");
-}
-
-void db_send_query_params(struct PgSocket *db, const char *q, int cnt, ...)
-{
- int res, i;
- va_list ap;
- const char * args[10];
-
- if (cnt > 10) cnt = 10;
-
- va_start(ap, cnt);
- for (i = 0; i < cnt; i++)
- args[i] = va_arg(ap, char *);
- va_end(ap);
-
- res = PQsendQueryParams(db->con, q, cnt, NULL, args, NULL, NULL, 0);
- if (!res) {
- conn_error(db, DB_RESULT_BAD, "PQsendQueryParams");
- return;
- }
-
- res = PQflush(db->con);
- if (res > 0) {
- wait_event(db, EV_WRITE, send_cb);
- } else if (res == 0) {
- wait_event(db, EV_READ, result_cb);
- } else
- conn_error(db, DB_RESULT_BAD, "PQflush");
-}
-
-int db_connection_valid(struct PgSocket *db)
-{
- return (db->con != NULL);
-}
-
+++ /dev/null
-
-#ifndef __CONNECTION_H__
-#define __CONNECTION_H__
-
-#include <libpq-fe.h>
-
-enum PgEvent {
- DB_CONNECT_OK,
- DB_CONNECT_FAILED,
- DB_RESULT_OK,
- DB_RESULT_BAD,
- DB_TIMEOUT,
-};
-
-struct PgSocket;
-
-typedef void (*db_handler_f)(struct PgSocket *pgs, void *arg, enum PgEvent dbev, PGresult *res);
-
-struct PgSocket *db_create(db_handler_f fn, void *arg);
-void db_free(struct PgSocket *db);
-
-void db_connect(struct PgSocket *db, const char *cstr);
-void db_disconnect(struct PgSocket *db);
-void db_reconnect(struct PgSocket *db);
-
-void db_send_query_simple(struct PgSocket *db, const char *query);
-void db_send_query_params(struct PgSocket *db, const char *query, int args, ...);
-
-void db_sleep(struct PgSocket *db, double timeout);
-
-int db_connection_valid(struct PgSocket *db);
-
-#endif
-
{
const char *q = "select queue_name from pgq.get_queue_info()";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_maint, q);
+ pgs_send_query_simple(db->c_maint, q);
db->maint_state = DB_MAINT_LOAD_QUEUES;
}
{
const char *q = "select * from pgq.maint_tables_to_vacuum()";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_maint, q);
+ pgs_send_query_simple(db->c_maint, q);
db->maint_state = DB_MAINT_VACUUM_LIST;
}
qname = strlist_pop(db->maint_item_list);
q = "select pgq.maint_rotate_part1($1)";
log_debug("%s: %s [%s]", db->name, q, qname);
- db_send_query_params(db->c_maint, q, 1, qname);
+ pgs_send_query_params(db->c_maint, q, 1, qname);
free(qname);
db->maint_state = DB_MAINT_ROT1;
}
{
const char *q = "select pgq.maint_rotate_part2()";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_maint, q);
+ pgs_send_query_simple(db->c_maint, q);
db->maint_state = DB_MAINT_ROT2;
}
table = strlist_pop(db->maint_item_list);
snprintf(qbuf, sizeof(qbuf), "vacuum %s", table);
log_debug("%s: %s", db->name, qbuf);
- db_send_query_simple(db->c_maint, qbuf);
+ pgs_send_query_simple(db->c_maint, qbuf);
free(table);
db->maint_state = DB_MAINT_DO_VACUUM;
}
{
log_debug("%s: close_maint, %f", db->name, sleep_time);
db->maint_state = DB_CLOSED;
- db_disconnect(db->c_maint);
- db_sleep(db->c_maint, sleep_time);
+ pgs_disconnect(db->c_maint);
+ pgs_sleep(db->c_maint, sleep_time);
}
static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
struct PgDatabase *db = arg;
switch (ev) {
- case DB_CONNECT_OK:
+ case PGS_CONNECT_OK:
log_debug("%s: starting maintenance", db->name);
run_queue_list(db);
break;
- case DB_RESULT_OK:
+ case PGS_RESULT_OK:
switch (db->maint_state) {
case DB_MAINT_LOAD_QUEUES:
if (!fill_items(db, res))
fatal("bad state");
}
break;
- case DB_TIMEOUT:
+ case PGS_TIMEOUT:
log_debug("%s: maint timeout", db->name);
- if (!db_connection_valid(db->c_maint))
+ if (!pgs_connection_valid(db->c_maint))
launch_maint(db);
else
run_queue_list(db);
break;
default:
- db_reconnect(db->c_maint);
+ pgs_reconnect(db->c_maint);
}
return;
mem_err:
strlist_free(db->maint_item_list);
db->maint_item_list = NULL;
}
- db_disconnect(db->c_maint);
- db_sleep(db->c_maint, 20);
+ pgs_disconnect(db->c_maint);
+ pgs_sleep(db->c_maint, 20);
}
void launch_maint(struct PgDatabase *db)
{
+ const char *cstr;
+
log_debug("%s: launch_maint", db->name);
if (!db->c_maint) {
strlist_free(db->maint_item_list);
db->maint_item_list = NULL;
}
- db->c_maint = db_create(maint_handler, db);
+ cstr = make_connstr(db->name);
+ db->c_maint = pgs_create(cstr, maint_handler, db);
}
- if (!db_connection_valid(db->c_maint)) {
- const char *cstr = make_connstr(db->name);
-
- db_connect(db->c_maint, cstr);
- free(cstr);
+ if (!pgs_connection_valid(db->c_maint)) {
+ pgs_connect(db->c_maint);
} else {
/* Already have a connection, what are we doing here */
log_error("%s: maint already initialized", db->name);
static void handle_sigint(int sock, short flags, void *arg)
{
log_info("Got SIGINT, shutting down");
- /* pidfile cleanup happens via atexit() */
+ /* notify main loop to exit */
got_sigint = 1;
}
const char *make_connstr(const char *dbname)
{
- char buf[512];
+ static char buf[512];
snprintf(buf, sizeof(buf), "%s dbname=%s ", cf.base_connstr, dbname);
- return strdup(buf);
+ return buf;
}
static void launch_db(const char *dbname)
static void drop_db(struct PgDatabase *db)
{
statlist_remove(&database_list, &db->head);
- db_free(db->c_ticker);
- db_free(db->c_maint);
- db_free(db->c_retry);
+ pgs_free(db->c_ticker);
+ pgs_free(db->c_maint);
+ pgs_free(db->c_retry);
strlist_free(db->maint_item_list);
free(db->name);
free(db);
const char *s;
switch (ev) {
- case DB_CONNECT_OK:
- db_send_query_simple(db, "select datname from pg_database"
+ case PGS_CONNECT_OK:
+ pgs_send_query_simple(db, "select datname from pg_database"
" where not datistemplate and datallowconn");
break;
- case DB_RESULT_OK:
+ case PGS_RESULT_OK:
for (i = 0; i < PQntuples(res); i++) {
s = PQgetvalue(res, i, 0);
launch_db(s);
}
- db_disconnect(db);
- db_sleep(db, cf.check_period);
+ pgs_disconnect(db);
+ pgs_sleep(db, cf.check_period);
break;
- case DB_TIMEOUT:
+ case PGS_TIMEOUT:
detect_dbs();
break;
default:
- db_disconnect(db);
- db_sleep(db, cf.check_period);
+ pgs_disconnect(db);
+ pgs_sleep(db, cf.check_period);
}
}
static void detect_dbs(void)
{
- const char *cstr = make_connstr(cf.initial_database);
- if (!db_template)
- db_template = db_create(detect_handler, NULL);
- db_connect(db_template, cstr);
- free(cstr);
+ if (!db_template) {
+ const char *cstr = make_connstr(cf.initial_database);
+ db_template = pgs_create(cstr, detect_handler, NULL);
+ }
+ pgs_connect(db_template);
}
static bool launch_db_cb(void *arg, const char *db)
drop_db(db);
}
if (db_template) {
- db_free(db_template);
+ pgs_free(db_template);
db_template = NULL;
}
} else if (!db_template) {
db = container_of(elem, struct PgDatabase, head);
drop_db(db);
}
- db_free(db_template);
+ pgs_free(db_template);
event_base_free(NULL);
}
#include <usual/list.h>
#include <usual/statlist.h>
#include <usual/logging.h>
-
-#include "connection.h"
+#include <usual/pgsocket.h>
enum DbState {
DB_CLOSED,
static void close_retry(struct PgDatabase *db, double sleep_time)
{
log_debug("%s: close_retry, %f", db->name, sleep_time);
- db_disconnect(db->c_retry);
- db_sleep(db->c_retry, sleep_time);
+ pgs_disconnect(db->c_retry);
+ pgs_sleep(db->c_retry, sleep_time);
}
static void run_retry(struct PgDatabase *db)
{
const char *q = "select * from pgq.maint_retry_events()";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_retry, q);
+ pgs_send_query_simple(db->c_retry, q);
}
static void parse_retry(struct PgDatabase *db, PGresult *res)
struct PgDatabase *db = arg;
switch (ev) {
- case DB_CONNECT_OK:
+ case PGS_CONNECT_OK:
log_debug("%s: starting retry event processing", db->name);
run_retry(db);
break;
- case DB_RESULT_OK:
+ case PGS_RESULT_OK:
parse_retry(db, res);
break;
- case DB_TIMEOUT:
+ case PGS_TIMEOUT:
log_debug("%s: retry timeout", db->name);
launch_retry(db);
break;
default:
- db_reconnect(db->c_retry);
+ pgs_reconnect(db->c_retry);
}
}
log_debug("%s: retry already initialized", db->name);
} else {
log_debug("%s: launch_retry", db->name);
- db->c_retry = db_create(retry_handler, db);
+ cstr = make_connstr(db->name);
+ db->c_retry = pgs_create(cstr, retry_handler, db);
}
- cstr = make_connstr(db->name);
- db_connect(db->c_retry, cstr);
- free(cstr);
+ pgs_connect(db->c_retry);
}
{
const char *q = "select 1 from pg_catalog.pg_namespace where nspname='pgq'";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_ticker, q);
+ pgs_send_query_simple(db->c_ticker, q);
db->state = DB_TICKER_CHECK_PGQ;
}
{
const char *q = "select pgq.version()";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_ticker, q);
+ pgs_send_query_simple(db->c_ticker, q);
db->state = DB_TICKER_CHECK_VERSION;
}
{
const char *q = "select pgq.ticker()";
log_debug("%s: %s", db->name, q);
- db_send_query_simple(db->c_ticker, q);
+ pgs_send_query_simple(db->c_ticker, q);
db->state = DB_TICKER_RUN;
}
{
log_debug("%s: close_ticker, %f", db->name, sleep_time);
db->state = DB_CLOSED;
- db_disconnect(db->c_ticker);
- db_sleep(db->c_ticker, sleep_time);
+ pgs_disconnect(db->c_ticker);
+ pgs_sleep(db->c_ticker, sleep_time);
}
static void parse_pgq_check(struct PgDatabase *db, PGresult *res)
log_debug("%s: calling pgq.ticker() failed", db->name);
}
- db_sleep(db->c_ticker, cf.ticker_period);
+ pgs_sleep(db->c_ticker, cf.ticker_period);
}
static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresult *res)
struct PgDatabase *db = arg;
switch (ev) {
- case DB_CONNECT_OK:
+ case PGS_CONNECT_OK:
run_pgq_check(db);
break;
- case DB_RESULT_OK:
+ case PGS_RESULT_OK:
switch (db->state) {
case DB_TICKER_CHECK_PGQ:
parse_pgq_check(db, res);
fatal("bad state");
}
break;
- case DB_TIMEOUT:
+ case PGS_TIMEOUT:
log_debug("%s: tick timeout", db->name);
- if (!db_connection_valid(db->c_ticker))
+ if (!pgs_connection_valid(db->c_ticker))
launch_ticker(db);
else
run_ticker(db);
break;
default:
- db_reconnect(db->c_ticker);
+ pgs_reconnect(db->c_ticker);
}
}
void launch_ticker(struct PgDatabase *db)
{
- const char *cstr = make_connstr(db->name);
log_debug("%s: launch_ticker", db->name);
- if (!db->c_ticker)
- db->c_ticker = db_create(tick_handler, db);
- db_connect(db->c_ticker, cstr);
- free(cstr);
+ if (!db->c_ticker) {
+ const char *cstr = make_connstr(db->name);
+ db->c_ticker = pgs_create(cstr, tick_handler, db);
+ }
+ pgs_connect(db->c_ticker);
}