error checks for all libevent calls
authorMarko Kreen <markokr@gmail.com>
Wed, 2 Jan 2008 15:35:50 +0000 (15:35 +0000)
committerMarko Kreen <markokr@gmail.com>
Wed, 2 Jan 2008 15:35:50 +0000 (15:35 +0000)
try to survive errors delicately, by dropping connection where possible.

Thats not yet possible for event_del() and evtimer_add(), exit immidiately then.

include/sbuf.h
src/janitor.c
src/main.c
src/objects.c
src/pktbuf.c
src/pooler.c
src/sbuf.c
src/stats.c
src/takeover.c

index 68e96bb312fa7225178dc113b9051bb638441042..18a2c743179ee426030b798d1c39f748ec211814 100644 (file)
@@ -91,8 +91,8 @@ struct SBuf {
 #define sbuf_socket(sbuf) ((sbuf)->sock)
 
 void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg);
-void sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix);
-void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec);
+bool sbuf_accept(SBuf *sbuf, int read_sock, bool is_unix)  _MUSTCHECK;
+bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)  _MUSTCHECK;
 
 void sbuf_pause(SBuf *sbuf);
 void sbuf_continue(SBuf *sbuf);
@@ -105,7 +105,7 @@ void sbuf_prepare_fetch(SBuf *sbuf, int amount);
 
 bool sbuf_answer(SBuf *sbuf, const void *buf, int len)  _MUSTCHECK;
 
-void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb);
+bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb)  _MUSTCHECK;
 
 /*
  * Returns true if SBuf is has no data buffered
index 58806298eb1afacd78e315652329bcf5c2ea2bb7..7d8800b12d8a4b462561281d5c59ef68aa33f972 100644 (file)
@@ -456,6 +456,7 @@ static void do_full_maint(int sock, short flags, void *arg)
 {
        List *item;
        PgPool *pool;
+       int err;
 
        /* don't touch anything if takeover is in progress */
        if (cf_reboot)
@@ -480,15 +481,22 @@ static void do_full_maint(int sock, short flags, void *arg)
                loader_users_check();
 
 skip:
-       evtimer_add(&full_maint_ev, &full_maint_period);
+       err = evtimer_add(&full_maint_ev, &full_maint_period);
+       if (err < 0)
+               /* fixme */
+               fatal_perror("evtimer_add");
 }
 
 /* first-time initializtion */
 void janitor_setup(void)
 {
+       int err;
+
        /* launch maintenance */
        evtimer_set(&full_maint_ev, do_full_maint, NULL);
-       evtimer_add(&full_maint_ev, &full_maint_period);
+       err = evtimer_add(&full_maint_ev, &full_maint_period);
+       if (err < 0)
+               fatal_perror("janitor_setup: evtimer_add");
 }
 
 /* as [pgbouncer] section can be loaded after databases,
index b8da2f81ddfff054d6671f3d647e8e4fd7a14ccf..9a3d3e78af41c45b3c4082256f0d0c52490afcec 100644 (file)
@@ -311,16 +311,31 @@ static void signal_setup(void)
                fatal_perror("sigprocmask");
 
        /* install handlers */
+
        signal_set(&ev_sigterm, SIGTERM, handle_sigterm, NULL);
-       signal_add(&ev_sigterm, NULL);
+       err = signal_add(&ev_sigterm, NULL);
+       if (err < 0)
+               fatal_perror("signal_add");
+
        signal_set(&ev_sigint, SIGINT, handle_sigint, NULL);
-       signal_add(&ev_sigint, NULL);
+       err = signal_add(&ev_sigint, NULL);
+       if (err < 0)
+               fatal_perror("signal_add");
+
        signal_set(&ev_sigusr1, SIGUSR1, handle_sigusr1, NULL);
-       signal_add(&ev_sigusr1, NULL);
+       err = signal_add(&ev_sigusr1, NULL);
+       if (err < 0)
+               fatal_perror("signal_add");
+
        signal_set(&ev_sigusr2, SIGUSR2, handle_sigusr2, NULL);
-       signal_add(&ev_sigusr2, NULL);
+       err = signal_add(&ev_sigusr2, NULL);
+       if (err < 0)
+               fatal_perror("signal_add");
+
        signal_set(&ev_sighup, SIGHUP, handle_sighup, NULL);
-       signal_add(&ev_sighup, NULL);
+       err = signal_add(&ev_sighup, NULL);
+       if (err < 0)
+               fatal_perror("signal_add");
 }
 
 /*
@@ -457,10 +472,18 @@ static void daemon_setup(void)
 
 static void main_loop_once(void)
 {
+       int err;
+
        reset_time_cache();
-       event_loop(EVLOOP_ONCE);
-       per_loop_maint();
-       reuse_just_freed_objects();
+
+       err = event_loop(EVLOOP_ONCE);
+       if (err < 0) {
+               if (errno != EINTR)
+                       log_warning("event_loop failed: %s", strerror(errno));
+       } else {
+               per_loop_maint();
+               reuse_just_freed_objects();
+       }
 }
 
 /* boot everything */
index 4c4ed09565b9e45b33010ae50121333bcb8ba1ba..d23cf5cbdd02e238d4010b742c7ced6f1b52a2de 100644 (file)
@@ -682,6 +682,7 @@ void launch_new_connection(PgPool *pool)
        PgSocket *server;
        int total;
        const char *unix_dir = cf_unix_socket_dir;
+       bool res;
 
        /* allow only small number of connection attempts at a time */
        if (!statlist_empty(&pool->new_server_list)) {
@@ -729,8 +730,10 @@ void launch_new_connection(PgPool *pool)
                unix_dir = server->pool->db->unix_socket_dir;
 
        /* start connecting */
-       sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
-                    cf_server_connect_timeout / USEC);
+       res = sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
+                          cf_server_connect_timeout / USEC);
+       if (!res)
+               log_noise("failed to launch new connection");
 }
 
 /* new client connection attempt */
@@ -738,12 +741,15 @@ PgSocket * accept_client(int sock,
                         const struct sockaddr_in *addr,
                         bool is_unix)
 {
+       bool res;
        PgSocket *client;
 
        /* get free PgSocket */
        client = obj_alloc(client_cache);
-       if (!client)
+       if (!client) {
+               safe_close(sock);
                return NULL;
+       }
 
        client->connect_time = client->request_time = get_cached_time();
        client->query_start = 0;
@@ -755,7 +761,9 @@ PgSocket * accept_client(int sock,
 
        if (cf_log_connections)
                slog_debug(client, "got connection attempt");
-       sbuf_accept(&client->sbuf, sock, is_unix);
+       res = sbuf_accept(&client->sbuf, sock, is_unix);
+       if (!res)
+               return NULL;
 
 
        return client;
@@ -880,6 +888,8 @@ bool use_client_socket(int fd, PgAddr *addr,
        PktBuf tmp;
 
        client = accept_client(fd, NULL, addr->is_unix);
+       if (client == NULL)
+               return false;
        client->suspended = 1;
 
        if (!set_pool(client, dbname, username))
@@ -914,6 +924,7 @@ bool use_server_socket(int fd, PgAddr *addr,
        PgPool *pool;
        PgSocket *server;
        PktBuf tmp;
+       bool res;
 
        if (db->forced_user)
                user = db->forced_user;
@@ -928,7 +939,10 @@ bool use_server_socket(int fd, PgAddr *addr,
        if (!server)
                return false;
 
-       sbuf_accept(&server->sbuf, fd, addr->is_unix);
+       res = sbuf_accept(&server->sbuf, fd, addr->is_unix);
+       if (!res)
+               return false;
+
        server->suspended = 1;
        server->pool = pool;
        server->auth_user = user;
index b87d6c572db35dadbbe6e79be444c39750fcef85..88b6d478dcb9651cb3bf427350bc6d3a565f3554 100644 (file)
@@ -105,7 +105,11 @@ static void pktbuf_send_func(int fd, short flags, void *arg)
 
        if (buf->send_pos < buf->write_pos) {
                event_set(buf->ev, fd, EV_WRITE, pktbuf_send_func, buf);
-               event_add(buf->ev, NULL);
+               res = event_add(buf->ev, NULL);
+               if (res < 0) {
+                       log_error("pktbuf_send_func: %s", strerror(errno));
+                       pktbuf_free(buf);
+               }
        } else
                pktbuf_free(buf);
 }
index 34aba0b70b63027ccdfd7aa1605b57cbd76831fb..62eb23de84b7767ca9365e6c681895b1f848df4c 100644 (file)
@@ -172,7 +172,7 @@ static void err_wait_func(int sock, short flags, void *arg)
 /* got new connection, associate it with client struct */
 static void pool_accept(int sock, short flags, void *is_unix)
 {
-       int fd;
+       int fd, err;
        PgSocket *client;
        union {
                struct sockaddr_in in;
@@ -189,9 +189,12 @@ static void pool_accept(int sock, short flags, void *is_unix)
                 * wait a bit, hope that admin resolves somehow
                 */
                log_error("accept() failed: %s", strerror(errno));
-               suspend_pooler();
                evtimer_set(&ev_err, err_wait_func, NULL);
-               evtimer_add(&ev_err, &err_timeout);
+               err = evtimer_add(&ev_err, &err_timeout);
+               if (err < 0)
+                       log_error("pool_accept: evtimer_add: %s", strerror(errno));
+               else
+                       suspend_pooler();
                return;
        }
 
@@ -213,10 +216,8 @@ static void pool_accept(int sock, short flags, void *is_unix)
                client = accept_client(fd, &addr.in, false);
        }
 
-       if (!client) {
-               log_debug("P: no mem for client struct");
-               safe_close(fd);
-       }
+       if (!client)
+               log_warning("P: no mem for client struct");
 }
 
 bool use_pooler_socket(int sock, bool is_unix)
@@ -234,10 +235,16 @@ void suspend_pooler(void)
 {
        suspended = 1;
 
-       if (fd_net)
-               event_del(&ev_net);
-       if (fd_unix)
-               event_del(&ev_unix);
+       if (fd_net) {
+               if (event_del(&ev_net) < 0)
+                       /* fixme */
+                       fatal_perror("event_del(ev_net)");
+       }
+       if (fd_unix) {
+               if (event_del(&ev_unix) < 0)
+                       /* fixme */
+                       fatal_perror("event_del(ev_unix)");
+       }
 }
 
 void resume_pooler(void)
@@ -246,12 +253,16 @@ void resume_pooler(void)
 
        if (fd_unix) {
                event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
-               event_add(&ev_unix, NULL);
+               if (event_add(&ev_unix, NULL) < 0)
+                       /* fixme: less serious approach? */
+                       fatal_perror("event_add(ev_unix)");
        }
 
        if (fd_net) {
                event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
-               event_add(&ev_net, NULL);
+               if (event_add(&ev_net, NULL) < 0)
+                       /* fixme: less serious approach? */
+                       fatal_perror("event_add(ev_net)");
        }
 }
 
index 7d3f5c4238eb41c22555487f9cdc08d1921a1360..677d2a5ddf89d6ab49f0aeac60b3d669396ed693 100644 (file)
 } while (0)
 
 /* declare static stuff */
-static void sbuf_queue_send(SBuf *sbuf);
-static bool sbuf_send_pending(SBuf *sbuf);
-static bool sbuf_process_pending(SBuf *sbuf);
+static bool sbuf_queue_send(SBuf *sbuf) _MUSTCHECK;
+static bool sbuf_send_pending(SBuf *sbuf) _MUSTCHECK;
+static bool sbuf_process_pending(SBuf *sbuf) _MUSTCHECK;
 static void sbuf_connect_cb(int sock, short flags, void *arg);
 static void sbuf_recv_cb(int sock, short flags, void *arg);
 static void sbuf_send_cb(int sock, short flags, void *arg);
 static void sbuf_try_resync(SBuf *sbuf);
-static void sbuf_wait_for_data(SBuf *sbuf);
+static bool sbuf_wait_for_data(SBuf *sbuf) _MUSTCHECK;
 static void sbuf_main_loop(SBuf *sbuf, bool skip_recv);
-static bool sbuf_call_proto(SBuf *sbuf, int event);
+static bool sbuf_call_proto(SBuf *sbuf, int event) /* _MUSTCHECK */;
+static bool sbuf_actual_recv(SBuf *sbuf, int len)  _MUSTCHECK;
+static bool sbuf_after_connect_check(SBuf *sbuf)  _MUSTCHECK;
 
 /*********************************
  * Public functions
@@ -75,8 +77,10 @@ void sbuf_init(SBuf *sbuf, sbuf_cb_t proto_fn, void *arg)
 }
 
 /* got new socket from accept() */
-void sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
+bool sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
 {
+       bool res;
+
        Assert(sbuf->recv_pos == 0 && sbuf->sock == 0);
        AssertSanity(sbuf);
 
@@ -85,16 +89,23 @@ void sbuf_accept(SBuf *sbuf, int sock, bool is_unix)
        sbuf->is_unix = is_unix;
 
        if (!cf_reboot) {
-               sbuf_wait_for_data(sbuf);
-
+               res = sbuf_wait_for_data(sbuf);
+               if (!res) {
+                       sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+                       return false;
+               }
                /* socket should already have some data (linux only) */
-               if (cf_tcp_defer_accept && !is_unix)
+               if (cf_tcp_defer_accept && !is_unix) {
                        sbuf_main_loop(sbuf, DO_RECV);
+                       if (!sbuf->sock)
+                               return false;
+               }
        }
+       return true;
 }
 
 /* need to connect() to get a socket */
-void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)
+bool sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int timeout_sec)
 {
        int res, sock, domain;
        struct sockaddr_in sa_in;
@@ -129,12 +140,9 @@ void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int time
         * common stuff
         */
        sock = socket(domain, SOCK_STREAM, 0);
-       if (sock < 0) {
-               /* probably fd limit, try to survive */
-               log_error("sbuf_connect: socket() failed: %s", strerror(errno));
-               sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
-               return;
-       }
+       if (sock < 0)
+               /* probably fd limit */
+               goto failed;
 
        tune_socket(sock, addr->is_unix);
 
@@ -150,17 +158,23 @@ void sbuf_connect(SBuf *sbuf, const PgAddr *addr, const char *unix_dir, int time
        if (res == 0) {
                /* unix socket gives connection immidiately */
                sbuf_connect_cb(sock, EV_WRITE, sbuf);
+               return true;
        } else if (res < 0 && errno == EINPROGRESS) {
                /* tcp socket needs waiting */
                event_set(&sbuf->ev, sock, EV_WRITE, sbuf_connect_cb, sbuf);
-               event_add(&sbuf->ev, &timeout);
-       } else {
-               /* failure */
-               log_warning("connect failed: res=%d/err=%s", res, strerror(errno));
-               close(sock);
-               sbuf->sock = 0;
-               sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
+               res = event_add(&sbuf->ev, &timeout);
+               if (res >= 0)
+                       return true;
        }
+
+failed:
+       log_warning("sbuf_connect failed: %s", strerror(errno));
+
+       if (sock >= 0)
+               safe_close(sock);
+       sbuf->sock = 0;
+       sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
+       return false;
 }
 
 /* don't wait for data on this socket */
@@ -169,16 +183,24 @@ void sbuf_pause(SBuf *sbuf)
        AssertActive(sbuf);
        Assert(sbuf->wait_send == 0);
 
-       event_del(&sbuf->ev);
+       if (event_del(&sbuf->ev) < 0)
+               /* fixme */
+               fatal_perror("event_del");
 }
 
 /* resume from pause, start waiting for data */
 void sbuf_continue(SBuf *sbuf)
 {
        bool do_recv = DO_RECV;
+       bool res;
        AssertActive(sbuf);
 
-       sbuf_wait_for_data(sbuf);
+       res = sbuf_wait_for_data(sbuf);
+       if (!res) {
+               /* drop if problems */
+               sbuf_call_proto(sbuf, SBUF_EV_RECV_FAILED);
+               return;
+       }
 
        /*
         * It's tempting to try to avoid the recv() but that would
@@ -200,13 +222,21 @@ void sbuf_continue(SBuf *sbuf)
  *
  * The callback will be called with arg given to sbuf_init.
  */
-void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
+bool sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb user_cb)
 {
+       int err;
+
        AssertActive(sbuf);
 
        event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST,
                  user_cb, sbuf->proto_cb_arg);
-       event_add(&sbuf->ev, NULL);
+
+       err = event_add(&sbuf->ev, NULL);
+       if (err < 0) {
+               log_warning("sbuf_continue_with_callback: %s", strerror(errno));
+               return false;
+       }
+       return true;
 }
 
 /* socket cleanup & close */
@@ -214,7 +244,8 @@ void sbuf_close(SBuf *sbuf)
 {
        /* keep handler & arg values */
        if (sbuf->sock > 0) {
-               event_del(&sbuf->ev);
+               if (event_del(&sbuf->ev) < 0)
+                       fatal_perror("event_del");
                safe_close(sbuf->sock);
        }
        sbuf->dst = NULL;
@@ -303,16 +334,24 @@ static bool sbuf_call_proto(SBuf *sbuf, int event)
 }
 
 /* let's wait for new data */
-static void sbuf_wait_for_data(SBuf *sbuf)
+static bool sbuf_wait_for_data(SBuf *sbuf)
 {
+       int err;
+
        event_set(&sbuf->ev, sbuf->sock, EV_READ | EV_PERSIST, sbuf_recv_cb, sbuf);
-       event_add(&sbuf->ev, NULL);
+       err = event_add(&sbuf->ev, NULL);
+       if (err < 0) {
+               log_warning("sbuf_wait_for_data: event_add: %s", strerror(errno));
+               return false;
+       }
+       return true;
 }
 
 /* libevent EV_WRITE: called when dest socket is writable again */
 static void sbuf_send_cb(int sock, short flags, void *arg)
 {
        SBuf *sbuf = arg;
+       bool res;
 
        /* sbuf was closed before in this loop */
        if (!sbuf->sock)
@@ -323,21 +362,34 @@ static void sbuf_send_cb(int sock, short flags, void *arg)
 
        /* prepare normal situation for sbuf_main_loop */
        sbuf->wait_send = 0;
-       sbuf_wait_for_data(sbuf);
-
-       /* here we should certainly skip recv() */
-       sbuf_main_loop(sbuf, SKIP_RECV);
+       res = sbuf_wait_for_data(sbuf);
+       if (res) {
+               /* here we should certainly skip recv() */
+               sbuf_main_loop(sbuf, SKIP_RECV);
+       } else
+               /* drop if problems */
+               sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
 }
 
 /* socket is full, wait until it's writable again */
-static void sbuf_queue_send(SBuf *sbuf)
+static bool sbuf_queue_send(SBuf *sbuf)
 {
+       int err;
        AssertActive(sbuf);
 
        sbuf->wait_send = 1;
-       event_del(&sbuf->ev);
+       err = event_del(&sbuf->ev);
+       if (err < 0) {
+               log_warning("sbuf_queue_send: event_del failed: %s", strerror(errno));
+               return false;
+       }
        event_set(&sbuf->ev, sbuf->dst->sock, EV_WRITE, sbuf_send_cb, sbuf);
-       event_add(&sbuf->ev, NULL);
+       err = event_add(&sbuf->ev, NULL);
+       if (err < 0) {
+               log_warning("sbuf_queue_send: event_add failed: %s", strerror(errno));
+               return false;
+       }
+       return true;
 }
 
 /*
@@ -370,9 +422,11 @@ try_more:
        pos = sbuf->buf + sbuf->send_pos;
        res = safe_send(sbuf->dst->sock, pos, avail, 0);
        if (res < 0) {
-               if (errno == EAGAIN)
-                       sbuf_queue_send(sbuf);
-               else
+               if (errno == EAGAIN) {
+                       if (!sbuf_queue_send(sbuf))
+                               /* drop if queue failed */
+                               sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
+               } else
                        sbuf_call_proto(sbuf, SBUF_EV_SEND_FAILED);
                return false;
        }
@@ -615,15 +669,16 @@ static void sbuf_connect_cb(int sock, short flags, void *arg)
        SBuf *sbuf = arg;
 
        if (flags & EV_WRITE) {
-               if (sbuf_after_connect_check(sbuf)) {
-                       if (sbuf_call_proto(sbuf, SBUF_EV_CONNECT_OK))
-                               sbuf_wait_for_data(sbuf);
-               } else
-                       sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
-       } else {
-               /* EV_TIMEOUT */
-               sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
+               if (!sbuf_after_connect_check(sbuf))
+                       goto failed;
+               if (!sbuf_call_proto(sbuf, SBUF_EV_CONNECT_OK))
+                       return;
+               if (!sbuf_wait_for_data(sbuf))
+                       goto failed;
+               return;
        }
+failed:
+       sbuf_call_proto(sbuf, SBUF_EV_CONNECT_FAILED);
 }
 
 /* send some data to listening socket */
index 34827d396b5e0a0fc9739cbe904c653329b290ee..a4555cca6148aa004fdfc19fef2bf6e12ec9639d 100644 (file)
@@ -128,6 +128,7 @@ static void refresh_stats(int s, short flags, void *arg)
        PgPool *pool;
        struct timeval period = { cf_stats_period, 0 };
        PgStats old_total, cur_total, avg;
+       int err;
 
        reset_stats(&old_total);
        reset_stats(&cur_total);
@@ -143,25 +144,33 @@ static void refresh_stats(int s, short flags, void *arg)
                stat_add(&cur_total, &pool->stats);
                stat_add(&old_total, &pool->older_stats);
        }
-       evtimer_add(&ev_stats, &period);
-
        calc_average(&avg, &cur_total, &old_total);
        /* send totals to logfile */
        log_info("Stats: %llu req/s, in %llu b/s, "
                 "out %llu b/s, query %llu us",
                 avg.request_count, avg.client_bytes,
                 avg.server_bytes, avg.query_time);
+
+       err = evtimer_add(&ev_stats, &period);
+       if (err < 0) {
+               /* fixme */
+               log_warning("Cannot re-add stats timer, stats non-functional now: %s",
+                           strerror(errno));
+       }
 }
 
 void stats_setup(void)
 {
        struct timeval period = { cf_stats_period, 0 };
+       int err;
 
        new_stamp = get_cached_time();
        old_stamp = new_stamp - USEC;
 
        /* launch maintenance */
        evtimer_set(&ev_stats, refresh_stats, NULL);
-       evtimer_add(&ev_stats, &period);
+       err = evtimer_add(&ev_stats, &period);
+       if (err < 0)
+               fatal_perror("evtimer_add");
 }
 
index b9250af8d458f18db92398d760f5276e560d691c..94654de1b1b86ca0c79ea0064e89c5161d001ffe 100644 (file)
@@ -275,9 +275,11 @@ bool takeover_login(PgSocket *bouncer)
        if (res) {
                /* use own callback */
                sbuf_pause(&bouncer->sbuf);
-               sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+               res = sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
+               if (!res)
+                       fatal("takeover_login: sbuf_continue_with_callback failed");
        } else {
-               disconnect_server(bouncer, false, "failed to send command");
+               fatal("takeover_login: failed to send command");
        }
        return res;
 }