sql/ticker: make new logic actually work
authorMarko Kreen <markokr@gmail.com>
Fri, 3 Sep 2010 14:08:38 +0000 (17:08 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 3 Sep 2010 14:08:38 +0000 (17:08 +0300)
lib
sql/ticker/maint.c
sql/ticker/pgqd.c
sql/ticker/pgqd.h
sql/ticker/retry.c
sql/ticker/ticker.c

diff --git a/lib b/lib
index a083d113aa7858bbd4bf15ac16f3418036fea7ad..4d87d575df228ffc1e4c29c112b260ab9baf61b7 160000 (submodule)
--- a/lib
+++ b/lib
@@ -1 +1 @@
-Subproject commit a083d113aa7858bbd4bf15ac16f3418036fea7ad
+Subproject commit 4d87d575df228ffc1e4c29c112b260ab9baf61b7
index f05b640d603303bd294f842dc08ef9034fefb002..5cf0f7d0e9545ebfcfcce39671c2d2697efcbd22 100644 (file)
@@ -81,7 +81,10 @@ static bool fill_op_list(struct PgDatabase *db, PGresult *res)
                        return false;
                list_init(&op->head);
                fname = PQgetvalue(res, i, 0);
-               farg = PQgetvalue(res, i, 1);
+               farg = NULL;
+               if (!PQgetisnull(res, i, 1))
+                       farg = PQgetvalue(res, i, 1);
+               log_debug("load_op: %s / %s", fname, farg ? farg : "NULL");
                op->func_name = strdup(fname);
                if (!op->func_name)
                        goto failed;
@@ -100,10 +103,10 @@ failed:
 
 static void run_op_list(struct PgDatabase *db)
 {
-       const char *q = "select queue_name from pgq.maint_operations()";
+       const char *q = "select func_name, func_arg from pgq.maint_operations()";
        log_debug("%s: %s", db->name, q);
        pgs_send_query_simple(db->c_maint, q);
-       db->maint_state = DB_MAINT_LOAD_QUEUES;
+       db->maint_state = DB_MAINT_LOAD_OPS;
 }
 
 static const char *stmt_names[] = {
@@ -112,79 +115,6 @@ static const char *stmt_names[] = {
        NULL
 };
 
-static inline bool idstart(unsigned char c)
-{
-       return isalpha(c) || c == '_';
-}
-
-static inline bool idbody(unsigned char c)
-{
-       return idstart(c) || isdigit(c);
-}
-
-static int copy_ident(const char *src, int srclen, char *dst, int dstlen)
-{
-       int i, j;
-
-       if (dstlen <= 2)
-               return -1;
-
-       if (!idstart(src[0]))
-               goto needs_quote;
-
-       for (i = 0; i < srclen; i++) {
-               if (!idbody(i))
-                       goto needs_quote;
-               if (i >= dstlen)
-                       return -1;
-               dst[i] = src[i];
-       }
-       if (i >= dstlen)
-               return -1;
-       dst[i] = 0;
-       return i;
-
-needs_quote:
-       dst[0] = '"';
-       for (i = 0, j = 1; i < srclen; i++) {
-               if (j >= dstlen)
-                       return -1;
-               dst[j++] = src[i];
-               if (src[i] == '"') {
-                       if (j >= dstlen)
-                               return -1;
-                       dst[j++] = src[i];
-               }
-       }
-       if (j >= dstlen - 2)
-               return -1;
-       dst[j++] = '"';
-       dst[j] = 0;
-       return j;
-}
-
-static bool quote_fqname(const char *name, char *dst, int dstlen)
-{
-       const char *dot;
-       const char *scm = "public";
-       int scmlen = strlen(scm);
-       int res;
-
-       dot = strchr(name, '.');
-       if (dot) {
-               scm = name;
-               scmlen = dot - name;
-               name = dot + 1;
-       }
-
-       res = copy_ident(scm, scmlen, dst, dstlen);
-       if (res < 0)
-               return false;
-       dst[res] = '.';
-       res = copy_ident(name, strlen(name), dst + res + 1, dstlen - res - 1);
-       return res > 0;
-}
-
 static void run_op(struct PgDatabase *db, PGresult *res)
 {
        struct MaintOp *op;
@@ -206,6 +136,7 @@ next:
        }
        op = next_op(db);
        if (!op) {
+               stats.n_maint++;
                close_maint(db, cf.maint_period);
                return;
        }
@@ -215,7 +146,7 @@ repeat:
        for (np = stmt_names; *np; np++) {
                if (strcasecmp(op->func_name, *np) != 0)
                        continue;
-               if (!quote_fqname(op->func_arg, namebuf, sizeof(namebuf))) {
+               if (!pg_quote_fqident(namebuf, op->func_arg, sizeof(namebuf))) {
                        log_error("Bad table name? - %s", op->func_arg);
                        goto next;
                }
@@ -227,13 +158,19 @@ repeat:
        }
 
        /* run as a function */
-       if (!quote_fqname(op->func_name, namebuf, sizeof(namebuf))) {
+       if (!pg_quote_fqident(namebuf, op->func_name, sizeof(namebuf))) {
                log_error("Bad func name? - %s", op->func_name);
                goto next;
        }
-       snprintf(buf, sizeof(buf), "select %s($1)", namebuf);
-       log_debug("%s: [%s]", db->name, buf);
-       pgs_send_query_params(db->c_maint, buf, 1, op->func_arg);
+       if (op->func_arg) {
+               snprintf(buf, sizeof(buf), "select %s($1)", namebuf);
+               log_debug("%s: [%s]", db->name, buf);
+               pgs_send_query_params(db->c_maint, buf, 1, op->func_arg);
+       } else {
+               snprintf(buf, sizeof(buf), "select %s()", namebuf);
+               log_debug("%s: [%s]", db->name, buf);
+               pgs_send_query_simple(db->c_maint, buf);
+       }
 done:
        db->maint_state = DB_MAINT_OP;
 }
@@ -310,15 +247,20 @@ static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresu
        switch (ev) {
        case PGS_CONNECT_OK:
                log_debug("%s: starting maintenance", db->name);
-               run_test_version(db);
+               if (db->has_maint_operations)
+                       run_op_list(db);
+               else
+                       run_test_version(db);
                break;
        case PGS_RESULT_OK:
                switch (db->maint_state) {
                case DB_MAINT_TEST_VERSION:
-                       if (has_ops(res))
+                       if (has_ops(res)) {
+                               db->has_maint_operations = true;
                                run_op_list(db);
-                       else
+                       } else {
                                run_queue_list(db);
+                       }
                        break;
                case DB_MAINT_LOAD_OPS:
                        if (!fill_op_list(db, res))
index 81d6d40286a1fda9bbbe122e50809eaa306cfaf1..f124db5794d9b3e56399c01b012e847fb03995d2 100644 (file)
@@ -34,6 +34,8 @@ static const char *sample_ini =
 
 struct Config cf;
 
+struct Stats stats;
+
 static struct PgSocket *db_template;
 
 static STATLIST(database_list);
@@ -52,6 +54,7 @@ static const struct CfKey conf_params[] = {
        { "maint_period", CF_REL_TIME_DOUBLE(maint_period), "120" },
        { "retry_period", CF_REL_TIME_DOUBLE(retry_period), "30" },
        { "ticker_period", CF_REL_TIME_DOUBLE(ticker_period), "1" },
+       { "stats_period", CF_REL_TIME_DOUBLE(stats_period), "30" },
        { NULL },
 };
 
@@ -162,6 +165,7 @@ static void launch_db(const char *dbname)
        db = calloc(1, sizeof(*db));
        db->name = strdup(dbname);
        list_init(&db->head);
+       statlist_init(&db->maint_op_list, "maint_op_list");
        statlist_append(&database_list, &db->head);
 
        /* start working on it */
@@ -268,6 +272,28 @@ static void recheck_dbs(void)
        }
 }
 
+static struct event stats_ev;
+
+static void stats_handler(int fd, short flags, void *arg)
+{
+       struct timeval tv = { cf.stats_period, 0 };
+
+       log_info("{ticks: %d, maint: %d, retry: %d}",
+                stats.n_ticks, stats.n_maint, stats.n_retry);
+       memset(&stats, 0, sizeof(stats));
+
+       if (evtimer_add(&stats_ev, &tv) < 0)
+               fatal_perror("evtimer_add");
+}
+
+static void stats_setup(void)
+{
+       struct timeval tv = { cf.stats_period, 0 };
+       evtimer_set(&stats_ev, stats_handler, NULL);
+       if (evtimer_add(&stats_ev, &tv) < 0)
+               fatal_perror("evtimer_add");
+}
+
 static void cleanup(void)
 {
        struct PgDatabase *db;
@@ -372,11 +398,14 @@ int main(int argc, char *argv[])
 
        signal_setup();
 
+       stats_setup();
+
        recheck_dbs();
 
        while (!got_sigint)
                main_loop_once();
 
+       _exit(1);
        cleanup();
 
        return 0;
index 15df3074f8896e5d8f0ac12410557b84363e53b2..97da7b68f53bd8913e6128fe168aecc3c341169e 100644 (file)
@@ -42,6 +42,8 @@ struct PgDatabase {
        struct StrList *maint_item_list;
        struct StatList maint_op_list;
        struct MaintOp *cur_maint;
+
+       bool has_maint_operations;
 };
 
 struct Config {
@@ -56,10 +58,17 @@ struct Config {
        double check_period;
        double maint_period;
        double ticker_period;
+       double stats_period;
 };
 
-extern struct Config cf;
+struct Stats {
+       int n_ticks;
+       int n_maint;
+       int n_retry;
+};
 
+extern struct Config cf;
+extern struct Stats stats;
 
 void launch_ticker(struct PgDatabase *db);
 void launch_maint(struct PgDatabase *db);
index e330862552966b94a8551844cbc0220cc6a79fd7..b70b7747c366f2f3c5b8f272364d67020a278f57 100644 (file)
@@ -19,6 +19,7 @@ static void parse_retry(struct PgDatabase *db, PGresult *res)
 {
        if (PQntuples(res) == 1) {
                char *val = PQgetvalue(res, 0, 0);
+               stats.n_retry += atoi(val);
                if (strcmp(val, "0") != 0) {
                        run_retry(db);
                        return;
index 8b2b9d8278b583e166cfebfbc1babf4f1f9582d8..a833c62b6d5ded108e1b8aae1b2745caa764338c 100644 (file)
@@ -19,7 +19,7 @@ static void run_version_check(struct PgDatabase *db)
 static void run_ticker(struct PgDatabase *db)
 {
        const char *q = "select pgq.ticker()";
-       log_debug("%s: %s", db->name, q);
+       log_noise("%s: %s", db->name, q);
        pgs_send_query_simple(db->c_ticker, q);
        db->state = DB_TICKER_RUN;
 }
@@ -74,6 +74,8 @@ static void parse_ticker_result(struct PgDatabase *db, PGresult *res)
 {
        if (PQntuples(res) != 1) {
                log_debug("%s: calling pgq.ticker() failed", db->name);
+       } else {
+               stats.n_ticks++;
        }
 
        pgs_sleep(db->c_ticker, cf.ticker_period);
@@ -103,7 +105,7 @@ static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresul
                }
                break;
        case PGS_TIMEOUT:
-               log_debug("%s: tick timeout", db->name);
+               log_noise("%s: tick timeout", db->name);
                if (!pgs_connection_valid(db->c_ticker))
                        launch_ticker(db);
                else