make failure from event_del() non-fatal
author Marko Kreen <markokr@gmail.com>
Fri, 11 Jan 2008 13:22:49 +0000 (13:22 +0000)
committer Marko Kreen <markokr@gmail.com>
Fri, 11 Jan 2008 13:22:49 +0000 (13:22 +0000)
- pgsocket: keep open sbufs in justfree lists, retry close later
- pooler: keep track of socket states, retry in per-loop main function

include/objects.h
include/pooler.h
include/sbuf.h
src/janitor.c
src/main.c
src/objects.c
src/pooler.c
src/sbuf.c
src/takeover.c

index 2bf349658e127de419a7573dcce2df33261b09f0..3e241cbd594d4b79eb85565ccb4b26e68fb04063 100644 (file)
@@ -51,7 +51,6 @@ bool use_server_socket(int fd, PgAddr *addr, const char *dbname, const char *use
                       const char *client_end, const char *std_string, const char *datestyle, const char *timezone)
                        _MUSTCHECK;
 
-void pause_client(PgSocket *client);
 void activate_client(PgSocket *client);
 
 void change_client_state(PgSocket *client, SocketState newstate);
index b43e47b75dad412bb9c80e6dea6fc6e40c98231b..5775d224334f9b0560e85f3edebdcffe8852f0ee 100644 (file)
@@ -21,4 +21,5 @@ bool use_pooler_socket(int fd, bool is_unix) _MUSTCHECK;
 void resume_pooler(void);
 void suspend_pooler(void);
 void get_pooler_fds(int *p_net, int *p_unix);
+void per_loop_pooler_maint(void);
 
index b21e5bab164df747d71ae057385e83012943234a..871a9005d9fc4e04ca8033526b4e9075b33cf4fc 100644 (file)
@@ -96,9 +96,9 @@ void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg);
 bool sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix)  _MUSTCHECK;
 bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)  _MUSTCHECK;
 
-void sbuf_pause(SBuf *sbuf);
+bool sbuf_pause(SBuf *sbuf) _MUSTCHECK;
 void sbuf_continue(SBuf *sbuf);
-void sbuf_close(SBuf *sbuf);
+bool sbuf_close(SBuf *sbuf) _MUSTCHECK;
 
 /* proto_fn can use those functions to order behaviour */
 void sbuf_prepare_send(SBuf *sbuf, SBuf *dst, int amount);
@@ -119,6 +119,11 @@ static inline bool sbuf_is_empty(SBuf *sbuf)
                && sbuf->pkt_remain == 0;
 }
 
+static inline bool sbuf_is_closed(SBuf *sbuf)
+{
+       return sbuf->sock == 0;
+}
+
 bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
                         const uint8_t *new_hdr, int new_len)  _MUSTCHECK;
 
index f6685b4c54b6a9ec14086084f92d2c5f772597f7..4c4d100d8a39709115cc46a4e565a2647f427498 100644 (file)
@@ -51,21 +51,22 @@ static void close_client_list(StatList *sk_list, const char *reason)
 
 bool suspend_socket(PgSocket *sk, bool force_suspend)
 {
-       bool done = true;
-       if (!sk->suspended) {
-               if (sbuf_is_empty(&sk->sbuf)) {
-                       sbuf_pause(&sk->sbuf);
+       if (sk->suspended)
+               return true;
+
+       if (sbuf_is_empty(&sk->sbuf)) {
+               if (sbuf_pause(&sk->sbuf))
                        sk->suspended = 1;
-               } else
-                       done = false;
-       }
-       if (!done && force_suspend) {
-               if (is_server_socket(sk))
-                       disconnect_server(sk, true, "suspend_timeout");
-               else
-                       disconnect_client(sk, true, "suspend_timeout");
        }
-       return done;
+
+       if (sk->suspended || !force_suspend)
+               return sk->suspended;
+
+       if (is_server_socket(sk))
+               disconnect_server(sk, true, "suspend_timeout");
+       else
+               disconnect_client(sk, true, "suspend_timeout");
+       return true;
 }
 
 /* suspend all sockets in socket list */
index 1239033511157da3806d4308b27056d83494da58..9d1c3f75666a1057930970f52edcefeac701082e 100644 (file)
@@ -502,6 +502,7 @@ static void main_loop_once(void)
        per_loop_maint();
        reuse_just_freed_objects();
        rescue_timers();
+       per_loop_pooler_maint();
 }
 
 static void takeover_part1(void)
index f29c376615473f9ba80bd9cb9afa766079c613f9..e1361101217ad19e06e72836572fc41a41239e39 100644 (file)
@@ -425,13 +425,14 @@ PgPool *get_pool(PgDatabase *db, PgUser *user)
 }
 
 /* deactivate socket and put into wait queue */
-void pause_client(PgSocket *client)
+static void pause_client(PgSocket *client)
 {
        Assert(client->state == CL_ACTIVE);
 
        slog_debug(client, "pause_client");
        change_client_state(client, CL_WAITING);
-       sbuf_pause(&client->sbuf);
+       if (!sbuf_pause(&client->sbuf))
+               disconnect_client(client, true, "pause failed");
 }
 
 /* wake client from wait */
@@ -485,15 +486,15 @@ bool find_server(PgSocket *client)
                server->link = client;
                change_server_state(server, SV_ACTIVE);
                if (varchange) {
-                       sbuf_pause(&client->sbuf);
-                       res = false; /* don't process client data yet */
                        server->setting_vars = 1;
                        server->ready = 0;
+                       res = false; /* don't process client data yet */
+                       if (!sbuf_pause(&client->sbuf))
+                               disconnect_client(client, true, "pause failed");
                } else
                        res = true;
        } else {
                pause_client(client);
-               Assert(client->state == CL_WAITING);
                res = false;
        }
        return res;
@@ -628,9 +629,10 @@ void disconnect_server(PgSocket *server, bool notify, const char *reason)
                        /* ignore result */
                        notify = false;
        }
-       sbuf_close(&server->sbuf);
 
        change_server_state(server, SV_JUSTFREE);
+       if (!sbuf_close(&server->sbuf))
+               log_noise("sbuf_close failed, retry later");
 }
 
 /* drop client connection */
@@ -673,9 +675,9 @@ void disconnect_client(PgSocket *client, bool notify, const char *reason)
                send_pooler_error(client, false, reason);
        }
 
-       sbuf_close(&client->sbuf);
-
        change_client_state(client, CL_JUSTFREE);
+       if (!sbuf_close(&client->sbuf))
+               log_noise("sbuf_close failed, retry later");
 }
 
 /* the pool needs new connection, if possible */
@@ -846,8 +848,9 @@ found:
                return;
        }
 
-       /* drop the connection silently */
-       sbuf_close(&req->sbuf);
+       /* drop the connection, if fails, retry later in justfree list */
+       if (!sbuf_close(&req->sbuf))
+               log_noise("sbuf_close failed, retry later");
 
        /* remember server key */
        server = main_client->link;
@@ -1014,19 +1017,28 @@ void reuse_just_freed_objects(void)
 {
        List *tmp, *item;
        PgSocket *sk;
+       bool close_works = true;
 
        /*
-        * Obviously, if state would be set to *_FREE,
-        * they could be moved in one go.
+        * event_del() may fail because of ENOMEM for event handlers
+        * that need only changes sent to kernel on each loop.
+        *
+        * Keep open sbufs in justfree lists until successful.
         */
+
        statlist_for_each_safe(item, &justfree_client_list, tmp) {
                sk = container_of(item, PgSocket, head);
-               change_client_state(sk, CL_FREE);
+               if (sbuf_is_closed(&sk->sbuf))
+                       change_client_state(sk, CL_FREE);
+               else if (close_works)
+                       close_works = sbuf_close(&sk->sbuf);
        }
        statlist_for_each_safe(item, &justfree_server_list, tmp) {
                sk = container_of(item, PgSocket, head);
-               change_server_state(sk, SV_FREE);
+               if (sbuf_is_closed(&sk->sbuf))
+                       change_server_state(sk, SV_FREE);
+               else if (close_works)
+                       close_works = sbuf_close(&sk->sbuf);
        }
 }
 
-
index fcbc6596b607820373a710aa9b054958d7c03c26..c37033d76aad5f7c7a596bd50106a37ba067f2f7 100644 (file)
 
 static int fd_net = 0;
 static int fd_unix = 0;
+
 static struct event ev_net;
 static struct event ev_unix;
-static int suspended = 0;
+
+/* if sockets are registered in libevent */
+static bool reg_net = false;
+static bool reg_unix = false;
+
+/* should listening sockets be active or suspended? */
+static bool pooler_active = false;
 
 /* on accept() failure sleep 5 seconds */
 static struct event ev_err;
 static struct timeval err_timeout = {5, 0};
 
+/* atexit() cleanup func */
 static void cleanup_unix_socket(void)
 {
        char fn[256];
-       if (!cf_unix_socket_dir || suspended)
+
+       /* avoid cleanup if exit() while suspended */
+       if (!reg_unix)
                return;
+
        snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
                        cf_unix_socket_dir, cf_listen_port);
        unlink(fn);
@@ -166,7 +177,8 @@ static int create_net_socket(const char *listen_addr, int listen_port)
 
 static void err_wait_func(int sock, short flags, void *arg)
 {
-       resume_pooler();
+       if (cf_pause_mode != P_SUSPEND)
+               resume_pooler();
 }
 
 /* got new connection, associate it with client struct */
@@ -244,36 +256,43 @@ bool use_pooler_socket(int sock, bool is_unix)
 
 void suspend_pooler(void)
 {
-       suspended = 1;
+       pooler_active = false;
 
-       if (fd_net) {
-               if (event_del(&ev_net) < 0)
-                       /* fixme */
-                       fatal_perror("event_del(ev_net)");
+       if (fd_net && reg_net) {
+               if (event_del(&ev_net) < 0) {
+                       log_warning("suspend_pooler, event_del: %s", strerror(errno));
+                       return;
+               }
+               reg_net = false;
        }
-       if (fd_unix) {
-               if (event_del(&ev_unix) < 0)
-                       /* fixme */
-                       fatal_perror("event_del(ev_unix)");
+       if (fd_unix && reg_unix) {
+               if (event_del(&ev_unix) < 0) {
+                       log_warning("suspend_pooler, event_del: %s", strerror(errno));
+                       return;
+               }
+               reg_unix = false;
        }
 }
 
 void resume_pooler(void)
 {
-       suspended = 0;
+       pooler_active = true;
 
-       if (fd_unix) {
+       if (fd_unix && !reg_unix) {
                event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
-               if (event_add(&ev_unix, NULL) < 0)
-                       /* fixme: less serious approach? */
-                       fatal_perror("event_add(ev_unix)");
+               if (event_add(&ev_unix, NULL) < 0) {
+                       log_warning("event_add failed: %s", strerror(errno));
+                       return;
+               }
+               reg_unix = true;
        }
 
-       if (fd_net) {
+       if (fd_net && !reg_net) {
                event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
-               if (event_add(&ev_net, NULL) < 0)
-                       /* fixme: less serious approach? */
-                       fatal_perror("event_add(ev_net)");
+               if (event_add(&ev_net, NULL) < 0) {
+                       log_warning("event_add failed: %s", strerror(errno));
+               }
+               reg_net = true;
        }
 }
 
@@ -292,3 +311,15 @@ void pooler_setup(void)
        resume_pooler();
 }
 
+/* retry previously failed suspend_pooler() / resume_pooler() */
+void per_loop_pooler_maint(void)
+{
+       if (pooler_active) {
+               if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
+                       resume_pooler();
+       } else {
+               if ((fd_unix && reg_unix) || (fd_net && reg_net))
+                       suspend_pooler();
+       }
+}
+
index b7a598ea58d23c4c2b95b4cc4a7ea4b6dcac4e4f..35c72b993ee3477ef481c507543894ed22700b17 100644 (file)
@@ -177,14 +177,16 @@ failed:
 }
 
 /* don't wait for data on this socket */
-void sbuf_pause(SBuf *sbuf)
+bool sbuf_pause(SBuf *sbuf)
 {
        AssertActive(sbuf);
        Assert(sbuf->wait_send == 0);
 
-       if (event_del(&sbuf->ev) < 0)
-               /* fixme */
-               fatal_perror("event_del");
+       if (event_del(&sbuf->ev) < 0) {
+               log_warning("event_del: %s", strerror(errno));
+               return false;
+       }
+       return true;
 }
 
 /* resume from pause, start waiting for data */
@@ -239,12 +241,14 @@ bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
 }
 
 /* socket cleanup & close */
-void sbuf_close(SBuf *sbuf)
+bool sbuf_close(SBuf *sbuf)
 {
        /* keep handler & arg values */
        if (sbuf->sock > 0) {
-               if (event_del(&sbuf->ev) < 0)
-                       fatal_perror("event_del");
+               if (event_del(&sbuf->ev) < 0) {
+                       log_warning("event_del: %s", strerror(errno));
+                       return false;
+               }
                safe_close(sbuf->sock);
        }
        sbuf->dst = NULL;
@@ -252,6 +256,7 @@ void sbuf_close(SBuf *sbuf)
        sbuf->pkt_pos = sbuf->pkt_remain = sbuf->recv_pos = 0;
        sbuf->pkt_action = sbuf->wait_send = 0;
        sbuf->send_pos = sbuf->send_remain = 0;
+       return true;
 }
 
 /* proto_fn tells to send some bytes to socket */
@@ -376,18 +381,24 @@ static bool sbuf_queue_send(SBuf *sbuf)
        int err;
        AssertActive(sbuf);
 
-       sbuf->wait_send = 1;
+       /* if false is returned, the socket will be closed later */
+
+       /* stop waiting for read events */
        err = event_del(&sbuf->ev);
        if (err < 0) {
                log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno));
                return false;
        }
+
+       /* instead wait for EV_WRITE on destination socket */
        event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
        err = event_add(&sbuf->ev, NULL);
        if (err < 0) {
                log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno));
                return false;
        }
+
+       sbuf->wait_send = 1;
        return true;
 }
 
index f11b49b53b8891257ef0e6e4ff2755b8b37953d9..ea7cad04dad588792c3d999c7d900b1d8ea4331f 100644 (file)
@@ -67,7 +67,8 @@ static void takeover_finish_part1(PgSocket *bouncer)
        Assert(old_bouncer == NULL);
 
        /* unregister bouncer from libevent */
-       sbuf_pause(&bouncer->sbuf);
+       if (!sbuf_pause(&bouncer->sbuf))
+               fatal_perror("sbuf_pause failed");
        old_bouncer = bouncer;
        cf_reboot = 0;
        log_info("disko over, going background");
@@ -305,7 +306,8 @@ bool takeover_login(PgSocket *bouncer)
        SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;");
        if (res) {
                /* use own callback */
-               sbuf_pause(&bouncer->sbuf);
+               if (!sbuf_pause(&bouncer->sbuf))
+                       fatal("sbuf_pause failed");
                res = sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
                if (!res)
                        fatal("takeover_login: sbuf_continue_with_callback failed");