#include "access/heapam.h"
#include "catalog/indexing.h"
+#include "catalog/namespace.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_operator.h"
+#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "statistics/statistics.h"
#include "statistics/stat_utils.h"
char *attname;
AttrNumber attnum;
bool inherited;
+ Oid locked_table = InvalidOid;
Relation starel;
HeapTuple statup;
nspname = TextDatumGetCString(PG_GETARG_DATUM(ATTRELSCHEMA_ARG));
relname = TextDatumGetCString(PG_GETARG_DATUM(ATTRELNAME_ARG));
- reloid = stats_lookup_relid(nspname, relname);
-
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errhint("Statistics cannot be modified during recovery.")));
/* lock before looking up attribute */
- stats_lock_check_privileges(reloid);
+ reloid = RangeVarGetRelidExtended(makeRangeVar(nspname, relname, -1),
+ ShareUpdateExclusiveLock, 0,
+ RangeVarCallbackForStats, &locked_table);
/* user can specify either attname or attnum, but not both */
if (!PG_ARGISNULL(ATTNAME_ARG))
char *attname;
AttrNumber attnum;
bool inherited;
+ Oid locked_table = InvalidOid;
stats_check_required_arg(fcinfo, cleararginfo, C_ATTRELSCHEMA_ARG);
stats_check_required_arg(fcinfo, cleararginfo, C_ATTRELNAME_ARG);
nspname = TextDatumGetCString(PG_GETARG_DATUM(C_ATTRELSCHEMA_ARG));
relname = TextDatumGetCString(PG_GETARG_DATUM(C_ATTRELNAME_ARG));
- reloid = stats_lookup_relid(nspname, relname);
-
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("Statistics cannot be modified during recovery.")));
- stats_lock_check_privileges(reloid);
+ reloid = RangeVarGetRelidExtended(makeRangeVar(nspname, relname, -1),
+ ShareUpdateExclusiveLock, 0,
+ RangeVarCallbackForStats, &locked_table);
attname = TextDatumGetCString(PG_GETARG_DATUM(C_ATTNAME_ARG));
attnum = get_attnum(reloid, attname);
#include "postgres.h"
+#include "access/htup_details.h"
#include "access/relation.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
#include "catalog/pg_database.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
+#include "utils/syscache.h"
/*
* Ensure that a given argument is not null.
}
/*
- * Lock relation in ShareUpdateExclusive mode, check privileges, and close the
- * relation (but retain the lock).
- *
* A role has privileges to set statistics on the relation if any of the
* following are true:
* - the role owns the current database and the relation is not shared
* - the role has the MAINTAIN privilege on the relation
*/
void
-stats_lock_check_privileges(Oid reloid)
+RangeVarCallbackForStats(const RangeVar *relation,
+ Oid relId, Oid oldRelId, void *arg)
{
- Relation table;
- Oid table_oid = reloid;
- Oid index_oid = InvalidOid;
- LOCKMODE index_lockmode = NoLock;
+ Oid *locked_oid = (Oid *) arg;
+ Oid table_oid = relId;
+ HeapTuple tuple;
+ Form_pg_class form;
+ char relkind;
/*
- * For indexes, we follow the locking behavior in do_analyze_rel() and
- * check_lock_if_inplace_updateable_rel(), which is to lock the table
- * first in ShareUpdateExclusive mode and then the index in AccessShare
- * mode.
- *
- * Partitioned indexes are treated differently than normal indexes in
- * check_lock_if_inplace_updateable_rel(), so we take a
- * ShareUpdateExclusive lock on both the partitioned table and the
- * partitioned index.
+ * If we previously locked some other index's heap, and the name we're
+ * looking up no longer refers to that relation, release the now-useless
+ * lock.
*/
- switch (get_rel_relkind(reloid))
+ if (relId != oldRelId && OidIsValid(*locked_oid))
{
- case RELKIND_INDEX:
- index_oid = reloid;
- table_oid = IndexGetRelation(index_oid, false);
- index_lockmode = AccessShareLock;
- break;
- case RELKIND_PARTITIONED_INDEX:
- index_oid = reloid;
- table_oid = IndexGetRelation(index_oid, false);
- index_lockmode = ShareUpdateExclusiveLock;
- break;
- default:
- break;
+ UnlockRelationOid(*locked_oid, ShareUpdateExclusiveLock);
+ *locked_oid = InvalidOid;
+ }
+
+ /* If the relation does not exist, there's nothing more to do. */
+ if (!OidIsValid(relId))
+ return;
+
+ /* If the relation does exist, check whether it's an index. */
+ relkind = get_rel_relkind(relId);
+ if (relkind == RELKIND_INDEX ||
+ relkind == RELKIND_PARTITIONED_INDEX)
+ table_oid = IndexGetRelation(relId, false);
+
+ /*
+ * If retrying yields the same OID, there are a couple of extremely
+ * unlikely scenarios we need to handle.
+ */
+ if (relId == oldRelId)
+ {
+ /*
+ * If a previous lookup found an index, but the current lookup did
+ * not, the index was dropped and the OID was reused for something
+ * else between lookups. In theory, we could simply drop our lock on
+ * the index's parent table and proceed, but in the interest of
+ * avoiding complexity, we just error.
+ */
+ if (table_oid == relId && OidIsValid(*locked_oid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("index \"%s\" was concurrently dropped",
+ relation->relname)));
+
+ /*
+ * If the current lookup found an index but a previous lookup either
+ * did not find an index or found one with a different parent
+ * relation, the relation was dropped and the OID was reused for an
+ * index between lookups. RangeVarGetRelidExtended() will have
+ * already locked the index at this point, so we can't just lock the
+ * newly discovered parent table OID without risking deadlock. As
+ * above, we just error in this case.
+ */
+ if (table_oid != relId && table_oid != *locked_oid)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("index \"%s\" was concurrently created",
+ relation->relname)));
}
- table = relation_open(table_oid, ShareUpdateExclusiveLock);
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(table_oid));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for OID %u", table_oid);
+ form = (Form_pg_class) GETSTRUCT(tuple);
/* the relkinds that can be used with ANALYZE */
- switch (table->rd_rel->relkind)
+ switch (form->relkind)
{
case RELKIND_RELATION:
case RELKIND_MATVIEW:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot modify statistics for relation \"%s\"",
- RelationGetRelationName(table)),
- errdetail_relkind_not_supported(table->rd_rel->relkind)));
+ NameStr(form->relname)),
+ errdetail_relkind_not_supported(form->relkind)));
}
- if (OidIsValid(index_oid))
- {
- Relation index;
-
- Assert(index_lockmode != NoLock);
- index = relation_open(index_oid, index_lockmode);
-
- Assert(index->rd_index && index->rd_index->indrelid == table_oid);
-
- /* retain lock on index */
- relation_close(index, NoLock);
- }
-
- if (table->rd_rel->relisshared)
+ if (form->relisshared)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot modify statistics for shared relation")));
+ /* Check permissions */
if (!object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()))
{
- AclResult aclresult = pg_class_aclcheck(RelationGetRelid(table),
+ AclResult aclresult = pg_class_aclcheck(table_oid,
GetUserId(),
ACL_MAINTAIN);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult,
- get_relkind_objtype(table->rd_rel->relkind),
- NameStr(table->rd_rel->relname));
+ get_relkind_objtype(form->relkind),
+ NameStr(form->relname));
}
- /* retain lock on table */
- relation_close(table, NoLock);
-}
+ ReleaseSysCache(tuple);
-/*
- * Lookup relation oid from schema and relation name.
- */
-Oid
-stats_lookup_relid(const char *nspname, const char *relname)
-{
- Oid nspoid;
- Oid reloid;
-
- nspoid = LookupExplicitNamespace(nspname, false);
- reloid = get_relname_relid(relname, nspoid);
- if (!OidIsValid(reloid))
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_TABLE),
- errmsg("relation \"%s.%s\" does not exist",
- nspname, relname)));
-
- return reloid;
+ /* Lock heap before index to avoid deadlock. */
+ if (relId != oldRelId && table_oid != relId)
+ {
+ LockRelationOid(table_oid, ShareUpdateExclusiveLock);
+ *locked_oid = table_oid;
+ }
}