const char *client_end, const char *std_string, const char *datestyle, const char *timezone)
_MUSTCHECK;
-void pause_client(PgSocket *client);
void activate_client(PgSocket *client);
void change_client_state(PgSocket *client, SocketState newstate);
void resume_pooler(void);
void suspend_pooler(void);
void get_pooler_fds(int *p_net, int *p_unix);
+void per_loop_pooler_maint(void);
bool sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix) _MUSTCHECK;
bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec) _MUSTCHECK;
-void sbuf_pause(SBuf *sbuf);
+bool sbuf_pause(SBuf *sbuf) _MUSTCHECK;
void sbuf_continue(SBuf *sbuf);
-void sbuf_close(SBuf *sbuf);
+bool sbuf_close(SBuf *sbuf) _MUSTCHECK;
/* proto_fn can use those functions to order behaviour */
void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount);
&& sbuf->pkt_remain == 0;
}
+static inline bool sbuf_is_closed(SBuf *sbuf)
+{
+ return sbuf->sock == 0;
+}
+
bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
const uint8_t *new_hdr, int new_len) _MUSTCHECK;
bool suspend_socket(PgSocket *sk, bool force_suspend)
{
- bool done = true;
- if (!sk->suspended) {
- if (sbuf_is_empty(&sk->sbuf)) {
- sbuf_pause(&sk->sbuf);
+ if (sk->suspended)
+ return true;
+
+ if (sbuf_is_empty(&sk->sbuf)) {
+ if (sbuf_pause(&sk->sbuf))
sk->suspended = 1;
- } else
- done = false;
- }
- if (!done && force_suspend) {
- if (is_server_socket(sk))
- disconnect_server(sk, true, "suspend_timeout");
- else
- disconnect_client(sk, true, "suspend_timeout");
}
- return done;
+
+ if (sk->suspended || !force_suspend)
+ return sk->suspended;
+
+ if (is_server_socket(sk))
+ disconnect_server(sk, true, "suspend_timeout");
+ else
+ disconnect_client(sk, true, "suspend_timeout");
+ return true;
}
/* suspend all sockets in socket list */
per_loop_maint();
reuse_just_freed_objects();
rescue_timers();
+ per_loop_pooler_maint();
}
static void takeover_part1(void)
}
/* deactivate socket and put into wait queue */
-void pause_client(PgSocket *client)
+static void pause_client(PgSocket *client)
{
Assert(client->state == CL_ACTIVE);
slog_debug(client, "pause_client");
change_client_state(client, CL_WAITING);
- sbuf_pause(&client->sbuf);
+ if (!sbuf_pause(&client->sbuf))
+ disconnect_client(client, true, "pause failed");
}
/* wake client from wait */
server->link = client;
change_server_state(server, SV_ACTIVE);
if (varchange) {
- sbuf_pause(&client->sbuf);
- res = false; /* don't process client data yet */
server->setting_vars = 1;
server->ready = 0;
+ res = false; /* don't process client data yet */
+ if (!sbuf_pause(&client->sbuf))
+ disconnect_client(client, true, "pause failed");
} else
res = true;
} else {
pause_client(client);
- Assert(client->state == CL_WAITING);
res = false;
}
return res;
/* ignore result */
notify = false;
}
- sbuf_close(&server->sbuf);
change_server_state(server, SV_JUSTFREE);
+ if (!sbuf_close(&server->sbuf))
+ log_noise("sbuf_close failed, retry later");
}
/* drop client connection */
send_pooler_error(client, false, reason);
}
- sbuf_close(&client->sbuf);
-
change_client_state(client, CL_JUSTFREE);
+ if (!sbuf_close(&client->sbuf))
+ log_noise("sbuf_close failed, retry later");
}
/* the pool needs new connection, if possible */
return;
}
- /* drop the connection silently */
- sbuf_close(&req->sbuf);
+ /* drop the connection, if fails, retry later in justfree list */
+ if (!sbuf_close(&req->sbuf))
+ log_noise("sbuf_close failed, retry later");
/* remember server key */
server = main_client->link;
{
List *tmp, *item;
PgSocket *sk;
+ bool close_works = true;
/*
- * Obviously, if state would be set to *_FREE,
- * they could be moved in one go.
+ * event_del() may fail because of ENOMEM for event handlers
+ * that need only changes sent to kernel on each loop.
+ *
+ * Keep open sbufs in justfree lists until successful.
*/
+
statlist_for_each_safe(item, &justfree_client_list, tmp) {
sk = container_of(item, PgSocket, head);
- change_client_state(sk, CL_FREE);
+ if (sbuf_is_closed(&sk->sbuf))
+ change_client_state(sk, CL_FREE);
+ else if (close_works)
+ close_works = sbuf_close(&sk->sbuf);
}
statlist_for_each_safe(item, &justfree_server_list, tmp) {
sk = container_of(item, PgSocket, head);
- change_server_state(sk, SV_FREE);
+ if (sbuf_is_closed(&sk->sbuf))
+ change_server_state(sk, SV_FREE);
+ else if (close_works)
+ close_works = sbuf_close(&sk->sbuf);
}
}
-
static int fd_net = 0;
static int fd_unix = 0;
+
static struct event ev_net;
static struct event ev_unix;
-static int suspended = 0;
+
+/* if sockets are registered in libevent */
+static bool reg_net = false;
+static bool reg_unix = false;
+
+/* should listening sockets be active or suspended? */
+static bool pooler_active = false;
/* on accept() failure sleep 5 seconds */
static struct event ev_err;
static struct timeval err_timeout = {5, 0};
+/* atexit() cleanup func */
static void cleanup_unix_socket(void)
{
char fn[256];
- if (!cf_unix_socket_dir || suspended)
+
+ /* avoid cleanup if exit() while suspended */
+ if (!reg_unix)
return;
+
snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
cf_unix_socket_dir, cf_listen_port);
unlink(fn);
static void err_wait_func(int sock, short flags, void *arg)
{
- resume_pooler();
+ if (cf_pause_mode != P_SUSPEND)
+ resume_pooler();
}
/* got new connection, associate it with client struct */
void suspend_pooler(void)
{
- suspended = 1;
+ pooler_active = false;
- if (fd_net) {
- if (event_del(&ev_net) < 0)
- /* fixme */
- fatal_perror("event_del(ev_net)");
+ if (fd_net && reg_net) {
+ if (event_del(&ev_net) < 0) {
+ log_warning("suspend_pooler, event_del: %s", strerror(errno));
+ return;
+ }
+ reg_net = false;
}
- if (fd_unix) {
- if (event_del(&ev_unix) < 0)
- /* fixme */
- fatal_perror("event_del(ev_unix)");
+ if (fd_unix && reg_unix) {
+ if (event_del(&ev_unix) < 0) {
+ log_warning("suspend_pooler, event_del: %s", strerror(errno));
+ return;
+ }
+ reg_unix = false;
}
}
void resume_pooler(void)
{
- suspended = 0;
+ pooler_active = true;
- if (fd_unix) {
+ if (fd_unix && !reg_unix) {
event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
- if (event_add(&ev_unix, NULL) < 0)
- /* fixme: less serious approach? */
- fatal_perror("event_add(ev_unix)");
+ if (event_add(&ev_unix, NULL) < 0) {
+ log_warning("event_add failed: %s", strerror(errno));
+ return;
+ }
+ reg_unix = true;
}
- if (fd_net) {
+ if (fd_net && !reg_net) {
event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
- if (event_add(&ev_net, NULL) < 0)
- /* fixme: less serious approach? */
- fatal_perror("event_add(ev_net)");
+ if (event_add(&ev_net, NULL) < 0) {
+ log_warning("event_add failed: %s", strerror(errno));
+ }
+ reg_net = true;
}
}
resume_pooler();
}
+/* retry previously failed suspend_pooler() / resume_pooler() */
+void per_loop_pooler_maint(void)
+{
+ if (pooler_active) {
+ if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
+ resume_pooler();
+ } else {
+ if ((fd_unix && reg_unix) || (fd_net && reg_net))
+ suspend_pooler();
+ }
+}
+
}
/* don't wait for data on this socket */
-void sbuf_pause(SBuf *sbuf)
+bool sbuf_pause(SBuf *sbuf)
{
AssertActive(sbuf);
Assert(sbuf->wait_send == 0);
- if (event_del(&sbuf->ev) < 0)
- /* fixme */
- fatal_perror("event_del");
+ if (event_del(&sbuf->ev) < 0) {
+ log_warning("event_del: %s", strerror(errno));
+ return false;
+ }
+ return true;
}
/* resume from pause, start waiting for data */
}
/* socket cleanup & close */
-void sbuf_close(SBuf *sbuf)
+bool sbuf_close(SBuf *sbuf)
{
/* keep handler & arg values */
if (sbuf->sock > 0) {
- if (event_del(&sbuf->ev) < 0)
- fatal_perror("event_del");
+ if (event_del(&sbuf->ev) < 0) {
+ log_warning("event_del: %s", strerror(errno));
+ return false;
+ }
safe_close(sbuf->sock);
}
sbuf->dst = NULL;
sbuf->pkt_pos = sbuf->pkt_remain = sbuf->recv_pos = 0;
sbuf->pkt_action = sbuf->wait_send = 0;
sbuf->send_pos = sbuf->send_remain = 0;
+ return true;
}
/* proto_fn tells to send some bytes to socket */
int err;
AssertActive(sbuf);
- sbuf->wait_send = 1;
+ /* if false is returned, the socket will be closed later */
+
+ /* stop waiting for read events */
err = event_del(&sbuf->ev);
if (err < 0) {
log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno));
return false;
}
+
+ /* instead wait for EV_WRITE on destination socket */
event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
err = event_add(&sbuf->ev, NULL);
if (err < 0) {
log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno));
return false;
}
+
+ sbuf->wait_send = 1;
return true;
}
Assert(old_bouncer == NULL);
/* unregister bouncer from libevent */
- sbuf_pause(&bouncer->sbuf);
+ if (!sbuf_pause(&bouncer->sbuf))
+ fatal_perror("sbuf_pause failed");
old_bouncer = bouncer;
cf_reboot = 0;
log_info("disko over, going background");
SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;");
if (res) {
/* use own callback */
- sbuf_pause(&bouncer->sbuf);
+ if (!sbuf_pause(&bouncer->sbuf))
+ fatal("sbuf_pause failed");
res = sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
if (!res)
fatal("takeover_login: sbuf_continue_with_callback failed");