return false;
list_init(&op->head);
fname = PQgetvalue(res, i, 0);
- farg = PQgetvalue(res, i, 1);
+ farg = NULL;
+ if (!PQgetisnull(res, i, 1))
+ farg = PQgetvalue(res, i, 1);
+ log_debug("load_op: %s / %s", fname, farg ? farg : "NULL");
op->func_name = strdup(fname);
if (!op->func_name)
goto failed;
static void run_op_list(struct PgDatabase *db)
{
- const char *q = "select queue_name from pgq.maint_operations()";
+ const char *q = "select func_name, func_arg from pgq.maint_operations()";
log_debug("%s: %s", db->name, q);
pgs_send_query_simple(db->c_maint, q);
- db->maint_state = DB_MAINT_LOAD_QUEUES;
+ db->maint_state = DB_MAINT_LOAD_OPS;
}
static const char *stmt_names[] = {
NULL
};
-static inline bool idstart(unsigned char c)
-{
- return isalpha(c) || c == '_';
-}
-
-static inline bool idbody(unsigned char c)
-{
- return idstart(c) || isdigit(c);
-}
-
-static int copy_ident(const char *src, int srclen, char *dst, int dstlen)
-{
- int i, j;
-
- if (dstlen <= 2)
- return -1;
-
- if (!idstart(src[0]))
- goto needs_quote;
-
- for (i = 0; i < srclen; i++) {
- if (!idbody(i))
- goto needs_quote;
- if (i >= dstlen)
- return -1;
- dst[i] = src[i];
- }
- if (i >= dstlen)
- return -1;
- dst[i] = 0;
- return i;
-
-needs_quote:
- dst[0] = '"';
- for (i = 0, j = 1; i < srclen; i++) {
- if (j >= dstlen)
- return -1;
- dst[j++] = src[i];
- if (src[i] == '"') {
- if (j >= dstlen)
- return -1;
- dst[j++] = src[i];
- }
- }
- if (j >= dstlen - 2)
- return -1;
- dst[j++] = '"';
- dst[j] = 0;
- return j;
-}
-
-static bool quote_fqname(const char *name, char *dst, int dstlen)
-{
- const char *dot;
- const char *scm = "public";
- int scmlen = strlen(scm);
- int res;
-
- dot = strchr(name, '.');
- if (dot) {
- scm = name;
- scmlen = dot - name;
- name = dot + 1;
- }
-
- res = copy_ident(scm, scmlen, dst, dstlen);
- if (res < 0)
- return false;
- dst[res] = '.';
- res = copy_ident(name, strlen(name), dst + res + 1, dstlen - res - 1);
- return res > 0;
-}
-
static void run_op(struct PgDatabase *db, PGresult *res)
{
struct MaintOp *op;
}
op = next_op(db);
if (!op) {
+ stats.n_maint++;
close_maint(db, cf.maint_period);
return;
}
for (np = stmt_names; *np; np++) {
if (strcasecmp(op->func_name, *np) != 0)
continue;
- if (!quote_fqname(op->func_arg, namebuf, sizeof(namebuf))) {
+ if (!pg_quote_fqident(namebuf, op->func_arg, sizeof(namebuf))) {
log_error("Bad table name? - %s", op->func_arg);
goto next;
}
}
/* run as a function */
- if (!quote_fqname(op->func_name, namebuf, sizeof(namebuf))) {
+ if (!pg_quote_fqident(namebuf, op->func_name, sizeof(namebuf))) {
log_error("Bad func name? - %s", op->func_name);
goto next;
}
- snprintf(buf, sizeof(buf), "select %s($1)", namebuf);
- log_debug("%s: [%s]", db->name, buf);
- pgs_send_query_params(db->c_maint, buf, 1, op->func_arg);
+ if (op->func_arg) {
+ snprintf(buf, sizeof(buf), "select %s($1)", namebuf);
+ log_debug("%s: [%s]", db->name, buf);
+ pgs_send_query_params(db->c_maint, buf, 1, op->func_arg);
+ } else {
+ snprintf(buf, sizeof(buf), "select %s()", namebuf);
+ log_debug("%s: [%s]", db->name, buf);
+ pgs_send_query_simple(db->c_maint, buf);
+ }
done:
db->maint_state = DB_MAINT_OP;
}
switch (ev) {
case PGS_CONNECT_OK:
log_debug("%s: starting maintenance", db->name);
- run_test_version(db);
+ if (db->has_maint_operations)
+ run_op_list(db);
+ else
+ run_test_version(db);
break;
case PGS_RESULT_OK:
switch (db->maint_state) {
case DB_MAINT_TEST_VERSION:
- if (has_ops(res))
+ if (has_ops(res)) {
+ db->has_maint_operations = true;
run_op_list(db);
- else
+ } else {
run_queue_list(db);
+ }
break;
case DB_MAINT_LOAD_OPS:
if (!fill_op_list(db, res))
struct Config cf;
+struct Stats stats;
+
static struct PgSocket *db_template;
static STATLIST(database_list);
{ "maint_period", CF_REL_TIME_DOUBLE(maint_period), "120" },
{ "retry_period", CF_REL_TIME_DOUBLE(retry_period), "30" },
{ "ticker_period", CF_REL_TIME_DOUBLE(ticker_period), "1" },
+ { "stats_period", CF_REL_TIME_DOUBLE(stats_period), "30" },
{ NULL },
};
db = calloc(1, sizeof(*db));
db->name = strdup(dbname);
list_init(&db->head);
+ statlist_init(&db->maint_op_list, "maint_op_list");
statlist_append(&database_list, &db->head);
/* start working on it */
}
}
+static struct event stats_ev;
+
+static void stats_handler(int fd, short flags, void *arg)
+{
+ struct timeval tv = { cf.stats_period, 0 };
+
+ log_info("{ticks: %d, maint: %d, retry: %d}",
+ stats.n_ticks, stats.n_maint, stats.n_retry);
+ memset(&stats, 0, sizeof(stats));
+
+ if (evtimer_add(&stats_ev, &tv) < 0)
+ fatal_perror("evtimer_add");
+}
+
+static void stats_setup(void)
+{
+ struct timeval tv = { cf.stats_period, 0 };
+ evtimer_set(&stats_ev, stats_handler, NULL);
+ if (evtimer_add(&stats_ev, &tv) < 0)
+ fatal_perror("evtimer_add");
+}
+
static void cleanup(void)
{
struct PgDatabase *db;
signal_setup();
+ stats_setup();
+
recheck_dbs();
while (!got_sigint)
main_loop_once();
+ _exit(1);
cleanup();
return 0;
static void run_ticker(struct PgDatabase *db)
{
const char *q = "select pgq.ticker()";
- log_debug("%s: %s", db->name, q);
+ log_noise("%s: %s", db->name, q);
pgs_send_query_simple(db->c_ticker, q);
db->state = DB_TICKER_RUN;
}
{
if (PQntuples(res) != 1) {
log_debug("%s: calling pgq.ticker() failed", db->name);
+ } else {
+ stats.n_ticks++;
}
pgs_sleep(db->c_ticker, cf.ticker_period);
}
break;
case PGS_TIMEOUT:
- log_debug("%s: tick timeout", db->name);
+ log_noise("%s: tick timeout", db->name);
if (!pgs_connection_valid(db->c_ticker))
launch_ticker(db);
else