* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-bool admin_handle_client(PgSocket *client, PktHdr *pkt);
-bool admin_pre_login(PgSocket *client);
+bool admin_handle_client(PgSocket *client, PktHdr *pkt) _MUSTCHECK;
+bool admin_pre_login(PgSocket *client) _MUSTCHECK;
void admin_setup(void);
-bool admin_error(PgSocket *console, const char *fmt, ...);
+bool admin_error(PgSocket *console, const char *fmt, ...) _PRINTF(2, 3) /* _MUSTCHECK */;
void admin_pause_done(void);
-void admin_flush(PgSocket *admin, PktBuf *buf, const char *desc);
-bool admin_ready(PgSocket *admin, const char *desc);
+bool admin_flush(PgSocket *admin, PktBuf *buf, const char *desc) /* _MUSTCHECK */;
+bool admin_ready(PgSocket *admin, const char *desc) _MUSTCHECK;
void admin_handle_cancel(PgSocket *client);
extern ConfElem bouncer_params[];
-static inline PgSocket *
+static inline PgSocket * _MUSTCHECK
pop_socket(StatList *slist)
{
List *item = statlist_pop(slist);
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt, void *arg);
-bool set_pool(PgSocket *client, const char *dbname, const char *username);
+bool client_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt, void *arg) _MUSTCHECK;
+bool set_pool(PgSocket *client, const char *dbname, const char *username) _MUSTCHECK;
void config_postprocess(void);
void resume_all(void);
void per_loop_maint(void);
-bool suspend_socket(PgSocket *sk);
+bool suspend_socket(PgSocket *sk) _MUSTCHECK;
typedef void (*conf_data_callback_fn)(char *key, char *value);
typedef const char * (*conf_var_get_fn)(ConfElem *elem);
-typedef bool (*conf_var_set_fn)(ConfElem *elem, const char *value, PgSocket *console);
+typedef bool (*conf_var_set_fn)(ConfElem *elem, const char *value, PgSocket *console) _MUSTCHECK;
typedef struct {
conf_var_get_fn fn_get;
bool cf_set_str(ConfElem *elem, const char *value, PgSocket *console);
const char *conf_to_text(ConfElem *elem);
-bool set_config_param(ConfElem *elem_list, const char *key, const char *val, bool reload, PgSocket *console);
+bool set_config_param(ConfElem *elem_list, const char *key, const char *val, bool reload, PgSocket *console) /* _MUSTCHECK */;
/* connstring parsing */
void parse_database(char *name, char *connstr);
/* user file parsing */
-bool load_auth_file(const char *fn);
-bool loader_users_check(void);
+bool load_auth_file(const char *fn) /* _MUSTCHECK */;
+bool loader_users_check(void) /* _MUSTCHECK */;
PgDatabase *find_database(const char *name);
PgUser *find_user(const char *name);
PgPool *get_pool(PgDatabase *, PgUser *);
-bool find_server(PgSocket *client);
-bool release_server(PgSocket *server);
-bool finish_client_login(PgSocket *client);
+bool find_server(PgSocket *client) _MUSTCHECK;
+bool release_server(PgSocket *server) /* _MUSTCHECK */;
+bool finish_client_login(PgSocket *client) _MUSTCHECK;
-PgSocket * accept_client(int sock, const struct sockaddr_in *addr, bool is_unix);
+PgSocket * accept_client(int sock, const struct sockaddr_in *addr, bool is_unix) _MUSTCHECK;
void disconnect_server(PgSocket *server, bool notify, const char *reason);
void disconnect_client(PgSocket *client, bool notify, const char *reason);
-PgDatabase * add_database(const char *name);
-PgUser * add_user(const char *name, const char *passwd);
-PgUser * force_user(PgDatabase *db, const char *username, const char *passwd);
+PgDatabase * add_database(const char *name) _MUSTCHECK;
+PgUser * add_user(const char *name, const char *passwd) _MUSTCHECK;
+PgUser * force_user(PgDatabase *db, const char *username, const char *passwd) _MUSTCHECK;
void accept_cancel_request(PgSocket *req);
void forward_cancel_request(PgSocket *server);
void launch_new_connection(PgPool *pool);
bool use_client_socket(int fd, PgAddr *addr, const char *dbname, const char *username, uint64_t ckey, int oldfd, int linkfd,
- const char *client_end, const char *std_string, const char *datestyle, const char *timezone);
+ const char *client_end, const char *std_string, const char *datestyle, const char *timezone)
+ _MUSTCHECK;
bool use_server_socket(int fd, PgAddr *addr, const char *dbname, const char *username, uint64_t ckey, int oldfd, int linkfd,
- const char *client_end, const char *std_string, const char *datestyle, const char *timezone);
+ const char *client_end, const char *std_string, const char *datestyle, const char *timezone)
+ _MUSTCHECK;
void pause_client(PgSocket *client);
void activate_client(PgSocket *client);
/*
* pktbuf creation
*/
-PktBuf *pktbuf_dynamic(int start_len);
+PktBuf *pktbuf_dynamic(int start_len) _MUSTCHECK;
void pktbuf_static(PktBuf *buf, uint8_t *data, int len);
/*
* sending
*/
-bool pktbuf_send_immidiate(PktBuf *buf, PgSocket *sk);
-void pktbuf_send_queued(PktBuf *buf, PgSocket *sk);
+bool pktbuf_send_immidiate(PktBuf *buf, PgSocket *sk) _MUSTCHECK;
+bool pktbuf_send_queued(PktBuf *buf, PgSocket *sk) _MUSTCHECK;
/*
* low-level ops
*/
void pooler_setup(void);
-bool use_pooler_socket(int fd, bool is_unix);
+bool use_pooler_socket(int fd, bool is_unix) _MUSTCHECK;
void resume_pooler(void);
void suspend_pooler(void);
void get_pooler_fds(int *p_net, int *p_unix);
MBuf data;
};
-bool get_header(MBuf *data, PktHdr *pkt);
+bool get_header(MBuf *data, PktHdr *pkt) _MUSTCHECK;
-bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg);
+bool send_pooler_error(PgSocket *client, bool send_ready, const char *msg) /*_MUSTCHECK*/;
void log_server_error(const char *note, PktHdr *pkt);
void add_welcome_parameter(PgPool *pool, const char *key, const char *val);
void finish_welcome_msg(PgSocket *server);
-bool welcome_client(PgSocket *client);
+bool welcome_client(PgSocket *client) _MUSTCHECK;
-bool answer_authreq(PgSocket *server, PktHdr *pkt);
+bool answer_authreq(PgSocket *server, PktHdr *pkt) _MUSTCHECK;
-bool send_startup_packet(PgSocket *server);
+bool send_startup_packet(PgSocket *server) _MUSTCHECK;
-int scan_text_result(MBuf *pkt, const char *tupdesc, ...);
+int scan_text_result(MBuf *pkt, const char *tupdesc, ...) _MUSTCHECK;
/* is packet completely in our buffer */
static inline bool incomplete_pkt(const PktHdr *pkt)
void sbuf_prepare_skip(SBuf *sbuf, int amount);
void sbuf_prepare_fetch(SBuf *sbuf, int amount);
-bool sbuf_answer(SBuf *sbuf, const void *buf, int len);
+bool sbuf_answer(SBuf *sbuf, const void *buf, int len) _MUSTCHECK;
void sbuf_continue_with_callback(SBuf *sbuf, sbuf_libevent_cb cb);
}
bool sbuf_rewrite_header(SBuf *sbuf, int old_len,
- const uint8_t *new_hdr, int new_len);
+ const uint8_t *new_hdr, int new_len) _MUSTCHECK;
+
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
-bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt, void *arg);
+bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *pkt, void *arg) _MUSTCHECK;
obj_init_fn init_func, obj_clean_fn clean_func);
void objcache_destroy(ObjectCache *cache);
-void * obj_alloc(ObjectCache *cache);
+void * obj_alloc(ObjectCache *cache) _MALLOC _MUSTCHECK;
void obj_free(ObjectCache *cache, void *obj);
int objcache_total_count(ObjectCache *cache);
void stats_setup(void);
-bool admin_database_stats(PgSocket *client, StatList *pool_list);
+bool admin_database_stats(PgSocket *client, StatList *pool_list) _MUSTCHECK;
#include <crypt.h>
#endif
+/* gcc has hew positive aspects too */
#ifdef __GNUC__
+
#define unlikely(x) __builtin_expect(!!(x), 0)
#define likely(x) __builtin_expect(!!(x), 1)
+
+#define _MUSTCHECK __attribute__((warn_unused_result))
+#define _DEPRECATED __attribute__((deprecated))
+#define _PRINTF(fmtpos, argpos) __attribute__((format(printf, fmtpos, argpos)))
+#define _MALLOC __attribute__((malloc))
+
+#if 0
+#define _USED __attribute__((used))
+#define _UNUSED __attribute__((unused))
+#define _NONNULL(args...) __attribute__((nonnull(args)))
+#define _REGPARM(num) __attribute__((regparm(num)))
+#define _FASTCALL __attribute__((fastcall))
+#endif
+
#else
+
#define unlikely(x) x
#define likely(x) x
+
+#define _MUSTCHECK
+#define _DEPRECATED
+#define _PRINTF(x,y)
+#define _MALLOC
+
#endif
+/* cant use assert() as we want to log too */
#ifdef CASSERT
#define Assert(e) \
do { \
*/
void takeover_init(void);
-bool takeover_login(PgSocket *bouncer);
+bool takeover_login(PgSocket *bouncer) _MUSTCHECK;
/*
* load file into malloced buffer
*/
-char *load_file(const char *fn);
+char *load_file(const char *fn) _MUSTCHECK;
-void *zmalloc(size_t len);
+void *zmalloc(size_t len) _MUSTCHECK _MALLOC;
/*
* generic logging
*/
-void log_level(const char *level, const char *s, ...);
+void log_level(const char *level, const char *s, ...) _PRINTF(2, 3);
#define log_error(args...) log_level("ERROR", ## args)
#define log_warning(args...) log_level("WARNING", ## args)
#define log_info(args...) log_level("LOG", ## args)
/*
* logging about specific socket
*/
-void slog_level(const char *level, const PgSocket *sock, const char *fmt, ...);
+void slog_level(const char *level, const PgSocket *sock, const char *fmt, ...) _PRINTF(3, 4);
#define slog_error(sk, args...) slog_level("ERROR", sk, ## args)
#define slog_warning(sk, args...) slog_level("WARNING", sk, ## args)
#define slog_info(sk, args...) slog_level("LOG", sk, ## args)
/*
* log and exit
*/
-void _fatal(const char *file, int line, const char *func, bool do_exit, const char *s, ...);
-void _fatal_perror(const char *file, int line, const char *func, const char *s, ...);
+void _fatal(const char *file, int line, const char *func, bool do_exit, const char *s, ...) _PRINTF(5, 6);
+void _fatal_perror(const char *file, int line, const char *func, const char *s, ...) _PRINTF(4, 5);
#define fatal(args...) \
_fatal(__FILE__, __LINE__, __FUNCTION__, true, ## args)
#define fatal_noexit(args...) \
/*
* non-interruptible operations
*/
-int safe_read(int fd, void *buf, int len);
-int safe_write(int fd, const void *buf, int len);
-int safe_recv(int fd, void *buf, int len, int flags);
-int safe_send(int fd, const void *buf, int len, int flags);
+int safe_read(int fd, void *buf, int len) _MUSTCHECK;
+int safe_write(int fd, const void *buf, int len) _MUSTCHECK;
+int safe_recv(int fd, void *buf, int len, int flags) _MUSTCHECK;
+int safe_send(int fd, const void *buf, int len, int flags) _MUSTCHECK;
int safe_close(int fd);
-int safe_recvmsg(int fd, struct msghdr *msg, int flags);
-int safe_sendmsg(int fd, const struct msghdr *msg, int flags);
+int safe_recvmsg(int fd, struct msghdr *msg, int flags) _MUSTCHECK;
+int safe_sendmsg(int fd, const struct msghdr *msg, int flags) _MUSTCHECK;
/*
* password tools
#define MD5_PASSWD_LEN 35
#define isMD5(passwd) (memcmp(passwd, "md5", 3) == 0 \
&& strlen(passwd) == MD5_PASSWD_LEN)
-bool pg_md5_encrypt(const char *part1, const char *part2, size_t p2len, char *dest);
-bool get_random_bytes(uint8_t *dest, int len);
+void pg_md5_encrypt(const char *part1, const char *part2, size_t p2len, char *dest);
+void get_random_bytes(uint8_t *dest, int len);
void socket_set_nonblocking(int fd, int val);
void tune_socket(int sock, bool is_unix);
char std_strings[VAR_STDSTR_LEN];
};
-bool varcache_set(VarCache *cache, const char *key, const char *value);
-bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p);
+bool varcache_set(VarCache *cache, const char *key, const char *value) /* _MUSTCHECK */;
+bool varcache_apply(PgSocket *server, PgSocket *client, bool *changes_p) _MUSTCHECK;
void varcache_fill_unset(VarCache *src, PgSocket *dst);
void varcache_clean(VarCache *cache);
void varcache_add_params(PktBuf *pkt, VarCache *vars);
return cnt;
}
-void admin_flush(PgSocket *admin, PktBuf *buf, const char *desc)
+bool admin_flush(PgSocket *admin, PktBuf *buf, const char *desc)
{
pktbuf_write_CommandComplete(buf, desc);
pktbuf_write_ReadyForQuery(buf);
- pktbuf_send_queued(buf, admin);
+ return pktbuf_send_queued(buf, admin);
}
bool admin_ready(PgSocket *admin, const char *desc)
db->addr.port = cf_listen_port;
db->addr.is_unix = 1;
db->pool_size = 2;
- force_user(db, "pgbouncer", "");
+ if (!force_user(db, "pgbouncer", ""))
+ fatal("no mem on startup - cannot alloc pgbouncer user");
/* fake pool, tag the it as special */
pool = get_pool(db, db->forced_user);
{
List *item, *tmp;
PgSocket *admin;
+ bool res;
statlist_for_each_safe(item, &admin_pool->active_client_list, tmp) {
admin = container_of(item, PgSocket, head);
if (!admin->wait_for_response)
continue;
+ res = false;
switch (cf_pause_mode) {
case P_PAUSE:
- admin_ready(admin, "PAUSE");
+ res = admin_ready(admin, "PAUSE");
break;
case P_SUSPEND:
- admin_ready(admin, "SUSPEND");
+ res = admin_ready(admin, "SUSPEND");
break;
default:
if (count_paused_databases() > 0)
- admin_ready(admin, "PAUSE");
+ res = admin_ready(admin, "PAUSE");
else
+ /* fixme */
fatal("admin_pause_done: bad state");
}
- admin->wait_for_response = 0;
+
+ if (!res)
+ disconnect_client(admin, false, "dead admin");
+ else
+ admin->wait_for_response = 0;
}
if (statlist_empty(&admin_pool->active_client_list)
- && cf_pause_mode == P_SUSPEND)
+ && cf_pause_mode == P_SUSPEND)
{
log_info("Admin disappeared when suspended, doing RESUME");
cf_pause_mode = P_NONE;
List *item;
PgDatabase *db;
- log_noise("event: %d, SBuf: %d, PgSocket: %d, Full PgSocket: %d, buf ofs: %d",
- sizeof(struct event), sizeof(SBuf), sizeof(PgSocket), PG_SOCKET_SIZE,
- offsetof(SBuf, buf));
+ log_noise("event: %lu, SBuf: %lu, PgSocket: %lu, Full PgSocket: %lu, buf ofs: %lu",
+ sizeof(struct event), sizeof(SBuf), sizeof(PgSocket),
+ PG_SOCKET_SIZE, offsetof(SBuf, buf));
/* load limits */
err = getrlimit(RLIMIT_NOFILE, &lim);
Assert(server->link == NULL);
/* notify server and close connection */
- if (send_term && notify)
- sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term));
+ if (send_term && notify) {
+ if (!sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term)))
+ /* ignore result */
+ notify = false;
+ }
sbuf_close(&server->sbuf);
change_server_state(server, SV_JUSTFREE);
PgSocket *server = client->link;
/* ->ready may be set before all is sent */
if (server->ready && sbuf_is_empty(&server->sbuf)) {
+ /* retval does not matter here */
release_server(server);
} else {
server->link = NULL;
/* in suspend, don't let send query */
if (cf_pause_mode == P_SUSPEND)
- suspend_socket(client);
+ if (!suspend_socket(client))
+ disconnect_client(client, true, "extra data on login not allowed when suspending");
+ /* fixme: disallow extra data on login completely? is it handled? */
return true;
}
pktbuf_free(buf);
}
-void pktbuf_send_queued(PktBuf *buf, PgSocket *sk)
+bool pktbuf_send_queued(PktBuf *buf, PgSocket *sk)
{
int fd = sbuf_socket(&sk->sbuf);
Assert(!buf->fixed_buf);
if (buf->failed) {
- send_pooler_error(sk, true, "result prepare failed");
pktbuf_free(buf);
+ return send_pooler_error(sk, true, "result prepare failed");
} else {
buf->sending = 1;
pktbuf_send_func(fd, EV_WRITE, buf);
+ return true;
}
}
static void pool_accept(int sock, short flags, void *is_unix)
{
int fd;
+ PgSocket *client;
union {
struct sockaddr_in in;
struct sockaddr_un un;
else
log_warning("unix peer uid failed: %s", strerror(errno));
}
- accept_client(fd, NULL, true);
+ client = accept_client(fd, NULL, true);
} else {
log_debug("P: new tcp client");
- accept_client(fd, &addr.in, false);
+ client = accept_client(fd, &addr.in, false);
+ }
+
+ if (!client) {
+ log_debug("P: no mem for client struct");
+ safe_close(fd);
}
}
/* got all params */
finish_welcome_msg(server);
+ /* need to notify sbuf if server was closed */
res = release_server(server);
/* let the takeover process handle it */
}
/* parse msg for fd and info */
-static bool takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg)
+static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg)
{
int fd;
char *task, *s_addr, *user, *db;
int got;
uint64_t ckey;
PgAddr addr;
+ bool res = false;
memset(&addr, 0, sizeof(addr));
/* decide what to do with it */
if (strcmp(task, "client") == 0)
- use_client_socket(fd, &addr, db, user, ckey, oldfd, linkfd,
+ res = use_client_socket(fd, &addr, db, user, ckey, oldfd, linkfd,
client_enc, std_string, datestyle, timezone);
else if (strcmp(task, "server") == 0)
- use_server_socket(fd, &addr, db, user, ckey, oldfd, linkfd,
+ res = use_server_socket(fd, &addr, db, user, ckey, oldfd, linkfd,
client_enc, std_string, datestyle, timezone);
else if (strcmp(task, "pooler") == 0)
- use_pooler_socket(fd, addr.is_unix);
+ res = use_pooler_socket(fd, addr.is_unix);
else
fatal("unknown task: %s", task);
- return true;
+ if (!res)
+ fatal("socket takeover failed - no mem?");
}
static void takeover_create_link(PgPool *pool, PgSocket *client)
static void write_logfile(const char *buf, int len)
{
+ int res;
if (!log_fd) {
int fd = open(cf_logfile, O_CREAT | O_APPEND | O_WRONLY, 0644);
if (fd < 0)
return;
log_fd = fd;
}
- safe_write(log_fd, buf, len);
+ res = safe_write(log_fd, buf, len);
+ if (res < len)
+ /* nothing to do here */
+ len = 0;
}
static void _log_write(const char *pfx, const char *msg)
if (res < 0) {
log_warning("safe_sendmsg(%d, msg[%d,%d], %d) = %s", fd,
- msg->msg_iov[0].iov_len,
- msg->msg_controllen,
+ (int)msg->msg_iov[0].iov_len,
+ (int)msg->msg_controllen,
flags, strerror(errno));
/* with ancillary data on blocking socket OSX returns
}
buf = malloc(st.st_size + 1);
- if (!buf)
+ if (!buf) {
+ log_error("%s: no mem", fn);
goto load_error;
+ }
if ((fd = open(fn, O_RDONLY)) < 0) {
log_error("%s: %s", fn, strerror(errno));
*dst = 0;
}
-bool pg_md5_encrypt(const char *part1,
+void pg_md5_encrypt(const char *part1,
const char *part2, size_t part2len,
char *dest)
{
memcpy(dest, "md5", 3);
hash2hex(hash, dest + 3);
-
- return true;
}
/* wrapped for getting random bytes */
-bool get_random_bytes(uint8_t *dest, int len)
+void get_random_bytes(uint8_t *dest, int len)
{
int i;
for (i = 0; i < len; i++)
dest[i] = random() & 255;
- return len;
}
/*