try to survive errors delicately, by dropping connection where possible.
Thats not yet possible for event_del() and evtimer_add(), exit immidiately then.
#define sbuf_socket(sbuf) ((sbuf)->sock)
void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg);
-void sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix);
-void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec);
+bool sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix) _MUSTCHECK;
+bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec) _MUSTCHECK;
void sbuf_pause(SBuf *sbuf);
void sbuf_continue(SBuf *sbuf);
bool sbuf_answer(SBuf *sbuf, const void *buf, int len) _MUSTCHECK;
-void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb);
+bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb) _MUSTCHECK;
/*
* Returns true if SBuf is has no data buffered
{
List *item;
PgPool *pool;
+ int err;
/* don't touch anything if takeover is in progress */
if (cf_reboot)
loader_users_check();
skip:
- evtimer_add(&full_maint_ev, &full_maint_period);
+ err = evtimer_add(&full_maint_ev, &full_maint_period);
+ if (err < 0)
+ /* fixme */
+ fatal_perror("evtimer_add");
}
/* first-time initializtion */
void janitor_setup(void)
{
+ int err;
+
/* launch maintenance */
evtimer_set(&full_maint_ev, do_full_maint, NULL);
- evtimer_add(&full_maint_ev, &full_maint_period);
+ err = evtimer_add(&full_maint_ev, &full_maint_period);
+ if (err < 0)
+ fatal_perror("janitor_setup: evtimer_add");
}
/* as [pgbouncer] section can be loaded after databases,
fatal_perror("sigprocmask");
/* install handlers */
+
signal_set(&ev_sigterm, SIGTERM, handle_sigterm, NULL);
- signal_add(&ev_sigterm, NULL);
+ err = signal_add(&ev_sigterm, NULL);
+ if (err < 0)
+ fatal_perror("signal_add");
+
signal_set(&ev_sigint, SIGINT, handle_sigint, NULL);
- signal_add(&ev_sigint, NULL);
+ err = signal_add(&ev_sigint, NULL);
+ if (err < 0)
+ fatal_perror("signal_add");
+
signal_set(&ev_sigusr1, SIGUSR1, handle_sigusr1, NULL);
- signal_add(&ev_sigusr1, NULL);
+ err = signal_add(&ev_sigusr1, NULL);
+ if (err < 0)
+ fatal_perror("signal_add");
+
signal_set(&ev_sigusr2, SIGUSR2, handle_sigusr2, NULL);
- signal_add(&ev_sigusr2, NULL);
+ err = signal_add(&ev_sigusr2, NULL);
+ if (err < 0)
+ fatal_perror("signal_add");
+
signal_set(&ev_sighup, SIGHUP, handle_sighup, NULL);
- signal_add(&ev_sighup, NULL);
+ err = signal_add(&ev_sighup, NULL);
+ if (err < 0)
+ fatal_perror("signal_add");
}
/*
static void main_loop_once(void)
{
+ int err;
+
reset_time_cache();
- event_loop(EVLOOP_ONCE);
- per_loop_maint();
- reuse_just_freed_objects();
+
+ err = event_loop(EVLOOP_ONCE);
+ if (err < 0) {
+ if (errno != EINTR)
+ log_warning("event_loop failed: %s", strerror(errno));
+ } else {
+ per_loop_maint();
+ reuse_just_freed_objects();
+ }
}
/* boot everything */
PgSocket *server;
int total;
const char *unix_dir = cf_unix_socket_dir;
+ bool res;
/* allow only small number of connection attempts at a time */
if (!statlist_empty(&pool->new_server_list)) {
unix_dir = server->pool->db->unix_socket_dir;
/* start connecting */
- sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
- cf_server_connect_timeout / USEC);
+ res = sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
+ cf_server_connect_timeout / USEC);
+ if (!res)
+ log_noise("failed to launch new connection");
}
/* new client connection attempt */
const struct sockaddr_in *addr,
bool is_unix)
{
+ bool res;
PgSocket *client;
/* get free PgSocket */
client = obj_alloc(client_cache);
- if (!client)
+ if (!client) {
+ safe_close(sock);
return NULL;
+ }
client->connect_time = client->request_time = get_cached_time();
client->query_start = 0;
if (cf_log_connections)
slog_debug(client, "got connection attempt");
- sbuf_accept(&client->sbuf, sock, is_unix);
+ res = sbuf_accept(&client->sbuf, sock, is_unix);
+ if (!res)
+ return NULL;
return client;
PktBuf tmp;
client = accept_client(fd, NULL, addr->is_unix);
+ if (client == NULL)
+ return false;
client->suspended = 1;
if (!set_pool(client, dbname, username))
PgPool *pool;
PgSocket *server;
PktBuf tmp;
+ bool res;
if (db->forced_user)
user = db->forced_user;
if (!server)
return false;
- sbuf_accept(&server->sbuf, fd, addr->is_unix);
+ res = sbuf_accept(&server->sbuf, fd, addr->is_unix);
+ if (!res)
+ return false;
+
server->suspended = 1;
server->pool = pool;
server->auth_user = user;
if (buf->send_pos < buf->write_pos) {
event_set(buf->ev, fd, EV_WRITE, pktbuf_send_func, buf);
- event_add(buf->ev, NULL);
+ res = event_add(buf->ev, NULL);
+ if (res < 0) {
+ log_error("pktbuf_send_func: %s", strerror(errno));
+ pktbuf_free(buf);
+ }
} else
pktbuf_free(buf);
}
/* got new connection, associate it with client struct */
static void pool_accept(int sock, short flags, void *is_unix)
{
- int fd;
+ int fd, err;
PgSocket *client;
union {
struct sockaddr_in in;
* wait a bit, hope that admin resolves somehow
*/
log_error("accept() failed: %s", strerror(errno));
- suspend_pooler();
evtimer_set(&ev_err, err_wait_func, NULL);
- evtimer_add(&ev_err, &err_timeout);
+ err = evtimer_add(&ev_err, &err_timeout);
+ if (err < 0)
+ log_error("pool_accept: evtimer_add: %s", strerror(errno));
+ else
+ suspend_pooler();
return;
}
client = accept_client(fd, &addr.in, false);
}
- if (!client) {
- log_debug("P: no mem for client struct");
- safe_close(fd);
- }
+ if (!client)
+ log_warning("P: no mem for client struct");
}
bool use_pooler_socket(int sock, bool is_unix)
{
suspended = 1;
- if (fd_net)
- event_del(&ev_net);
- if (fd_unix)
- event_del(&ev_unix);
+ if (fd_net) {
+ if (event_del(&ev_net) < 0)
+ /* fixme */
+ fatal_perror("event_del(ev_net)");
+ }
+ if (fd_unix) {
+ if (event_del(&ev_unix) < 0)
+ /* fixme */
+ fatal_perror("event_del(ev_unix)");
+ }
}
void resume_pooler(void)
if (fd_unix) {
event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
- event_add(&ev_unix, NULL);
+ if (event_add(&ev_unix, NULL) < 0)
+ /* fixme: less serious approach? */
+ fatal_perror("event_add(ev_unix)");
}
if (fd_net) {
event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
- event_add(&ev_net, NULL);
+ if (event_add(&ev_net, NULL) < 0)
+ /* fixme: less serious approach? */
+ fatal_perror("event_add(ev_net)");
}
}
} while (0)
/* declare static stuff */
-static void sbuf_queue_send(SBuf *sbuf);
-static bool sbuf_send_pending(SBuf *sbuf);
-static bool sbuf_process_pending(SBuf *sbuf);
+static bool sbuf_queue_send(SBuf *sbuf) _MUSTCHECK;
+static bool sbuf_send_pending(SBuf *sbuf) _MUSTCHECK;
+static bool sbuf_process_pending(SBuf *sbuf) _MUSTCHECK;
static void sbuf_connect_cb(int sock, short flags, void *arg);
static void sbuf_recv_cb(int sock, short flags, void *arg);
static void sbuf_send_cb(int sock, short flags, void *arg);
static void sbuf_try_resync(SBuf *sbuf);
-static void sbuf_wait_for_data(SBuf *sbuf);
+static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK;
static void sbuf_main_loop(SBuf *sbuf, bool skip_recv);
-static bool sbuf_call_proto(SBuf *sbuf, int event);
+static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */;
+static bool sbuf_actual_recv(SBuf *sbuf, int len) _MUSTCHECK;
+static bool sbuf_after_connect_check(SBuf *sbuf) _MUSTCHECK;
/*********************************
* Public functions
}
/* got new socket from accept() */
-void sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
+bool sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
{
+ bool res;
+
Assert(sbuf->recv_pos == 0 && sbuf->sock == 0);
AssertSanity(sbuf);
sbuf->is_unix = is_unix;
if (!cf_reboot) {
- sbuf_wait_for_data(sbuf);
-
+ res = sbuf_wait_for_data(sbuf);
+ if (!res) {
+ sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+ return false;
+ }
/* socket should already have some data (linux only) */
- if (cf_tcp_defer_accept && !is_unix)
+ if (cf_tcp_defer_accept && !is_unix) {
sbuf_main_loop(sbuf, DO_RECV);
+ if (!sbuf->sock)
+ return false;
+ }
}
+ return true;
}
/* need to connect() to get a socket */
-void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)
+bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)
{
int res, sock, domain;
struct sockaddr_in sa_in;
* common stuff
*/
sock = socket(domain, SOCK_STREAM, 0);
- if (sock < 0) {
- /* probably fd limit, try to survive */
- log_error("sbuf_connect: socket() failed: %s", strerror(errno));
- sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
- return;
- }
+ if (sock < 0)
+ /* probably fd limit */
+ goto failed;
tune_socket(sock, addr->is_unix);
if (res == 0) {
/* unix socket gives connection immidiately */
sbuf_connect_cb(sock, EV_WRITE, sbuf);
+ return true;
} else if (res < 0 && errno == EINPROGRESS) {
/* tcp socket needs waiting */
event_set(&sbuf->ev, sock, EV_WRITE, sbuf_connect_cb, sbuf);
- event_add(&sbuf->ev, &timeout);
- } else {
- /* failure */
- log_warning("connect failed: res=%d/err=%s", res, strerror(errno));
- close(sock);
- sbuf->sock = 0;
- sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
+ res = event_add(&sbuf->ev, &timeout);
+ if (res >= 0)
+ return true;
}
+
+failed:
+ log_warning("sbuf_connect failed: %s", strerror(errno));
+
+ if (sock >= 0)
+ safe_close(sock);
+ sbuf->sock = 0;
+ sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
+ return false;
}
/* don't wait for data on this socket */
AssertActive(sbuf);
Assert(sbuf->wait_send == 0);
- event_del(&sbuf->ev);
+ if (event_del(&sbuf->ev) < 0)
+ /* fixme */
+ fatal_perror("event_del");
}
/* resume from pause, start waiting for data */
void sbuf_continue(SBuf *sbuf)
{
bool do_recv = DO_RECV;
+ bool res;
AssertActive(sbuf);
- sbuf_wait_for_data(sbuf);
+ res = sbuf_wait_for_data(sbuf);
+ if (!res) {
+ /* drop if problems */
+ sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+ return;
+ }
/*
* It's tempting to try to avoid the recv() but that would
*
* The callback will be called with arg given to sbuf_init.
*/
-void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
+bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
{
+ int err;
+
AssertActive(sbuf);
event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
user_cb, sbuf->proto_cb_arg);
- event_add(&sbuf->ev, NULL);
+
+ err = event_add(&sbuf->ev, NULL);
+ if (err < 0) {
+ log_warning("sbuf_continue_with_callback: %s", strerror(errno));
+ return false;
+ }
+ return true;
}
/* socket cleanup & close */
{
/* keep handler & arg values */
if (sbuf->sock > 0) {
- event_del(&sbuf->ev);
+ if (event_del(&sbuf->ev) < 0)
+ fatal_perror("event_del");
safe_close(sbuf->sock);
}
sbuf->dst = NULL;
}
/* let's wait for new data */
-static void sbuf_wait_for_data(SBuf *sbuf)
+static bool sbuf_wait_for_data(SBuf *sbuf)
{
+ int err;
+
event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
- event_add(&sbuf->ev, NULL);
+ err = event_add(&sbuf->ev, NULL);
+ if (err < 0) {
+ log_warning("sbuf_wait_for_data: event_add: %s", strerror(errno));
+ return false;
+ }
+ return true;
}
/* libevent EV_WRITE: called when dest socket is writable again */
static void sbuf_send_cb(int sock, short flags, void *arg)
{
SBuf *sbuf = arg;
+ bool res;
/* sbuf was closed before in this loop */
if (!sbuf->sock)
/* prepare normal situation for sbuf_main_loop */
sbuf->wait_send = 0;
- sbuf_wait_for_data(sbuf);
-
- /* here we should certainly skip recv() */
- sbuf_main_loop(sbuf, SKIP_RECV);
+ res = sbuf_wait_for_data(sbuf);
+ if (res) {
+ /* here we should certainly skip recv() */
+ sbuf_main_loop(sbuf, SKIP_RECV);
+ } else
+ /* drop if problems */
+ sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
}
/* socket is full, wait until it's writable again */
-static void sbuf_queue_send(SBuf *sbuf)
+static bool sbuf_queue_send(SBuf *sbuf)
{
+ int err;
AssertActive(sbuf);
sbuf->wait_send = 1;
- event_del(&sbuf->ev);
+ err = event_del(&sbuf->ev);
+ if (err < 0) {
+ log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno));
+ return false;
+ }
event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
- event_add(&sbuf->ev, NULL);
+ err = event_add(&sbuf->ev, NULL);
+ if (err < 0) {
+ log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno));
+ return false;
+ }
+ return true;
}
/*
pos = sbuf->buf + sbuf->send_pos;
res = safe_send(sbuf->dst->sock, pos, avail, 0);
if (res < 0) {
- if (errno == EAGAIN)
- sbuf_queue_send(sbuf);
- else
+ if (errno == EAGAIN) {
+ if (!sbuf_queue_send(sbuf))
+ /* drop if queue failed */
+ sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
+ } else
sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
return false;
}
SBuf *sbuf = arg;
if (flags & EV_WRITE) {
- if (sbuf_after_connect_check(sbuf)) {
- if (sbuf_call_proto(sbuf, SBUF_EV_CONNECT_OK))
- sbuf_wait_for_data(sbuf);
- } else
- sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
- } else {
- /* EV_TIMEOUT */
- sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
+ if (!sbuf_after_connect_check(sbuf))
+ goto failed;
+ if (!sbuf_call_proto(sbuf, SBUF_EV_CONNECT_OK))
+ return;
+ if (!sbuf_wait_for_data(sbuf))
+ goto failed;
+ return;
}
+failed:
+ sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
}
/* send some data to listening socket */
PgPool *pool;
struct timeval period = { cf_stats_period, 0 };
PgStats old_total, cur_total, avg;
+ int err;
reset_stats(&old_total);
reset_stats(&cur_total);
stat_add(&cur_total, &pool->stats);
stat_add(&old_total, &pool->older_stats);
}
- evtimer_add(&ev_stats, &period);
-
calc_average(&avg, &cur_total, &old_total);
/* send totals to logfile */
log_info("Stats: %llu req/s, in %llu b/s, "
"out %llu b/s, query %llu us",
avg.request_count, avg.client_bytes,
avg.server_bytes, avg.query_time);
+
+ err = evtimer_add(&ev_stats, &period);
+ if (err < 0) {
+ /* fixme */
+ log_warning("Cannot re-add stats timer, stats non-functional now: %s",
+ strerror(errno));
+ }
}
void stats_setup(void)
{
struct timeval period = { cf_stats_period, 0 };
+ int err;
new_stamp = get_cached_time();
old_stamp = new_stamp - USEC;
/* launch maintenance */
evtimer_set(&ev_stats, refresh_stats, NULL);
- evtimer_add(&ev_stats, &period);
+ err = evtimer_add(&ev_stats, &period);
+ if (err < 0)
+ fatal_perror("evtimer_add");
}
if (res) {
/* use own callback */
sbuf_pause(&bouncer->sbuf);
- sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+ res = sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+ if (!res)
+ fatal("takeover_login: sbuf_continue_with_callback failed");
} else {
- disconnect_server(bouncer, false, "failed to send command");
+ fatal("takeover_login: failed to send command");
}
return res;
}