#include "utils/resowner_private.h"
 #include "utils/syscache.h"
 
+/*
+ * If a catcache invalidation is processed while we are in the middle of
+ * creating a catcache entry (or list), it might apply to the entry we're
+ * creating, making it invalid before it's been inserted to the catcache.  To
+ * catch such cases, we have a stack of "create-in-progress" entries.  Cache
+ * invalidation marks any matching entries in the stack as dead, in addition
+ * to the actual CatCTup and CatCList entries.
+ */
+typedef struct CatCInProgress
+{
+   CatCache   *cache;          /* cache that the entry belongs to */
+   uint32      hash_value;     /* hash of the entry; ignored for lists */
+   bool        list;           /* is it a list entry? */
+   bool        dead;           /* set when the entry is invalidated */
+   struct CatCInProgress *next;
+} CatCInProgress;
+
+static CatCInProgress *catcache_in_progress_stack = NULL;
 
  /* #define CACHEDEBUG */  /* turns DEBUG elogs on */
 
 static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
 static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
 static void CatalogCacheInitializeCache(CatCache *cache);
-static CatCTup *CatalogCacheCreateEntry(CatCache *cache,
-                                       HeapTuple ntp, SysScanDesc scandesc,
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
                                        Datum *arguments,
                                        uint32 hashValue, Index hashIndex);
 
            /* could be multiple matches, so keep looking! */
        }
    }
+
+   /* Also invalidate any entries that are being built */
+   for (CatCInProgress *e = catcache_in_progress_stack; e != NULL; e = e->next)
+   {
+       if (e->cache == cache)
+       {
+           if (e->list || e->hash_value == hashValue)
+               e->dead = true;
+       }
+   }
 }
 
 /* ----------------------------------------------------------------
  *
  * This is not very efficient if the target cache is nearly empty.
  * However, it shouldn't need to be efficient; we don't invoke it often.
+ *
+ * If 'debug_discard' is true, we are being called as part of
+ * debug_discard_caches.  In that case, the cache is not reset for
+ * correctness, but just to get more testing of cache invalidation.  We skip
+ * resetting in-progress build entries in that case, or we'd never make any
+ * progress.
  */
 static void
-ResetCatalogCache(CatCache *cache)
+ResetCatalogCache(CatCache *cache, bool debug_discard)
 {
    dlist_mutable_iter iter;
    int         i;
 #endif
        }
    }
+
+   /* Also invalidate any entries that are being built */
+   if (!debug_discard)
+   {
+       for (CatCInProgress *e = catcache_in_progress_stack; e != NULL; e = e->next)
+       {
+           if (e->cache == cache)
+               e->dead = true;
+       }
+   }
 }
 
 /*
  */
 void
 ResetCatalogCaches(void)
+{
+   ResetCatalogCachesExt(false);
+}
+
+void
+ResetCatalogCachesExt(bool debug_discard)
 {
    slist_iter  iter;
 
    {
        CatCache   *cache = slist_container(CatCache, cc_next, iter.cur);
 
-       ResetCatalogCache(cache);
+       ResetCatalogCache(cache, debug_discard);
    }
 
    CACHE_elog(DEBUG2, "end of ResetCatalogCaches call");
        if (cache->cc_reloid == catId)
        {
            /* Yes, so flush all its contents */
-           ResetCatalogCache(cache);
+           ResetCatalogCache(cache, false);
 
            /* Tell inval.c to call syscache callbacks for this cache */
            CallSyscacheCallbacks(cache->id, 0);
 
        while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
        {
-           ct = CatalogCacheCreateEntry(cache, ntp, scandesc, NULL,
+           ct = CatalogCacheCreateEntry(cache, ntp, NULL,
                                         hashValue, hashIndex);
            /* upon failure, we must start the scan over */
            if (ct == NULL)
        if (IsBootstrapProcessingMode())
            return NULL;
 
-       ct = CatalogCacheCreateEntry(cache, NULL, NULL, arguments,
+       ct = CatalogCacheCreateEntry(cache, NULL, arguments,
                                     hashValue, hashIndex);
 
        /* Creating a negative cache entry shouldn't fail */
    HeapTuple   ntp;
    MemoryContext oldcxt;
    int         i;
+   CatCInProgress *save_in_progress;
+   CatCInProgress in_progress_ent;
 
    /*
     * one-time startup overhead for each cache
 
    ctlist = NIL;
 
+   /*
+    * Cache invalidation can happen while we're building the list.
+    * CatalogCacheCreateEntry() handles concurrent invalidation of individual
+    * tuples, but it's also possible that a new entry is concurrently added
+    * that should be part of the list we're building.  Register an
+    * "in-progress" entry that will receive the invalidation, until we have
+    * built the final list entry.
+    */
+   save_in_progress = catcache_in_progress_stack;
+   in_progress_ent.next = catcache_in_progress_stack;
+   in_progress_ent.cache = cache;
+   in_progress_ent.hash_value = lHashValue;
+   in_progress_ent.list = true;
+   in_progress_ent.dead = false;
+   catcache_in_progress_stack = &in_progress_ent;
+
    PG_TRY();
    {
        ScanKeyData cur_skey[CATCACHE_MAXKEYS];
        Relation    relation;
        SysScanDesc scandesc;
-       bool        stale;
 
        relation = table_open(cache->cc_reloid, AccessShareLock);
 
+       /*
+        * Scan the table for matching entries.  If an invalidation arrives
+        * mid-build, we will loop back here to retry.
+        */
        do
        {
            /*
-            * Ok, need to make a lookup in the relation, copy the scankey and
-            * fill out any per-call fields.  (We must re-do this when
-            * retrying, because systable_beginscan scribbles on the scankey.)
+            * If we are retrying, release refcounts on any items created on
+            * the previous iteration.  We dare not try to free them if
+            * they're now unreferenced, since an error while doing that would
+            * result in the PG_CATCH below doing extra refcount decrements.
+            * Besides, we'll likely re-adopt those items in the next
+            * iteration, so it's not worth complicating matters to try to get
+            * rid of them.
+            */
+           foreach(ctlist_item, ctlist)
+           {
+               ct = (CatCTup *) lfirst(ctlist_item);
+               Assert(ct->c_list == NULL);
+               Assert(ct->refcount > 0);
+               ct->refcount--;
+           }
+           /* Reset ctlist in preparation for new try */
+           ctlist = NIL;
+           in_progress_ent.dead = false;
+
+           /*
+            * Copy the scankey and fill out any per-call fields.  (We must
+            * re-do this when retrying, because systable_beginscan scribbles
+            * on the scankey.)
             */
            memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
            cur_skey[0].sk_argument = v1;
            /* The list will be ordered iff we are doing an index scan */
            ordered = (scandesc->irel != NULL);
 
-           stale = false;
-
-           while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+           while (HeapTupleIsValid(ntp = systable_getnext(scandesc)) &&
+                  !in_progress_ent.dead)
            {
                uint32      hashValue;
                Index       hashIndex;
                if (!found)
                {
                    /* We didn't find a usable entry, so make a new one */
-                   ct = CatalogCacheCreateEntry(cache, ntp, scandesc, NULL,
+                   ct = CatalogCacheCreateEntry(cache, ntp, NULL,
                                                 hashValue, hashIndex);
+
                    /* upon failure, we must start the scan over */
                    if (ct == NULL)
                    {
-                       /*
-                        * Release refcounts on any items we already had.  We
-                        * dare not try to free them if they're now
-                        * unreferenced, since an error while doing that would
-                        * result in the PG_CATCH below doing extra refcount
-                        * decrements.  Besides, we'll likely re-adopt those
-                        * items in the next iteration, so it's not worth
-                        * complicating matters to try to get rid of them.
-                        */
-                       foreach(ctlist_item, ctlist)
-                       {
-                           ct = (CatCTup *) lfirst(ctlist_item);
-                           Assert(ct->c_list == NULL);
-                           Assert(ct->refcount > 0);
-                           ct->refcount--;
-                       }
-                       /* Reset ctlist in preparation for new try */
-                       ctlist = NIL;
-                       stale = true;
+                       in_progress_ent.dead = true;
                        break;
                    }
                }
            }
 
            systable_endscan(scandesc);
-       } while (stale);
+       } while (in_progress_ent.dead);
 
        table_close(relation, AccessShareLock);
 
    }
    PG_CATCH();
    {
+       Assert(catcache_in_progress_stack == &in_progress_ent);
+       catcache_in_progress_stack = save_in_progress;
+
        foreach(ctlist_item, ctlist)
        {
            ct = (CatCTup *) lfirst(ctlist_item);
        PG_RE_THROW();
    }
    PG_END_TRY();
+   Assert(catcache_in_progress_stack == &in_progress_ent);
+   catcache_in_progress_stack = save_in_progress;
 
    cl->cl_magic = CL_MAGIC;
    cl->my_cache = cache;
 }
 
 
-/*
- * equalTuple
- *     Are these tuples memcmp()-equal?
- */
-static bool
-equalTuple(HeapTuple a, HeapTuple b)
-{
-   uint32      alen;
-   uint32      blen;
-
-   alen = a->t_len;
-   blen = b->t_len;
-   return (alen == blen &&
-           memcmp((char *) a->t_data,
-                  (char *) b->t_data, blen) == 0);
-}
-
 /*
  * CatalogCacheCreateEntry
  *     Create a new CatCTup entry, copying the given HeapTuple and other
  *
  * To create a normal cache entry, ntp must be the HeapTuple just fetched
  * from scandesc, and "arguments" is not used.  To create a negative cache
- * entry, pass NULL for ntp and scandesc; then "arguments" is the cache
- * keys to use.  In either case, hashValue/hashIndex are the hash values
- * computed from the cache keys.
+ * entry, pass NULL for ntp; then "arguments" is the cache keys to use.
+ * In either case, hashValue/hashIndex are the hash values computed from
+ * the cache keys.
  *
  * Returns NULL if we attempt to detoast the tuple and observe that it
  * became stale.  (This cannot happen for a negative entry.)  Caller must
  * retry the tuple lookup in that case.
  */
 static CatCTup *
-CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, SysScanDesc scandesc,
-                       Datum *arguments,
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
                        uint32 hashValue, Index hashIndex)
 {
    CatCTup    *ct;
-   HeapTuple   dtp;
    MemoryContext oldcxt;
 
    if (ntp)
    {
        int         i;
+       HeapTuple   dtp = NULL;
 
        /*
-        * The visibility recheck below essentially never fails during our
-        * regression tests, and there's no easy way to force it to fail for
-        * testing purposes.  To ensure we have test coverage for the retry
-        * paths in our callers, make debug builds randomly fail about 0.1% of
-        * the times through this code path, even when there's no toasted
-        * fields.
+        * The invalidation of the in-progress entry essentially never happens
+        * during our regression tests, and there's no easy way to force it to
+        * fail for testing purposes.  To ensure we have test coverage for the
+        * retry paths in our callers, make debug builds randomly fail about
+        * 0.1% of the times through this code path, even when there's no
+        * toasted fields.
         */
 #ifdef USE_ASSERT_CHECKING
        if (pg_prng_uint32(&pg_global_prng_state) <= (PG_UINT32_MAX / 1000))
         */
        if (HeapTupleHasExternal(ntp))
        {
-           bool        need_cmp = IsInplaceUpdateOid(cache->cc_reloid);
-           HeapTuple   before = NULL;
-           bool        matches = true;
-
-           if (need_cmp)
-               before = heap_copytuple(ntp);
-           dtp = toast_flatten_tuple(ntp, cache->cc_tupdesc);
+           CatCInProgress *save_in_progress;
+           CatCInProgress in_progress_ent;
 
            /*
             * The tuple could become stale while we are doing toast table
-            * access (since AcceptInvalidationMessages can run then).
-            * equalTuple() detects staleness from inplace updates, while
-            * systable_recheck_tuple() detects staleness from normal updates.
-            *
-            * While this equalTuple() follows the usual rule of reading with
-            * a pin and no buffer lock, it warrants suspicion since an
-            * inplace update could appear at any moment.  It's safe because
-            * the inplace update sends an invalidation that can't reorder
-            * before the inplace heap change.  If the heap change reaches
-            * this process just after equalTuple() looks, we've not missed
-            * its inval.
+            * access (since AcceptInvalidationMessages can run then).  The
+            * invalidation will mark our in-progress entry as dead.
             */
-           if (need_cmp)
+           save_in_progress = catcache_in_progress_stack;
+           in_progress_ent.next = catcache_in_progress_stack;
+           in_progress_ent.cache = cache;
+           in_progress_ent.hash_value = hashValue;
+           in_progress_ent.list = false;
+           in_progress_ent.dead = false;
+           catcache_in_progress_stack = &in_progress_ent;
+
+           PG_TRY();
            {
-               matches = equalTuple(before, ntp);
-               heap_freetuple(before);
+               dtp = toast_flatten_tuple(ntp, cache->cc_tupdesc);
            }
-           if (!matches || !systable_recheck_tuple(scandesc, ntp))
+           PG_FINALLY();
+           {
+               Assert(catcache_in_progress_stack == &in_progress_ent);
+               catcache_in_progress_stack = save_in_progress;
+           }
+           PG_END_TRY();
+
+           if (in_progress_ent.dead)
            {
                heap_freetuple(dtp);
                return NULL;