Ensure that the contents of a holdable cursor don't depend on out-of-line
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Dec 2008 17:06:41 +0000 (17:06 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Dec 2008 17:06:41 +0000 (17:06 +0000)
toasted values, since those could get dropped once the cursor's transaction
is over.  Per bug #4553 from Andrew Gierth.

Back-patch as far as 8.1.  The bug actually exists back to 7.4 when holdable
cursors were introduced, but this patch won't work before 8.1 without
significant adjustments.  Given the lack of field complaints, it doesn't seem
worth the work (and risk of introducing new bugs) to try to make a patch for
the older branches.

src/backend/commands/portalcmds.c
src/backend/executor/tstoreReceiver.c
src/backend/tcop/pquery.c
src/include/executor/tstoreReceiver.h

index 7d64fe0eed2ffd5ea6c59597d5b93aa63f10e972..99b7829ed1a526621b3ac512576397cfed4c57f4 100644 (file)
@@ -351,11 +351,15 @@ PersistHoldablePortal(Portal portal)
                 */
                ExecutorRewind(queryDesc);
 
-               /* Change the destination to output to the tuplestore */
+               /*
+                * Change the destination to output to the tuplestore.  Note we
+                * tell the tuplestore receiver to detoast all data passed through it.
+                */
                queryDesc->dest = CreateDestReceiver(DestTuplestore);
                SetTuplestoreDestReceiverParams(queryDesc->dest,
                                                                                portal->holdStore,
-                                                                               portal->holdContext);
+                                                                               portal->holdContext,
+                                                                               true);
 
                /* Fetch the result set into the tuplestore */
                ExecutorRun(queryDesc, ForwardScanDirection, 0L);
index c0405a47b4a803caa5e6882e0ed5835aeffe1af1..63843888caafdd46fcd55effc0d5860b851a06bd 100644 (file)
@@ -1,8 +1,12 @@
 /*-------------------------------------------------------------------------
  *
  * tstoreReceiver.c
- *       an implementation of DestReceiver that stores the result tuples in
- *       a Tuplestore
+ *       An implementation of DestReceiver that stores the result tuples in
+ *       a Tuplestore.
+ *
+ * Optionally, we can force detoasting (but not decompression) of out-of-line
+ * toasted values.  This is to support cursors WITH HOLD, which must retain
+ * data even if the underlying table is dropped.
  *
  *
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 
 #include "postgres.h"
 
+#include "access/tuptoaster.h"
 #include "executor/tstoreReceiver.h"
 
 
 typedef struct
 {
        DestReceiver pub;
-       Tuplestorestate *tstore;
-       MemoryContext cxt;
+       /* parameters: */
+       Tuplestorestate *tstore;        /* where to put the data */
+       MemoryContext cxt;                      /* context containing tstore */
+       bool            detoast;                /* were we told to detoast? */
+       /* workspace: */
+       Datum      *outvalues;          /* values array for result tuple */
+       Datum      *tofree;                     /* temp values to be pfree'd */
 } TStoreState;
 
 
+static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+
+
 /*
  * Prepare to receive tuples from executor.
  */
 static void
 tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
 {
-       /* do nothing */
+       TStoreState *myState = (TStoreState *) self;
+       bool            needtoast = false;
+       Form_pg_attribute *attrs = typeinfo->attrs;
+       int                     natts = typeinfo->natts;
+       int                     i;
+
+       /* Check if any columns require detoast work */
+       if (myState->detoast)
+       {
+               for (i = 0; i < natts; i++)
+               {
+                       if (attrs[i]->attisdropped)
+                               continue;
+                       if (attrs[i]->attlen == -1)
+                       {
+                               needtoast = true;
+                               break;
+                       }
+               }
+       }
+
+       /* Set up appropriate callback */
+       if (needtoast)
+       {
+               myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
+               /* Create workspace */
+               myState->outvalues = (Datum *)
+                       MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+               myState->tofree = (Datum *)
+                       MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+       }
+       else
+       {
+               myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
+               myState->outvalues = NULL;
+               myState->tofree = NULL;
+       }
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the easy case where we don't have to detoast.
  */
 static void
-tstoreReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
+tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
 {
        TStoreState *myState = (TStoreState *) self;
        MemoryContext oldcxt = MemoryContextSwitchTo(myState->cxt);
@@ -50,13 +101,77 @@ tstoreReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
        MemoryContextSwitchTo(oldcxt);
 }
 
+/*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we have to detoast any toasted values.
+ */
+static void
+tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
+{
+       TStoreState *myState = (TStoreState *) self;
+       TupleDesc       typeinfo = slot->tts_tupleDescriptor;
+       Form_pg_attribute *attrs = typeinfo->attrs;
+       int                     natts = typeinfo->natts;
+       int                     nfree;
+       int                     i;
+       MemoryContext oldcxt;
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
+       /*
+        * Fetch back any out-of-line datums.  We build the new datums array in
+        * myState->outvalues[] (but we can re-use the slot's isnull array).
+        * Also, remember the fetched values to free afterwards.
+        */
+       nfree = 0;
+       for (i = 0; i < natts; i++)
+       {
+               Datum           val = slot->tts_values[i];
+
+               if (!attrs[i]->attisdropped &&
+                       attrs[i]->attlen == -1 &&
+                       !slot->tts_isnull[i])
+               {
+                       if (VARATT_IS_EXTERNAL(DatumGetPointer(val)))
+                       {
+                               val = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *)
+                                                                                                               DatumGetPointer(val)));
+                               myState->tofree[nfree++] = val;
+                       }
+               }
+
+               myState->outvalues[i] = val;
+       }
+
+       /*
+        * Push the modified tuple into the tuplestore.
+        */
+       oldcxt = MemoryContextSwitchTo(myState->cxt);
+       tuplestore_putvalues(myState->tstore, typeinfo,
+                                                myState->outvalues, slot->tts_isnull);
+       MemoryContextSwitchTo(oldcxt);
+
+       /* And release any temporary detoasted values */
+       for (i = 0; i < nfree; i++)
+               pfree(DatumGetPointer(myState->tofree[i]));
+}
+
 /*
  * Clean up at end of an executor run
  */
 static void
 tstoreShutdownReceiver(DestReceiver *self)
 {
-       /* do nothing */
+       TStoreState *myState = (TStoreState *) self;
+
+       /* Release workspace if any */
+       if (myState->outvalues)
+               pfree(myState->outvalues);
+       myState->outvalues = NULL;
+       if (myState->tofree)
+               pfree(myState->tofree);
+       myState->tofree = NULL;
 }
 
 /*
@@ -76,7 +191,7 @@ CreateTuplestoreDestReceiver(void)
 {
        TStoreState *self = (TStoreState *) palloc0(sizeof(TStoreState));
 
-       self->pub.receiveSlot = tstoreReceiveSlot;
+       self->pub.receiveSlot = tstoreReceiveSlot_notoast;      /* might change */
        self->pub.rStartup = tstoreStartupReceiver;
        self->pub.rShutdown = tstoreShutdownReceiver;
        self->pub.rDestroy = tstoreDestroyReceiver;
@@ -93,11 +208,13 @@ CreateTuplestoreDestReceiver(void)
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
                                                                Tuplestorestate *tStore,
-                                                               MemoryContext tContext)
+                                                               MemoryContext tContext,
+                                                               bool detoast)
 {
        TStoreState *myState = (TStoreState *) self;
 
        Assert(myState->pub.mydest == DestTuplestore);
        myState->tstore = tStore;
        myState->cxt = tContext;
+       myState->detoast = detoast;
 }
index 77271bab9cd32eb90a850665b4c71c36b66f9813..765ae190bf032388b0ab2bd05ea52bacc7929b57 100644 (file)
@@ -1036,7 +1036,8 @@ FillPortalStore(Portal portal, bool isTopLevel)
        treceiver = CreateDestReceiver(DestTuplestore);
        SetTuplestoreDestReceiverParams(treceiver,
                                                                        portal->holdStore,
-                                                                       portal->holdContext);
+                                                                       portal->holdContext,
+                                                                       false);
 
        completionTag[0] = '\0';
 
index 9bf3136832bf13e9ae656d1876686e2475d53ddb..be5d068808124a82a5d62ca3c5dbe5e33407df1f 100644 (file)
@@ -23,6 +23,7 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
                                                                                        Tuplestorestate *tStore,
-                                                                                       MemoryContext tContext);
+                                                                                       MemoryContext tContext,
+                                                                                       bool detoast);
 
 #endif   /* TSTORE_RECEIVER_H */