From: Tatsuo Ishii Date: Wed, 9 Jul 2025 06:58:16 +0000 (+0900) Subject: Feature: implement NegotiateProtocolVersion message. X-Git-Tag: V4_7_0_BETA1~86 X-Git-Url: http://waps.l3s.uni-hannover.de/gitweb/%20%22mailto:postgres95%40openlink.co.uk%22?a=commitdiff_plain;h=766e738118e15a564e205429564cbfe1915d684e;p=pgpool2.git Feature: implement NegotiateProtocolVersion message. Implementing the message is necessary when frontend requests the protocol version 3.2 (i.e. PostgreSQL 18+ or compatible clients), while backend still only supports 3.0 (i.e. backend is PostgreSQL 17 or before). This commit handles the message so that the message is forwarded from backend to frontend when there's no connection cache exists. If connection cache exists, pgpool sends the message, which has been saved at the time when the connection cache was created, to frontend. Note that the frontend/backend protocol 3.2 changes the BackendKeyData message format, but it's not implemented in this commit yet. This means that still pgpool cannot handle 3.2 protocol. Discussion: https://www.postgresql.org/message-id/20250708.112133.1324153277751075866.ishii%40postgresql.org --- diff --git a/src/auth/pool_auth.c b/src/auth/pool_auth.c index be9f33434..54d646bc3 100644 --- a/src/auth/pool_auth.c +++ b/src/auth/pool_auth.c @@ -79,6 +79,7 @@ static void authenticate_frontend_SCRAM(POOL_CONNECTION * backend, POOL_CONNECTI static void authenticate_frontend_clear_text(POOL_CONNECTION * frontend); static bool get_auth_password(POOL_CONNECTION * backend, POOL_CONNECTION * frontend, int reauth, char **password, PasswordType *passwordType); +static void ProcessNegotiateProtocol(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp); /* * Do authentication. Assuming the only caller is @@ -342,6 +343,7 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) protoMajor = MAIN_CONNECTION(cp)->sp->major; +read_kind: kind = pool_read_kind(cp); if (kind < 0) ereport(ERROR, @@ -365,6 +367,12 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) errdetail("backend response with kind \'E\' when expecting \'R\'"), errhint("This issue can be caused by version mismatch (current version %d)", protoMajor))); } + else if (kind == 'v') + { + /* NegotiateProtocolVersion received */ + ProcessNegotiateProtocol(frontend, cp); + goto read_kind; + } else if (kind != 'R') ereport(ERROR, (errmsg("backend authentication failed"), @@ -597,8 +605,11 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) } - send_auth_ok(frontend, protoMajor); - authkind = 0; + if (kind == 'R') + { + send_auth_ok(frontend, protoMajor); + authkind = 0; + } } else @@ -756,7 +767,16 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) CONNECTION_SLOT(cp, i)->key = cp->info[i].key = key; cp->info[i].major = sp->major; - cp->info[i].minor = sp->minor; + + /* + * If NegotiateProtocol message has been received, set the minor + * version. Othewise use the version in the StartupMessage. + */ + if (CONNECTION_SLOT(cp, i)->nplen > 0) + cp->info[i].minor = CONNECTION_SLOT(cp, i)->negotiated_minor; + else + cp->info[i].minor = sp->minor; + strlcpy(cp->info[i].database, sp->database, sizeof(cp->info[i].database)); strlcpy(cp->info[i].user, sp->user, sizeof(cp->info[i].user)); cp->info[i].counter = 1; @@ -779,16 +799,31 @@ pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) } /* -* do re-authentication for reused connection. if success return 0 otherwise throws ereport. +* do re-authentication for reused connection. if success return 0 otherwise +* throws ereport. */ int pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) { int protoMajor; int msglen; + POOL_CONNECTION_POOL_SLOT *sp; protoMajor = MAJOR(cp); + /* + * If NegotiateProtocolMsg has been received from backend, forward it to + * frontend. If the frontend dislike it, it will disconnect the + * connection. Otherwise it will silently continue. + */ + sp = CONNECTION_SLOT(cp, MAIN_NODE_ID); + if (protoMajor == PROTO_MAJOR_V3 && sp->nplen > 0) + { + elog(DEBUG1, "negotiateProtocol message is forwarded to frontend at reauth"); + pool_write_and_flush(frontend, sp->negotiateProtocolMsg, + sp->nplen); + } + /* * if hba is enabled we would already have passed authentication */ @@ -822,6 +857,9 @@ pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) } } + /* + * Send auth ok + */ pool_write(frontend, "R", 1); if (protoMajor == PROTO_MAJOR_V3) @@ -832,7 +870,10 @@ pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp) msglen = htonl(0); pool_write_and_flush(frontend, &msglen, sizeof(msglen)); + + /* send BackendKeyData */ pool_send_backend_key_data(frontend, MAIN_CONNECTION(cp)->pid, MAIN_CONNECTION(cp)->key, protoMajor); + return 0; } @@ -2074,3 +2115,70 @@ pg_SASL_continue(POOL_CONNECTION * backend, char *payload, int payloadlen, void return 0; } + +/* + * Forward NegotiateProtocol message to frontend. + * + * When this function is called, message kind has been already read. + */ +static void +ProcessNegotiateProtocol(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *cp) +{ + int32 len; + int32 savelen; + int32 protoMajor; + int32 protoMinor; + int32 protov; + bool forwardMsg = false; + int i; + + elog(DEBUG1, "Forwarding NegotiateProtocol message to frontend"); + pool_write(frontend, "v", 1); /* forward message kind */ + savelen = len = pool_read_int(cp); /* message length including self */ + pool_write(frontend, &len, 4); /* forward message length */ + len = ntohl(len) - 4; /* length of rest of the message */ + protov = pool_read_int(cp); /* read protocol version */ + protoMajor = PG_PROTOCOL_MAJOR(ntohl(protov)); /* protocol major version */ + protoMinor = PG_PROTOCOL_MINOR(ntohl(protov)); /* protocol minor version */ + pool_write(frontend, &protov, 4); /* forward protocol version */ + elog(DEBUG1, "protocol verion offered: major: %d minor: %d", protoMajor, protoMinor); + len -= 4; + for (i = 0; i < NUM_BACKENDS; i++) + { + if (VALID_BACKEND(i)) + { + POOL_CONNECTION_POOL_SLOT *sp; + char *p; + char *np; + Size nplen; + + p = pool_read2(CONNECTION(cp, i), len); + if (!forwardMsg) + { + pool_write_and_flush(frontend, p, len); /* forward rest of message */ + forwardMsg = true; + } + /* save negatiate protocol version */ + sp = CONNECTION_SLOT(cp, i); + sp->negotiated_major = protoMajor; + sp->negotiated_minor = protoMinor; + + /* save negatiate protocol message */ + nplen = 1 + /* message kind */ + sizeof(savelen) + /* message length */ + sizeof(protov) + /* protocol version */ + len; /* rest of message */ + /* allocate message area */ + sp->negotiateProtocolMsg = MemoryContextAlloc(TopMemoryContext, nplen); + np = sp->negotiateProtocolMsg; + sp->nplen = nplen; /* set message length */ + + *np++ = 'v'; + memcpy(np, &savelen, sizeof(savelen)); + np += sizeof(savelen); + memcpy(np, &protov, sizeof(protov)); + np += sizeof(protov); + memcpy(np, p, len); + } + } +} diff --git a/src/include/pool.h b/src/include/pool.h index c9b4dc27e..28cf1757c 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -4,7 +4,9 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2024 PgPool Global Development Group + * Portions Copyright (c) 2003-2025 PgPool Global Development Group + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -103,6 +105,41 @@ typedef enum POOL_SOCKET_EOF } POOL_SOCKET_STATE; +/* + * Imported from src/include/libpq/pqcomm.h as of PostgreSQL 18. + * + * These manipulate the frontend/backend protocol version number. + * + * The major number should be incremented for incompatible changes. The minor + * number should be incremented for compatible changes (eg. additional + * functionality). + * + * If a backend supports version m.n of the protocol it must actually support + * versions m.[0..n]. Backend support for version m-1 can be dropped after a + * `reasonable' length of time. + * + * A frontend isn't required to support anything other than the current + * version. + */ + +#define PG_PROTOCOL_MAJOR(v) ((v) >> 16) +#define PG_PROTOCOL_MINOR(v) ((v) & 0x0000ffff) +#define PG_PROTOCOL_FULL(v) (PG_PROTOCOL_MAJOR(v) * 10000 + PG_PROTOCOL_MINOR(v)) +#define PG_PROTOCOL(m,n) (((m) << 16) | (n)) + +/* + * The earliest and latest frontend/backend protocol version supported. + */ + +#define PG_PROTOCOL_EARLIEST PG_PROTOCOL(3,0) +#define PG_PROTOCOL_LATEST PG_PROTOCOL(3,2) + +typedef uint32 ProtocolVersion; /* FE/BE protocol version number */ + +typedef ProtocolVersion MsgType; + +/* end of importing */ + /* protocol major version numbers */ #define PROTO_MAJOR_V2 2 #define PROTO_MAJOR_V3 3 @@ -262,6 +299,15 @@ typedef struct time_t closetime; /* absolute time in second when the connection * closed if 0, that means the connection is * under use. */ + /* + * Protocol version after negotiation. If nplen == 0, no negotiation has + * been done. + */ + int negotiated_major; + int negotiated_minor; + char *negotiateProtocolMsg; /* Raw NegotiateProtocol messag */ + int32 nplen; /* message length of NegotiateProtocol messag */ + } POOL_CONNECTION_POOL_SLOT; typedef struct diff --git a/src/protocol/child.c b/src/protocol/child.c index f4142f90d..7aea33540 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -637,8 +637,8 @@ read_startup_packet(POOL_CONNECTION * cp) sp->len = len; memcpy(&protov, sp->startup_packet, sizeof(protov)); - sp->major = ntohl(protov) >> 16; - sp->minor = ntohl(protov) & 0x0000ffff; + sp->major = PG_PROTOCOL_MAJOR(ntohl(protov)); + sp->minor = PG_PROTOCOL_MINOR(ntohl(protov)); cp->protoVersion = sp->major; switch (sp->major) diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c index 225294a1b..666187216 100644 --- a/src/protocol/pool_connection_pool.c +++ b/src/protocol/pool_connection_pool.c @@ -235,6 +235,8 @@ pool_discard_cp(char *user, char *database, int protoMajor) } CONNECTION_SLOT(p, i)->sp = NULL; pool_close(CONNECTION(p, i)); + if (CONNECTION_SLOT(p, i)->negotiateProtocolMsg) + pfree(CONNECTION_SLOT(p, i)->negotiateProtocolMsg); pfree(CONNECTION_SLOT(p, i)); } @@ -945,7 +947,7 @@ static POOL_CONNECTION_POOL * new_connection(POOL_CONNECTION_POOL * p) continue; } - s = palloc(sizeof(POOL_CONNECTION_POOL_SLOT)); + s = palloc0(sizeof(POOL_CONNECTION_POOL_SLOT)); if (create_cp(s, i) == NULL) {