From 7e3e8b6a051f83d8ca07aa697ca3eafee62e4f00 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Wed, 28 Mar 2012 22:12:15 +0300 Subject: [PATCH] User mapping now almost works on sql/med too --- src/cluster.c | 260 ++++++++++++++++++++++++++++++++++++-------------- src/execute.c | 5 +- src/plproxy.h | 7 +- 3 files changed, 198 insertions(+), 74 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 5519dcf..20737c3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -142,8 +142,12 @@ static int userinfo_cmp(uintptr_t val, struct AANode *node) static void userinfo_free(struct AANode *node, void *arg) { ConnUserInfo *info = (ConnUserInfo *)node; - pfree((void*)info->username); - pfree((void*)info->connstr); + pfree(info->username); + if (info->extra_connstr) + { + memset(info->extra_connstr, 0, strlen(info->extra_connstr)); + pfree(info->extra_connstr); + } memset(info, 0, sizeof(*info)); pfree(info); } @@ -215,7 +219,6 @@ static void free_connlist(ProxyCluster *cluster) { aatree_destroy(&cluster->conn_tree); - aatree_destroy(&cluster->userinfo_tree); pfree(cluster->part_map); pfree(cluster->active_list); @@ -535,6 +538,66 @@ plproxy_fdw_validator(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } +static void +reload_sqlmed_user(ProxyFunction *func, ProxyCluster *cluster) +{ + ConnUserInfo *userinfo = cluster->cur_userinfo; + + UserMapping *um; + HeapTuple tup; + StringInfoData cstr; + ListCell *cell; + AclResult aclresult; + + + um = GetUserMapping(userinfo->user_oid, cluster->sqlmed_server_oid); + + /* retry same lookup so we can set cache stamp... */ + tup = SearchSysCache(USERMAPPINGUSERSERVER, + ObjectIdGetDatum(um->userid), + ObjectIdGetDatum(um->serverid), + 0, 0); + if (!HeapTupleIsValid(tup)) + { + /* Specific mapping not found, try PUBLIC */ + tup = SearchSysCache(USERMAPPINGUSERSERVER, + ObjectIdGetDatum(InvalidOid), + ObjectIdGetDatum(um->serverid), + 0, 0); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for user mapping (%u,%u)", + um->userid, um->serverid); + } + scstamp_set(USERMAPPINGOID, &userinfo->umStamp, tup); + ReleaseSysCache(tup); + + /* + * Check permissions, user must have usage on the server. + */ + aclresult = pg_foreign_server_aclcheck(um->serverid, um->userid, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, cluster->name); + + /* Extract the common connect string elements from user mapping */ + initStringInfo(&cstr); + foreach(cell, um->options) + { + DefElem *def = lfirst(cell); + + appendStringInfo(&cstr, " %s='%s'", def->defname, strVal(def->arg)); + } + + if (userinfo->extra_connstr) + { + memset(userinfo->extra_connstr, 0, strlen(userinfo->extra_connstr)); + pfree(userinfo->extra_connstr); + userinfo->extra_connstr = NULL; + } + + userinfo->extra_connstr = MemoryContextStrdup(cluster_mem, cstr.data); + memset(cstr.data, 0, cstr.len); +} + /* * Reload the cluster configuration and partitions from SQL/MED catalogs. */ @@ -542,19 +605,15 @@ static void reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster, ForeignServer *foreign_server) { - UserMapping *user_mapping; + ConnUserInfo *info = cluster->cur_userinfo; ForeignDataWrapper *fdw; HeapTuple tup; AclResult aclresult; - StringInfo user_options; ListCell *cell; - Oid userid; int part_count = 0; int part_num; - userid = GetSessionUserId(); - user_mapping = GetUserMapping(userid, foreign_server->serverid); fdw = GetForeignDataWrapper(foreign_server->fdwid); /* @@ -570,43 +629,13 @@ reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster, scstamp_set(FOREIGNSERVEROID, &cluster->clusterStamp, tup); ReleaseSysCache(tup); - tup = SearchSysCache(USERMAPPINGUSERSERVER, - ObjectIdGetDatum(user_mapping->userid), - ObjectIdGetDatum(foreign_server->serverid), - 0, 0); - - if (!HeapTupleIsValid(tup)) - { - /* Specific mapping not found, try PUBLIC */ - tup = SearchSysCache(USERMAPPINGUSERSERVER, - ObjectIdGetDatum(InvalidOid), - ObjectIdGetDatum(foreign_server->serverid), - 0, 0); - if (!HeapTupleIsValid(tup)) - elog(ERROR, "cache lookup failed for user mapping (%u,%u)", - user_mapping->userid, foreign_server->serverid); - } - - scstamp_set(USERMAPPINGOID, &cluster->umStamp, tup); - - ReleaseSysCache(tup); - /* * Check permissions, user must have usage on the server. */ - aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, userid, ACL_USAGE); + aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, info->user_oid, ACL_USAGE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername); - /* Extract the common connect string elements from user mapping */ - user_options = makeStringInfo(); - foreach(cell, user_mapping->options) - { - DefElem *def = lfirst(cell); - - appendStringInfo(user_options, "%s='%s' ", def->defname, strVal(def->arg)); - } - /* * Collect the configuration definitions from foreign data wrapper. */ @@ -649,15 +678,11 @@ reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster, foreach(cell, foreign_server->options) { DefElem *def = lfirst(cell); - StringInfo buf = makeStringInfo(); if (!extract_part_num(def->defname, &part_num)) continue; - appendStringInfo(buf, "%s%s%s", strVal(def->arg), - user_options->len ? " " : "", - user_options->data); - add_connection(cluster, buf->data, part_num); + add_connection(cluster, strVal(def->arg), part_num); } } @@ -713,6 +738,34 @@ determine_compat_mode(ProxyCluster *cluster) elog(ERROR, "Pl/Proxy: cluster not found: %s", cluster->name); } +static void inval_one_umap(struct AANode *n, void *arg) +{ + ConnUserInfo *info = (ConnUserInfo *)n; + SCInvalArg newStamp; + + if (info->needs_reload) + /* already invalidated */ + return; + + if (arg == NULL) + { + info->needs_reload = true; + return; + } + + newStamp = *(SCInvalArg *)arg; + if (scstamp_check(USERMAPPINGOID, &info->umStamp, newStamp)) + /* user mappings changed */ + info->needs_reload = true; +} + +static void inval_umapping(struct AANode *n, void *arg) +{ + ProxyCluster *cluster = (ProxyCluster *)n; + + aatree_walk(&cluster->userinfo_tree, AA_WALK_IN_ORDER, inval_one_umap, arg); +} + static void inval_fserver(struct AANode *n, void *arg) { ProxyCluster *cluster = (ProxyCluster *)n; @@ -727,20 +780,10 @@ static void inval_fserver(struct AANode *n, void *arg) else if (scstamp_check(FOREIGNSERVEROID, &cluster->clusterStamp, newStamp)) /* server definitions changed */ cluster->needs_reload = true; -} - -static void inval_one_umap(struct AANode *n, void *arg) -static void inval_umapping(struct AANode *n, void *arg) -{ - ProxyCluster *cluster = (ProxyCluster *)n; - SCInvalArg newStamp = *(SCInvalArg *)arg; + /* tag all users too */ if (cluster->needs_reload) - /* already invalidated */ - return; - else if (scstamp_check(USERMAPPINGOID, &cluster->umStamp, newStamp)) - /* user mappings changed */ - cluster->needs_reload = true; + inval_umapping(&cluster->node, NULL); } /* @@ -759,20 +802,24 @@ ClusterSyscacheCallback(Datum arg, int cacheid, SCInvalArg newStamp) aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, inval_umapping, &newStamp); } -#endif - /* * Register syscache invalidation callbacks for SQL/MED clusters. */ void plproxy_syscache_callback_init(void) { -#ifdef PLPROXY_USE_SQLMED CacheRegisterSyscacheCallback(FOREIGNSERVEROID, ClusterSyscacheCallback, (Datum) 0); CacheRegisterSyscacheCallback(USERMAPPINGOID, ClusterSyscacheCallback, (Datum) 0); -#endif } +#else /* !PLPROXY_USE_SQLMED */ + +void plproxy_syscache_callback_init(void) {} + +#endif + + + /* * Reload the cluster configuration and partitions from plproxy.get_cluster* * functions. @@ -817,24 +864,74 @@ new_cluster(const char *name) return cluster; } -static void init_cluster_user(ProxyCluster *cluster, const char *username) +/* + * Invalidate all connections for particular user + */ + +static void drop_userinfo_state(struct AANode *node, void *arg) +{ + ProxyConnectionState *cur = (ProxyConnectionState *)node; + ConnUserInfo *userinfo = arg; + + if (cur->userinfo == userinfo && cur->db) + { + PQfinish(cur->db); + cur->db = NULL; + cur->state = C_NONE; + } +} + +static void drop_userinfo_conn(struct AANode *node, void *arg) +{ + ProxyConnection *conn = (ProxyConnection *)node; + ConnUserInfo *userinfo = arg; + + aatree_walk(&conn->userstate_tree, AA_WALK_IN_ORDER, drop_userinfo_state, userinfo); +} + +static void inval_user_connections(ProxyCluster *cluster, ConnUserInfo *userinfo) +{ + /* find all connections with this user and drop them */ + aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, drop_userinfo_conn, userinfo); + + /* + * We can clear the flag only when it's certain + * that no connections with old info exist + */ + userinfo->needs_reload = false; +} + +/* + * Initialize user info struct + */ + +static ConnUserInfo * +get_userinfo(ProxyCluster *cluster, Oid user_oid) { ConnUserInfo *userinfo; - StringInfo tmp; struct AANode *node; + const char *username; + + username = GetUserNameFromId(user_oid); node = aatree_search(&cluster->userinfo_tree, (uintptr_t)username); if (node) { userinfo = (ConnUserInfo *)node; } else { - tmp = makeStringInfo(); - appendStringInfo(tmp, "user=%s", username); userinfo = MemoryContextAllocZero(cluster_mem, sizeof(*userinfo)); userinfo->username = MemoryContextStrdup(cluster_mem, username); - userinfo->connstr = MemoryContextStrdup(cluster_mem, tmp->data); + + aatree_insert(&cluster->userinfo_tree, (uintptr_t)username, &userinfo->node); } - cluster->cur_userinfo = userinfo; + if (userinfo->user_oid != user_oid) + { + /* user got renamed? */ + userinfo->user_oid = user_oid; + userinfo->needs_reload = true; + } + + return userinfo; } /* @@ -843,11 +940,13 @@ static void init_cluster_user(ProxyCluster *cluster, const char *username) static void refresh_cluster(ProxyFunction *func, ProxyCluster *cluster) { - const char *username; + ConnUserInfo *uinfo; Oid user_oid; user_oid = GetSessionUserId(); - username = GetUserNameFromId(user_oid); + + uinfo = get_userinfo(cluster, user_oid); + cluster->cur_userinfo = uinfo; #ifdef PLPROXY_USE_SQLMED if (cluster->needs_reload) @@ -865,18 +964,32 @@ refresh_cluster(ProxyFunction *func, ProxyCluster *cluster) if (!cluster->sqlmed_cluster) determine_compat_mode(cluster); else + { + cluster->sqlmed_server_oid = server->serverid; reload_sqlmed_cluster(func, cluster, server); + } } + #endif + if (uinfo->needs_reload) + { +#ifdef PLPROXY_USE_SQLMED + if (cluster->sqlmed_cluster) + { + inval_user_connections(cluster, uinfo); + reload_sqlmed_user(func, cluster); + } + else +#endif + uinfo->needs_reload = false; + } + /* Either no SQL/MED support or no such foreign server */ if (!cluster->sqlmed_cluster && !cluster->fake_cluster) reload_plproxy_cluster(func, cluster); - init_cluster_user(cluster, username); - cluster->needs_reload = false; - pfree((void*)username); } /* @@ -1040,6 +1153,7 @@ struct MaintInfo { static void clean_state(struct AANode *node, void *arg) { ProxyConnectionState *cur = (ProxyConnectionState *)node; + ConnUserInfo *uinfo = cur->userinfo; struct MaintInfo *maint = arg; ProxyConfig *cf = maint->cf; struct timeval *now = maint->now; @@ -1054,6 +1168,10 @@ static void clean_state(struct AANode *node, void *arg) { drop = true; } + else if (uinfo->needs_reload) + { + drop = true; + } else if (cf->connection_lifetime <= 0) { /* no aging */ diff --git a/src/execute.c b/src/execute.c index 320536f..6b546c2 100644 --- a/src/execute.c +++ b/src/execute.c @@ -385,7 +385,10 @@ get_connstr(ProxyConnection *conn) return pstrdup(conn->connstr); initStringInfo(&cstr); - appendStringInfo(&cstr, "%s %s", conn->connstr, info->connstr); + if (info->extra_connstr) + appendStringInfo(&cstr, "%s %s", conn->connstr, info->extra_connstr); + else + appendStringInfo(&cstr, "%s user='%s'", conn->connstr, info->username); return cstr.data; } diff --git a/src/plproxy.h b/src/plproxy.h index a2c7e95..f58c9cf 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -138,9 +138,10 @@ typedef struct ProxyConfig typedef struct ConnUserInfo { struct AANode node; + Oid user_oid; - const char *username; - const char *connstr; + char *username; + char *extra_connstr; /* user= and password= */ SysCacheStamp umStamp; bool needs_reload; @@ -217,6 +218,8 @@ typedef struct ProxyCluster int ret_cur_pos; /* Result walking: index of current row */ int ret_total; /* Result walking: total rows left */ + Oid sqlmed_server_oid; + bool fake_cluster; /* single connect-string cluster */ bool sqlmed_cluster; /* True if the cluster is defined using SQL/MED */ bool needs_reload; /* True if the cluster partition list should be reloaded */ -- 2.39.5