/*
  * Make a TransitionCaptureState object from a given TriggerDesc.  The
  * resulting object holds the flags which control whether transition tuples
- * are collected when tables are modified.  This allows us to use the flags
- * from a parent table to control the collection of transition tuples from
- * child tables.
+ * are collected when tables are modified, and the tuplestores themselves.
+ * Note that we copy the flags from a parent table into this struct (rather
+ * than using each relation's TriggerDesc directly) so that we can use it to
+ * control the collection of transition tuples from child tables.
  *
  * If there are no triggers with transition tables configured for 'trigdesc',
  * then return NULL.
                (trigdesc->trig_delete_old_table || trigdesc->trig_update_old_table ||
                 trigdesc->trig_update_new_table || trigdesc->trig_insert_new_table))
        {
+               MemoryContext oldcxt;
+               ResourceOwner saveResourceOwner;
+
+               /*
+                * Normally DestroyTransitionCaptureState should be called after
+                * executing all AFTER triggers for the current statement.
+                *
+                * To handle error cleanup, TransitionCaptureState and the tuplestores
+                * it contains will live in the current [sub]transaction's memory
+                * context.  Likewise for the current resource owner, because we also
+                * want to clean up temporary files spilled to disk by the tuplestore
+                * in that scenario.  This scope is sufficient, because AFTER triggers
+                * with transition tables cannot be deferred (only constraint triggers
+                * can be deferred, and constraint triggers cannot have transition
+                * tables).  The AFTER trigger queue may contain pointers to this
+                * TransitionCaptureState, but any such entries will be processed or
+                * discarded before the end of the current [sub]transaction.
+                *
+                * If a future release allows deferred triggers with transition
+                * tables, we'll need to reconsider the scope of the
+                * TransitionCaptureState object.
+                */
+               oldcxt = MemoryContextSwitchTo(CurTransactionContext);
+               saveResourceOwner = CurrentResourceOwner;
+
                state = (TransitionCaptureState *)
                        palloc0(sizeof(TransitionCaptureState));
                state->tcs_delete_old_table = trigdesc->trig_delete_old_table;
                state->tcs_update_old_table = trigdesc->trig_update_old_table;
                state->tcs_update_new_table = trigdesc->trig_update_new_table;
                state->tcs_insert_new_table = trigdesc->trig_insert_new_table;
+               PG_TRY();
+               {
+                       CurrentResourceOwner = CurTransactionResourceOwner;
+                       if (trigdesc->trig_delete_old_table || trigdesc->trig_update_old_table)
+                               state->tcs_old_tuplestore = tuplestore_begin_heap(false, false, work_mem);
+                       if (trigdesc->trig_insert_new_table || trigdesc->trig_update_new_table)
+                               state->tcs_new_tuplestore = tuplestore_begin_heap(false, false, work_mem);
+               }
+               PG_CATCH();
+               {
+                       CurrentResourceOwner = saveResourceOwner;
+                       PG_RE_THROW();
+               }
+               PG_END_TRY();
+               CurrentResourceOwner = saveResourceOwner;
+               MemoryContextSwitchTo(oldcxt);
        }
 
        return state;
 }
 
+void
+DestroyTransitionCaptureState(TransitionCaptureState *tcs)
+{
+       if (tcs->tcs_new_tuplestore != NULL)
+               tuplestore_end(tcs->tcs_new_tuplestore);
+       if (tcs->tcs_old_tuplestore != NULL)
+               tuplestore_end(tcs->tcs_old_tuplestore);
+       pfree(tcs);
+}
+
 /*
  * Call a trigger function.
  *
 }
 
 void
-ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo)
+ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo,
+                                        TransitionCaptureState *transition_capture)
 {
        TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
 
        if (trigdesc && trigdesc->trig_insert_after_statement)
                AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_INSERT,
-                                                         false, NULL, NULL, NIL, NULL, NULL);
+                                                         false, NULL, NULL, NIL, NULL, transition_capture);
 }
 
 TupleTableSlot *
        TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
 
        if ((trigdesc && trigdesc->trig_insert_after_row) ||
-               (trigdesc && !transition_capture && trigdesc->trig_insert_new_table) ||
                (transition_capture && transition_capture->tcs_insert_new_table))
                AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_INSERT,
                                                          true, NULL, trigtuple,
 }
 
 void
-ExecASDeleteTriggers(EState *estate, ResultRelInfo *relinfo)
+ExecASDeleteTriggers(EState *estate, ResultRelInfo *relinfo,
+                                        TransitionCaptureState *transition_capture)
 {
        TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
 
        if (trigdesc && trigdesc->trig_delete_after_statement)
                AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_DELETE,
-                                                         false, NULL, NULL, NIL, NULL, NULL);
+                                                         false, NULL, NULL, NIL, NULL, transition_capture);
 }
 
 bool
        TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
 
        if ((trigdesc && trigdesc->trig_delete_after_row) ||
-               (trigdesc && !transition_capture && trigdesc->trig_delete_old_table) ||
                (transition_capture && transition_capture->tcs_delete_old_table))
        {
                HeapTuple       trigtuple;
 }
 
 void
-ExecASUpdateTriggers(EState *estate, ResultRelInfo *relinfo)
+ExecASUpdateTriggers(EState *estate, ResultRelInfo *relinfo,
+                                        TransitionCaptureState *transition_capture)
 {
        TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
 
                AfterTriggerSaveEvent(estate, relinfo, TRIGGER_EVENT_UPDATE,
                                                          false, NULL, NULL, NIL,
                                                          GetUpdatedColumns(relinfo, estate),
-                                                         NULL);
+                                                         transition_capture);
 }
 
 TupleTableSlot *
        TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
 
        if ((trigdesc && trigdesc->trig_update_after_row) ||
-               (trigdesc && !transition_capture &&
-                (trigdesc->trig_update_old_table ||
-                 trigdesc->trig_update_new_table)) ||
                (transition_capture &&
                 (transition_capture->tcs_update_old_table ||
                  transition_capture->tcs_update_new_table)))
        Oid                     ats_tgoid;              /* the trigger's ID */
        Oid                     ats_relid;              /* the relation it's on */
        CommandId       ats_firing_id;  /* ID for firing cycle */
+       TransitionCaptureState *ats_transition_capture;
 } AfterTriggerSharedData;
 
 typedef struct AfterTriggerEventData *AfterTriggerEvent;
  * fdw_tuplestores[query_depth] is a tuplestore containing the foreign tuples
  * needed for the current query.
  *
- * old_tuplestores[query_depth] and new_tuplestores[query_depth] hold the
- * transition relations for the current query.
- *
  * maxquerydepth is just the allocated length of query_stack and the
  * tuplestores.
  *
        AfterTriggerEventList *query_stack; /* events pending from each query */
        Tuplestorestate **fdw_tuplestores;      /* foreign tuples for one row from
                                                                                 * each query */
-       Tuplestorestate **old_tuplestores;      /* all old tuples from each query */
-       Tuplestorestate **new_tuplestores;      /* all new tuples from each query */
        int                     maxquerydepth;  /* allocated len of above array */
        MemoryContext event_cxt;        /* memory context for events, if any */
 
                                        Instrumentation *instr,
                                        MemoryContext per_tuple_context,
                                        TupleTableSlot *trig_tuple_slot1,
-                                       TupleTableSlot *trig_tuple_slot2);
+                                       TupleTableSlot *trig_tuple_slot2,
+                                       TransitionCaptureState *transition_capture);
 static SetConstraintState SetConstraintStateCreate(int numalloc);
 static SetConstraintState SetConstraintStateCopy(SetConstraintState state);
 static SetConstraintState SetConstraintStateAddItem(SetConstraintState state,
 
 /*
  * Gets a current query transition tuplestore and initializes it if necessary.
- * This can be holding a single transition row tuple (in the case of an FDW)
- * or a transition table (for an AFTER trigger).
  */
 static Tuplestorestate *
 GetTriggerTransitionTuplestore(Tuplestorestate **tss)
                if (newshared->ats_tgoid == evtshared->ats_tgoid &&
                        newshared->ats_relid == evtshared->ats_relid &&
                        newshared->ats_event == evtshared->ats_event &&
+                       newshared->ats_transition_capture == evtshared->ats_transition_capture &&
                        newshared->ats_firing_id == 0)
                        break;
        }
                                        FmgrInfo *finfo, Instrumentation *instr,
                                        MemoryContext per_tuple_context,
                                        TupleTableSlot *trig_tuple_slot1,
-                                       TupleTableSlot *trig_tuple_slot2)
+                                       TupleTableSlot *trig_tuple_slot2,
+                                       TransitionCaptureState *transition_capture)
 {
        AfterTriggerShared evtshared = GetTriggerSharedData(event);
        Oid                     tgoid = evtshared->ats_tgoid;
        /*
         * Set up the tuplestore information.
         */
-       if (LocTriggerData.tg_trigger->tgoldtable)
-               LocTriggerData.tg_oldtable =
-                       GetTriggerTransitionTuplestore(afterTriggers.old_tuplestores);
-       else
-               LocTriggerData.tg_oldtable = NULL;
-       if (LocTriggerData.tg_trigger->tgnewtable)
-               LocTriggerData.tg_newtable =
-                       GetTriggerTransitionTuplestore(afterTriggers.new_tuplestores);
-       else
-               LocTriggerData.tg_newtable = NULL;
+       LocTriggerData.tg_oldtable = LocTriggerData.tg_newtable = NULL;
+       if (transition_capture != NULL)
+       {
+               if (LocTriggerData.tg_trigger->tgoldtable)
+                       LocTriggerData.tg_oldtable = transition_capture->tcs_old_tuplestore;
+               if (LocTriggerData.tg_trigger->tgnewtable)
+                       LocTriggerData.tg_newtable = transition_capture->tcs_new_tuplestore;
+       }
 
        /*
         * Setup the remaining trigger information
                                 * won't try to re-fire it.
                                 */
                                AfterTriggerExecute(event, rel, trigdesc, finfo, instr,
-                                                                       per_tuple_context, slot1, slot2);
+                                                                       per_tuple_context, slot1, slot2,
+                                                                       evtshared->ats_transition_capture);
 
                                /*
                                 * Mark the event as done.
        Assert(afterTriggers.state == NULL);
        Assert(afterTriggers.query_stack == NULL);
        Assert(afterTriggers.fdw_tuplestores == NULL);
-       Assert(afterTriggers.old_tuplestores == NULL);
-       Assert(afterTriggers.new_tuplestores == NULL);
        Assert(afterTriggers.maxquerydepth == 0);
        Assert(afterTriggers.event_cxt == NULL);
        Assert(afterTriggers.events.head == NULL);
 {
        AfterTriggerEventList *events;
        Tuplestorestate *fdw_tuplestore;
-       Tuplestorestate *old_tuplestore;
-       Tuplestorestate *new_tuplestore;
 
        /* Must be inside a query, too */
        Assert(afterTriggers.query_depth >= 0);
                tuplestore_end(fdw_tuplestore);
                afterTriggers.fdw_tuplestores[afterTriggers.query_depth] = NULL;
        }
-       old_tuplestore = afterTriggers.old_tuplestores[afterTriggers.query_depth];
-       if (old_tuplestore)
-       {
-               tuplestore_end(old_tuplestore);
-               afterTriggers.old_tuplestores[afterTriggers.query_depth] = NULL;
-       }
-       new_tuplestore = afterTriggers.new_tuplestores[afterTriggers.query_depth];
-       if (new_tuplestore)
-       {
-               tuplestore_end(new_tuplestore);
-               afterTriggers.new_tuplestores[afterTriggers.query_depth] = NULL;
-       }
        afterTriggerFreeEventList(&afterTriggers.query_stack[afterTriggers.query_depth]);
 
        afterTriggers.query_depth--;
         */
        afterTriggers.query_stack = NULL;
        afterTriggers.fdw_tuplestores = NULL;
-       afterTriggers.old_tuplestores = NULL;
-       afterTriggers.new_tuplestores = NULL;
        afterTriggers.maxquerydepth = 0;
        afterTriggers.state = NULL;
 
                                        tuplestore_end(ts);
                                        afterTriggers.fdw_tuplestores[afterTriggers.query_depth] = NULL;
                                }
-                               ts = afterTriggers.old_tuplestores[afterTriggers.query_depth];
-                               if (ts)
-                               {
-                                       tuplestore_end(ts);
-                                       afterTriggers.old_tuplestores[afterTriggers.query_depth] = NULL;
-                               }
-                               ts = afterTriggers.new_tuplestores[afterTriggers.query_depth];
-                               if (ts)
-                               {
-                                       tuplestore_end(ts);
-                                       afterTriggers.new_tuplestores[afterTriggers.query_depth] = NULL;
-                               }
 
                                afterTriggerFreeEventList(&afterTriggers.query_stack[afterTriggers.query_depth]);
                        }
                afterTriggers.fdw_tuplestores = (Tuplestorestate **)
                        MemoryContextAllocZero(TopTransactionContext,
                                                                   new_alloc * sizeof(Tuplestorestate *));
-               afterTriggers.old_tuplestores = (Tuplestorestate **)
-                       MemoryContextAllocZero(TopTransactionContext,
-                                                                  new_alloc * sizeof(Tuplestorestate *));
-               afterTriggers.new_tuplestores = (Tuplestorestate **)
-                       MemoryContextAllocZero(TopTransactionContext,
-                                                                  new_alloc * sizeof(Tuplestorestate *));
                afterTriggers.maxquerydepth = new_alloc;
        }
        else
                afterTriggers.fdw_tuplestores = (Tuplestorestate **)
                        repalloc(afterTriggers.fdw_tuplestores,
                                         new_alloc * sizeof(Tuplestorestate *));
-               afterTriggers.old_tuplestores = (Tuplestorestate **)
-                       repalloc(afterTriggers.old_tuplestores,
-                                        new_alloc * sizeof(Tuplestorestate *));
-               afterTriggers.new_tuplestores = (Tuplestorestate **)
-                       repalloc(afterTriggers.new_tuplestores,
-                                        new_alloc * sizeof(Tuplestorestate *));
                /* Clear newly-allocated slots for subsequent lazy initialization. */
                memset(afterTriggers.fdw_tuplestores + old_alloc,
                           0, (new_alloc - old_alloc) * sizeof(Tuplestorestate *));
-               memset(afterTriggers.old_tuplestores + old_alloc,
-                          0, (new_alloc - old_alloc) * sizeof(Tuplestorestate *));
-               memset(afterTriggers.new_tuplestores + old_alloc,
-                          0, (new_alloc - old_alloc) * sizeof(Tuplestorestate *));
                afterTriggers.maxquerydepth = new_alloc;
        }
 
                AfterTriggerEnlargeQueryState();
 
        /*
-        * If the relation has AFTER ... FOR EACH ROW triggers, capture rows into
-        * transition tuplestores for this depth.
+        * If the directly named relation has any triggers with transition tables,
+        * then we need to capture transition tuples.
         */
-       if (row_trigger)
+       if (row_trigger && transition_capture != NULL)
        {
-               HeapTuple original_insert_tuple = NULL;
-               TupleConversionMap *map = NULL;
-               bool delete_old_table = false;
-               bool update_old_table = false;
-               bool update_new_table = false;
-               bool insert_new_table = false;
-
-               if (transition_capture != NULL)
-               {
-                       /*
-                        * A TransitionCaptureState object was provided to tell us which
-                        * tuples to capture based on a parent table named in a DML
-                        * statement.  We may be dealing with a child table with an
-                        * incompatible TupleDescriptor, in which case we'll need a map to
-                        * convert them.  As a small optimization, we may receive the
-                        * original tuple from an insertion into a partitioned table to
-                        * avoid a wasteful parent->child->parent round trip.
-                        */
-                       delete_old_table = transition_capture->tcs_delete_old_table;
-                       update_old_table = transition_capture->tcs_update_old_table;
-                       update_new_table = transition_capture->tcs_update_new_table;
-                       insert_new_table = transition_capture->tcs_insert_new_table;
-                       map = transition_capture->tcs_map;
-                       original_insert_tuple =
-                               transition_capture->tcs_original_insert_tuple;
-               }
-               else if (trigdesc != NULL)
-               {
-                       /*
-                        * Check if we need to capture transition tuples for triggers
-                        * defined on this relation directly.  This case is useful for
-                        * cases like execReplication.c which don't set up a
-                        * TriggerCaptureState because they don't know how to work with
-                        * partitions.
-                        */
-                       delete_old_table = trigdesc->trig_delete_old_table;
-                       update_old_table = trigdesc->trig_update_old_table;
-                       update_new_table = trigdesc->trig_update_new_table;
-                       insert_new_table = trigdesc->trig_insert_new_table;
-               }
+               HeapTuple original_insert_tuple = transition_capture->tcs_original_insert_tuple;
+               TupleConversionMap *map = transition_capture->tcs_map;
+               bool delete_old_table = transition_capture->tcs_delete_old_table;
+               bool update_old_table = transition_capture->tcs_update_old_table;
+               bool update_new_table = transition_capture->tcs_update_new_table;
+               bool insert_new_table = transition_capture->tcs_insert_new_table;;
 
                if ((event == TRIGGER_EVENT_DELETE && delete_old_table) ||
                        (event == TRIGGER_EVENT_UPDATE && update_old_table))
                        Tuplestorestate *old_tuplestore;
 
                        Assert(oldtup != NULL);
-                       old_tuplestore =
-                               GetTriggerTransitionTuplestore
-                               (afterTriggers.old_tuplestores);
+                       old_tuplestore = transition_capture->tcs_old_tuplestore;
+
                        if (map != NULL)
                        {
                                HeapTuple       converted = do_convert_tuple(oldtup, map);
                        Tuplestorestate *new_tuplestore;
 
                        Assert(newtup != NULL);
-                       new_tuplestore =
-                               GetTriggerTransitionTuplestore
-                               (afterTriggers.new_tuplestores);
+                       new_tuplestore = transition_capture->tcs_new_tuplestore;
+
                        if (original_insert_tuple != NULL)
                                tuplestore_puttuple(new_tuplestore, original_insert_tuple);
                        else if (map != NULL)
                new_shared.ats_tgoid = trigger->tgoid;
                new_shared.ats_relid = RelationGetRelid(rel);
                new_shared.ats_firing_id = 0;
+               new_shared.ats_transition_capture = transition_capture;
 
                afterTriggerAddEvent(&afterTriggers.query_stack[afterTriggers.query_depth],
                                                         &new_event, &new_shared);