From 58a138439736b52bcb9128bea2df73d38a57b7c8 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Fri, 3 Sep 2010 17:08:38 +0300 Subject: [PATCH] sql/ticker: make new logic actually work --- lib | 2 +- sql/ticker/maint.c | 110 +++++++++++--------------------------------- sql/ticker/pgqd.c | 29 ++++++++++++ sql/ticker/pgqd.h | 11 ++++- sql/ticker/retry.c | 1 + sql/ticker/ticker.c | 6 ++- 6 files changed, 71 insertions(+), 88 deletions(-) diff --git a/lib b/lib index a083d113..4d87d575 160000 --- a/lib +++ b/lib @@ -1 +1 @@ -Subproject commit a083d113aa7858bbd4bf15ac16f3418036fea7ad +Subproject commit 4d87d575df228ffc1e4c29c112b260ab9baf61b7 diff --git a/sql/ticker/maint.c b/sql/ticker/maint.c index f05b640d..5cf0f7d0 100644 --- a/sql/ticker/maint.c +++ b/sql/ticker/maint.c @@ -81,7 +81,10 @@ static bool fill_op_list(struct PgDatabase *db, PGresult *res) return false; list_init(&op->head); fname = PQgetvalue(res, i, 0); - farg = PQgetvalue(res, i, 1); + farg = NULL; + if (!PQgetisnull(res, i, 1)) + farg = PQgetvalue(res, i, 1); + log_debug("load_op: %s / %s", fname, farg ? farg : "NULL"); op->func_name = strdup(fname); if (!op->func_name) goto failed; @@ -100,10 +103,10 @@ failed: static void run_op_list(struct PgDatabase *db) { - const char *q = "select queue_name from pgq.maint_operations()"; + const char *q = "select func_name, func_arg from pgq.maint_operations()"; log_debug("%s: %s", db->name, q); pgs_send_query_simple(db->c_maint, q); - db->maint_state = DB_MAINT_LOAD_QUEUES; + db->maint_state = DB_MAINT_LOAD_OPS; } static const char *stmt_names[] = { @@ -112,79 +115,6 @@ static const char *stmt_names[] = { NULL }; -static inline bool idstart(unsigned char c) -{ - return isalpha(c) || c == '_'; -} - -static inline bool idbody(unsigned char c) -{ - return idstart(c) || isdigit(c); -} - -static int copy_ident(const char *src, int srclen, char *dst, int dstlen) -{ - int i, j; - - if (dstlen <= 2) - return -1; - - if (!idstart(src[0])) - goto needs_quote; - - for (i = 0; i < srclen; i++) { - if (!idbody(i)) - goto needs_quote; - if (i >= dstlen) - return -1; - dst[i] = src[i]; - } - if (i >= dstlen) - return -1; - dst[i] = 0; - return i; - -needs_quote: - dst[0] = '"'; - for (i = 0, j = 1; i < srclen; i++) { - if (j >= dstlen) - return -1; - dst[j++] = src[i]; - if (src[i] == '"') { - if (j >= dstlen) - return -1; - dst[j++] = src[i]; - } - } - if (j >= dstlen - 2) - return -1; - dst[j++] = '"'; - dst[j] = 0; - return j; -} - -static bool quote_fqname(const char *name, char *dst, int dstlen) -{ - const char *dot; - const char *scm = "public"; - int scmlen = strlen(scm); - int res; - - dot = strchr(name, '.'); - if (dot) { - scm = name; - scmlen = dot - name; - name = dot + 1; - } - - res = copy_ident(scm, scmlen, dst, dstlen); - if (res < 0) - return false; - dst[res] = '.'; - res = copy_ident(name, strlen(name), dst + res + 1, dstlen - res - 1); - return res > 0; -} - static void run_op(struct PgDatabase *db, PGresult *res) { struct MaintOp *op; @@ -206,6 +136,7 @@ next: } op = next_op(db); if (!op) { + stats.n_maint++; close_maint(db, cf.maint_period); return; } @@ -215,7 +146,7 @@ repeat: for (np = stmt_names; *np; np++) { if (strcasecmp(op->func_name, *np) != 0) continue; - if (!quote_fqname(op->func_arg, namebuf, sizeof(namebuf))) { + if (!pg_quote_fqident(namebuf, op->func_arg, sizeof(namebuf))) { log_error("Bad table name? - %s", op->func_arg); goto next; } @@ -227,13 +158,19 @@ repeat: } /* run as a function */ - if (!quote_fqname(op->func_name, namebuf, sizeof(namebuf))) { + if (!pg_quote_fqident(namebuf, op->func_name, sizeof(namebuf))) { log_error("Bad func name? - %s", op->func_name); goto next; } - snprintf(buf, sizeof(buf), "select %s($1)", namebuf); - log_debug("%s: [%s]", db->name, buf); - pgs_send_query_params(db->c_maint, buf, 1, op->func_arg); + if (op->func_arg) { + snprintf(buf, sizeof(buf), "select %s($1)", namebuf); + log_debug("%s: [%s]", db->name, buf); + pgs_send_query_params(db->c_maint, buf, 1, op->func_arg); + } else { + snprintf(buf, sizeof(buf), "select %s()", namebuf); + log_debug("%s: [%s]", db->name, buf); + pgs_send_query_simple(db->c_maint, buf); + } done: db->maint_state = DB_MAINT_OP; } @@ -310,15 +247,20 @@ static void maint_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresu switch (ev) { case PGS_CONNECT_OK: log_debug("%s: starting maintenance", db->name); - run_test_version(db); + if (db->has_maint_operations) + run_op_list(db); + else + run_test_version(db); break; case PGS_RESULT_OK: switch (db->maint_state) { case DB_MAINT_TEST_VERSION: - if (has_ops(res)) + if (has_ops(res)) { + db->has_maint_operations = true; run_op_list(db); - else + } else { run_queue_list(db); + } break; case DB_MAINT_LOAD_OPS: if (!fill_op_list(db, res)) diff --git a/sql/ticker/pgqd.c b/sql/ticker/pgqd.c index 81d6d402..f124db57 100644 --- a/sql/ticker/pgqd.c +++ b/sql/ticker/pgqd.c @@ -34,6 +34,8 @@ static const char *sample_ini = struct Config cf; +struct Stats stats; + static struct PgSocket *db_template; static STATLIST(database_list); @@ -52,6 +54,7 @@ static const struct CfKey conf_params[] = { { "maint_period", CF_REL_TIME_DOUBLE(maint_period), "120" }, { "retry_period", CF_REL_TIME_DOUBLE(retry_period), "30" }, { "ticker_period", CF_REL_TIME_DOUBLE(ticker_period), "1" }, + { "stats_period", CF_REL_TIME_DOUBLE(stats_period), "30" }, { NULL }, }; @@ -162,6 +165,7 @@ static void launch_db(const char *dbname) db = calloc(1, sizeof(*db)); db->name = strdup(dbname); list_init(&db->head); + statlist_init(&db->maint_op_list, "maint_op_list"); statlist_append(&database_list, &db->head); /* start working on it */ @@ -268,6 +272,28 @@ static void recheck_dbs(void) } } +static struct event stats_ev; + +static void stats_handler(int fd, short flags, void *arg) +{ + struct timeval tv = { cf.stats_period, 0 }; + + log_info("{ticks: %d, maint: %d, retry: %d}", + stats.n_ticks, stats.n_maint, stats.n_retry); + memset(&stats, 0, sizeof(stats)); + + if (evtimer_add(&stats_ev, &tv) < 0) + fatal_perror("evtimer_add"); +} + +static void stats_setup(void) +{ + struct timeval tv = { cf.stats_period, 0 }; + evtimer_set(&stats_ev, stats_handler, NULL); + if (evtimer_add(&stats_ev, &tv) < 0) + fatal_perror("evtimer_add"); +} + static void cleanup(void) { struct PgDatabase *db; @@ -372,11 +398,14 @@ int main(int argc, char *argv[]) signal_setup(); + stats_setup(); + recheck_dbs(); while (!got_sigint) main_loop_once(); + _exit(1); cleanup(); return 0; diff --git a/sql/ticker/pgqd.h b/sql/ticker/pgqd.h index 15df3074..97da7b68 100644 --- a/sql/ticker/pgqd.h +++ b/sql/ticker/pgqd.h @@ -42,6 +42,8 @@ struct PgDatabase { struct StrList *maint_item_list; struct StatList maint_op_list; struct MaintOp *cur_maint; + + bool has_maint_operations; }; struct Config { @@ -56,10 +58,17 @@ struct Config { double check_period; double maint_period; double ticker_period; + double stats_period; }; -extern struct Config cf; +struct Stats { + int n_ticks; + int n_maint; + int n_retry; +}; +extern struct Config cf; +extern struct Stats stats; void launch_ticker(struct PgDatabase *db); void launch_maint(struct PgDatabase *db); diff --git a/sql/ticker/retry.c b/sql/ticker/retry.c index e3308625..b70b7747 100644 --- a/sql/ticker/retry.c +++ b/sql/ticker/retry.c @@ -19,6 +19,7 @@ static void parse_retry(struct PgDatabase *db, PGresult *res) { if (PQntuples(res) == 1) { char *val = PQgetvalue(res, 0, 0); + stats.n_retry += atoi(val); if (strcmp(val, "0") != 0) { run_retry(db); return; diff --git a/sql/ticker/ticker.c b/sql/ticker/ticker.c index 8b2b9d82..a833c62b 100644 --- a/sql/ticker/ticker.c +++ b/sql/ticker/ticker.c @@ -19,7 +19,7 @@ static void run_version_check(struct PgDatabase *db) static void run_ticker(struct PgDatabase *db) { const char *q = "select pgq.ticker()"; - log_debug("%s: %s", db->name, q); + log_noise("%s: %s", db->name, q); pgs_send_query_simple(db->c_ticker, q); db->state = DB_TICKER_RUN; } @@ -74,6 +74,8 @@ static void parse_ticker_result(struct PgDatabase *db, PGresult *res) { if (PQntuples(res) != 1) { log_debug("%s: calling pgq.ticker() failed", db->name); + } else { + stats.n_ticks++; } pgs_sleep(db->c_ticker, cf.ticker_period); @@ -103,7 +105,7 @@ static void tick_handler(struct PgSocket *s, void *arg, enum PgEvent ev, PGresul } break; case PGS_TIMEOUT: - log_debug("%s: tick timeout", db->name); + log_noise("%s: tick timeout", db->name); if (!pgs_connection_valid(db->c_ticker)) launch_ticker(db); else -- 2.39.5