replace select() with poll() to allow large fd values
authorMarko Kreen <markokr@gmail.com>
Thu, 20 Dec 2007 22:13:45 +0000 (22:13 +0000)
committerMarko Kreen <markokr@gmail.com>
Thu, 20 Dec 2007 22:13:45 +0000 (22:13 +0000)
Makefile
src/execute.c
src/poll_compat.h [new file with mode: 0644]

index 5f155856766de186eb0ce9927b44be24d1de7e64..be362ad86d69537a49f4fd15ff22335766f03709 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -23,7 +23,7 @@ TARNAME = plproxy-$(PLPROXY_VERSION)
 DIST_DIRS = src sql expected config doc debian
 DIST_FILES = Makefile src/plproxy.h src/rowstamp.h src/scanner.l src/parser.y \
                         $(foreach t,$(REGRESS),sql/$(t).sql expected/$(t).out) \
-                        config/simple.config.sql \
+                        config/simple.config.sql src/poll_compat.h \
                         doc/Makefile doc/config.txt doc/overview.txt \
                         doc/syntax.txt doc/todo.txt doc/tutorial.txt \
                         AUTHORS COPYRIGHT README plproxy.sql.in NEWS \
@@ -50,6 +50,7 @@ src/scanner.c: src/scanner.l
 
 # dependencies
 $(OBJS): src/plproxy.h src/rowstamp.h
+src/execute.o: src/poll_compat.h
 
 # utility rules
 
index 02361c1dee65f35e06becb3df3a00cf39c472d19..0e75c6aefe158519b7500b3d19c9572d068d9882 100644 (file)
@@ -28,7 +28,8 @@
 #include "plproxy.h"
 
 #include <sys/time.h>
-#include <sys/select.h>
+
+#include "poll_compat.h"
 
 /* some error happened */
 static void
@@ -198,22 +199,12 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
 {
        time_t          t;
        int                     res;
-       fd_set          fds;
-       int                     fd;
-       struct timeval notimeout = {0, 0};
+       struct pollfd   pfd;
        ProxyConfig *cf = &func->cur_cluster->config;
 
        if (PQstatus(conn->db) != CONNECTION_OK)
                return false;
 
-       fd = PQsocket(conn->db);
-       if (fd < 0)
-       {
-               elog(WARNING, "libpq socket lost: fd=%d, err=%s",
-                        fd, PQerrorMessage(conn->db));
-               return false;
-       }
-
        /* check if too old */
        if (cf->connection_lifetime > 0)
        {
@@ -232,16 +223,11 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
         * are events pending.  If there are drop the connection.
         */
 intr_loop:
-       /* just in case detect if too many fds */
-#ifdef FD_SETSIZE
-       if (fd >= FD_SETSIZE)
-               plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd=%d",
-                                         FD_SETSIZE, fd);
-#endif
-
-       FD_ZERO(&fds);
-       FD_SET(fd, &fds);
-       res = select(fd + 1, &fds, NULL, NULL, &notimeout);
+       pfd.fd = PQsocket(conn->db);
+       pfd.events = POLLIN;
+       pfd.revents = 0;
+
+       res = poll(&pfd, 1, 0);
        if (res > 0)
        {
                elog(WARNING, "PL/Proxy: detected unstable connection");
@@ -409,18 +395,32 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
 static int
 poll_conns(ProxyFunction *func, ProxyCluster *cluster)
 {
+       static struct pollfd *pfd_cache = NULL;
+       static int pfd_allocated = 0;
+
        int                     i,
                                res,
-                               fd,
-                               fd_max = 0;
-       fd_set          read_fds;
-       fd_set          write_fds;
-       fd_set     *cur_set = NULL;
-       struct timeval timeout;
+                               fd;
        ProxyConnection *conn;
+       struct pollfd *pf;
+       int numfds = 0;
+       int ev = 0;
 
-       FD_ZERO(&read_fds);
-       FD_ZERO(&write_fds);
+       if (pfd_allocated < cluster->conn_count)
+       {
+               struct pollfd *tmp;
+               int num = cluster->conn_count;
+               if (num < 64)
+                       num = 64;
+               if (pfd_cache == NULL)
+                       tmp = malloc(num * sizeof(struct pollfd));
+               else
+                       tmp = realloc(pfd_cache, num * sizeof(struct pollfd));
+               if (!tmp)
+                       elog(ERROR, "no mem for pollfd cache");
+               pfd_cache = tmp;
+               pfd_allocated = num;
+       }
 
        for (i = 0; i < cluster->conn_count; i++)
        {
@@ -437,54 +437,40 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
                                continue;
                        case C_CONNECT_READ:
                        case C_QUERY_READ:
-                               cur_set = &read_fds;
+                               ev = POLLIN;
                                break;
                        case C_CONNECT_WRITE:
                        case C_QUERY_WRITE:
-                               cur_set = &write_fds;
+                               ev = POLLOUT;
                                break;
                }
 
                /* add fd to proper set */
-               fd = PQsocket(conn->db);
-               if (fd > fd_max)
-                       fd_max = fd;
-               else if (fd < 0)
-                       plproxy_error(func, "libpq has lost its socket: fd=%d err=%s",
-                                                 fd, PQerrorMessage(conn->db));
-               FD_SET(fd, cur_set);
+               pf = pfd_cache + numfds++;
+               pf->fd = PQsocket(conn->db);
+               pf->events = ev;
+               pf->revents = 0;
        }
 
-       /* set timeout */
-       timeout.tv_sec = 1;
-       timeout.tv_usec = 0;
-
-       /* just in case detect if too many fds */
-#ifdef FD_SETSIZE
-       if (fd_max > FD_SETSIZE)
-               plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd_max=%d",
-                                         FD_SETSIZE, fd_max);
-#endif
-
        /* wait for events */
-       res = select(fd_max + 1, &read_fds, &write_fds, NULL, &timeout);
+       res = poll(pfd_cache, numfds, 1000);
        if (res == 0)
                return 0;
        if (res < 0)
        {
                if (errno == EINTR)
                        return 0;
-               plproxy_error(func, "select() failed: %s", strerror(errno));
+               plproxy_error(func, "poll() failed: %s", strerror(errno));
        }
 
        /* now recheck the conns */
+       pf = pfd_cache;
        for (i = 0; i < cluster->conn_count; i++)
        {
                conn = &cluster->conn_list[i];
                if (!conn->run_on)
                        continue;
 
-               /* look in which set it should be */
                switch (conn->state)
                {
                        case C_DONE:
@@ -493,21 +479,22 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
                                continue;
                        case C_CONNECT_READ:
                        case C_QUERY_READ:
-                               cur_set = &read_fds;
-                               break;
                        case C_CONNECT_WRITE:
                        case C_QUERY_WRITE:
-                               cur_set = &write_fds;
                                break;
                }
 
-               /* check */
+               /*
+                * they should be in same order as called,
+                */
                fd = PQsocket(conn->db);
-               if (fd < 0)
-                       elog(WARNING, "libpq dropped socket: fd=%d err=%s",
-                                fd, PQerrorMessage(conn->db));
-               else if (FD_ISSET(fd, cur_set))
+               if (pf->fd != fd)
+                       elog(WARNING, "fd order from poll() is messed up?");
+
+               if (pf->revents)
                        handle_conn(func, conn);
+
+               pf++;
        }
        return 1;
 }
diff --git a/src/poll_compat.h b/src/poll_compat.h
new file mode 100644 (file)
index 0000000..65cbe92
--- /dev/null
@@ -0,0 +1,80 @@
+
+#ifdef HAVE_POLL_Hx
+
+#include <poll.h>
+
+#else
+#ifdef HAVE_SYS_POLL_Hx
+
+#include <sys/poll.h>
+
+#else
+
+#include <sys/select.h>
+
+#define POLLIN (1 << 0)
+#define POLLOUT        (1 << 1)
+#define POLLHUP        (1 << 2)
+
+#define pollfd compat_pollfd
+#define poll compat_poll
+
+struct pollfd {
+       int fd;
+       short events;
+       short revents;
+};
+
+static int poll(struct pollfd *fds, unsigned nfds, int timeout_ms)
+{
+       struct pollfd *pf;
+       int i, fd_max = 0;
+       int res;
+       fd_set r_set, w_set;
+
+       FD_ZERO(&r_set);
+       FD_ZERO(&w_set);
+       for (i = 0; i < nfds; i++) {
+               pf = fds + i;
+               if (pf->fd < 0 || pf->fd >= FD_SETSIZE)
+               {
+                       /* give different errno for FD_SETSIZE to allow detect it */
+                       errno = (pf->fd < 0) ? EBADF : EFAULT;
+                       return -1;
+               }
+               if (pf->events & POLLIN)
+                       FD_SET(pf->fd, &r_set);
+               if (pf->events & POLLOUT)
+                       FD_SET(pf->fd, &w_set);
+               if (pf->fd > fd_max)
+                       fd_max = pf->fd;
+       }
+
+       if (timeout_ms >= 0)
+       {
+               struct timeval tv;
+               tv.tv_sec = timeout_ms / 1000;
+               tv.tv_usec = timeout_ms % 1000;
+               res = select(fd_max + 1, &r_set, &w_set, NULL, &tv);
+       } else
+               res = select(fd_max + 1, &r_set, &w_set, NULL, NULL);
+
+       if (res <= 0)
+               return res;
+
+       for (i = 0; i < nfds; i++) {
+               pf = fds + i;
+               pf->revents = 0;
+               if ((pf->events & POLLIN) && FD_ISSET(pf->fd, &r_set))
+                       pf->revents |= POLLIN;
+               if ((pf->events & POLLOUT) && FD_ISSET(pf->fd, &w_set))
+                       pf->revents |= POLLOUT;
+       }
+
+       return res;
+}
+
+#endif /* !HAVE_SYS_POLL_H */
+#endif /* !HAVE_POLL_H */
+
+