#include "plproxy.h"
#include <sys/time.h>
-#include <sys/select.h>
+
+#include "poll_compat.h"
/* some error happened */
static void
{
time_t t;
int res;
- fd_set fds;
- int fd;
- struct timeval notimeout = {0, 0};
+ struct pollfd pfd;
ProxyConfig *cf = &func->cur_cluster->config;
if (PQstatus(conn->db) != CONNECTION_OK)
return false;
- fd = PQsocket(conn->db);
- if (fd < 0)
- {
- elog(WARNING, "libpq socket lost: fd=%d, err=%s",
- fd, PQerrorMessage(conn->db));
- return false;
- }
-
/* check if too old */
if (cf->connection_lifetime > 0)
{
* are events pending. If there are drop the connection.
*/
intr_loop:
- /* just in case detect if too many fds */
-#ifdef FD_SETSIZE
- if (fd >= FD_SETSIZE)
- plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd=%d",
- FD_SETSIZE, fd);
-#endif
-
- FD_ZERO(&fds);
- FD_SET(fd, &fds);
- res = select(fd + 1, &fds, NULL, NULL, ¬imeout);
+ pfd.fd = PQsocket(conn->db);
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ res = poll(&pfd, 1, 0);
if (res > 0)
{
elog(WARNING, "PL/Proxy: detected unstable connection");
static int
poll_conns(ProxyFunction *func, ProxyCluster *cluster)
{
+ static struct pollfd *pfd_cache = NULL;
+ static int pfd_allocated = 0;
+
int i,
res,
- fd,
- fd_max = 0;
- fd_set read_fds;
- fd_set write_fds;
- fd_set *cur_set = NULL;
- struct timeval timeout;
+ fd;
ProxyConnection *conn;
+ struct pollfd *pf;
+ int numfds = 0;
+ int ev = 0;
- FD_ZERO(&read_fds);
- FD_ZERO(&write_fds);
+ if (pfd_allocated < cluster->conn_count)
+ {
+ struct pollfd *tmp;
+ int num = cluster->conn_count;
+ if (num < 64)
+ num = 64;
+ if (pfd_cache == NULL)
+ tmp = malloc(num * sizeof(struct pollfd));
+ else
+ tmp = realloc(pfd_cache, num * sizeof(struct pollfd));
+ if (!tmp)
+ elog(ERROR, "no mem for pollfd cache");
+ pfd_cache = tmp;
+ pfd_allocated = num;
+ }
for (i = 0; i < cluster->conn_count; i++)
{
continue;
case C_CONNECT_READ:
case C_QUERY_READ:
- cur_set = &read_fds;
+ ev = POLLIN;
break;
case C_CONNECT_WRITE:
case C_QUERY_WRITE:
- cur_set = &write_fds;
+ ev = POLLOUT;
break;
}
/* add fd to proper set */
- fd = PQsocket(conn->db);
- if (fd > fd_max)
- fd_max = fd;
- else if (fd < 0)
- plproxy_error(func, "libpq has lost its socket: fd=%d err=%s",
- fd, PQerrorMessage(conn->db));
- FD_SET(fd, cur_set);
+ pf = pfd_cache + numfds++;
+ pf->fd = PQsocket(conn->db);
+ pf->events = ev;
+ pf->revents = 0;
}
- /* set timeout */
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-
- /* just in case detect if too many fds */
-#ifdef FD_SETSIZE
- if (fd_max > FD_SETSIZE)
- plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd_max=%d",
- FD_SETSIZE, fd_max);
-#endif
-
/* wait for events */
- res = select(fd_max + 1, &read_fds, &write_fds, NULL, &timeout);
+ res = poll(pfd_cache, numfds, 1000);
if (res == 0)
return 0;
if (res < 0)
{
if (errno == EINTR)
return 0;
- plproxy_error(func, "select() failed: %s", strerror(errno));
+ plproxy_error(func, "poll() failed: %s", strerror(errno));
}
/* now recheck the conns */
+ pf = pfd_cache;
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
if (!conn->run_on)
continue;
- /* look in which set it should be */
switch (conn->state)
{
case C_DONE:
continue;
case C_CONNECT_READ:
case C_QUERY_READ:
- cur_set = &read_fds;
- break;
case C_CONNECT_WRITE:
case C_QUERY_WRITE:
- cur_set = &write_fds;
break;
}
- /* check */
+ /*
+ * they should be in same order as called,
+ */
fd = PQsocket(conn->db);
- if (fd < 0)
- elog(WARNING, "libpq dropped socket: fd=%d err=%s",
- fd, PQerrorMessage(conn->db));
- else if (FD_ISSET(fd, cur_set))
+ if (pf->fd != fd)
+ elog(WARNING, "fd order from poll() is messed up?");
+
+ if (pf->revents)
handle_conn(func, conn);
+
+ pf++;
}
return 1;
}
--- /dev/null
+
+#ifdef HAVE_POLL_Hx
+
+#include <poll.h>
+
+#else
+#ifdef HAVE_SYS_POLL_Hx
+
+#include <sys/poll.h>
+
+#else
+
+#include <sys/select.h>
+
+#define POLLIN (1 << 0)
+#define POLLOUT (1 << 1)
+#define POLLHUP (1 << 2)
+
+#define pollfd compat_pollfd
+#define poll compat_poll
+
+struct pollfd {
+ int fd;
+ short events;
+ short revents;
+};
+
+static int poll(struct pollfd *fds, unsigned nfds, int timeout_ms)
+{
+ struct pollfd *pf;
+ int i, fd_max = 0;
+ int res;
+ fd_set r_set, w_set;
+
+ FD_ZERO(&r_set);
+ FD_ZERO(&w_set);
+ for (i = 0; i < nfds; i++) {
+ pf = fds + i;
+ if (pf->fd < 0 || pf->fd >= FD_SETSIZE)
+ {
+ /* give different errno for FD_SETSIZE to allow detect it */
+ errno = (pf->fd < 0) ? EBADF : EFAULT;
+ return -1;
+ }
+ if (pf->events & POLLIN)
+ FD_SET(pf->fd, &r_set);
+ if (pf->events & POLLOUT)
+ FD_SET(pf->fd, &w_set);
+ if (pf->fd > fd_max)
+ fd_max = pf->fd;
+ }
+
+ if (timeout_ms >= 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = timeout_ms / 1000;
+ tv.tv_usec = timeout_ms % 1000;
+ res = select(fd_max + 1, &r_set, &w_set, NULL, &tv);
+ } else
+ res = select(fd_max + 1, &r_set, &w_set, NULL, NULL);
+
+ if (res <= 0)
+ return res;
+
+ for (i = 0; i < nfds; i++) {
+ pf = fds + i;
+ pf->revents = 0;
+ if ((pf->events & POLLIN) && FD_ISSET(pf->fd, &r_set))
+ pf->revents |= POLLIN;
+ if ((pf->events & POLLOUT) && FD_ISSET(pf->fd, &w_set))
+ pf->revents |= POLLOUT;
+ }
+
+ return res;
+}
+
+#endif /* !HAVE_SYS_POLL_H */
+#endif /* !HAVE_POLL_H */
+
+