/*
* If first time through, and we need a tuplestore, initialize it.
*/
- if (tuplestorestate == NULL && node->randomAccess)
+ if (tuplestorestate == NULL && node->eflags != 0)
{
tuplestorestate = tuplestore_begin_heap(true, false, work_mem);
-
+ tuplestore_set_eflags(tuplestorestate, node->eflags);
node->tuplestorestate = (void *) tuplestorestate;
}
matstate->ss.ps.state = estate;
/*
- * We must have random access to the subplan output to do backward scan or
- * mark/restore. We also prefer to materialize the subplan output if we
- * might be called on to rewind and replay it many times. However, if none
- * of these cases apply, we can skip storing the data.
+ * We must have a tuplestore buffering the subplan output to do backward
+ * scan or mark/restore. We also prefer to materialize the subplan output
+ * if we might be called on to rewind and replay it many times. However,
+ * if none of these cases apply, we can skip storing the data.
*/
- matstate->randomAccess = (eflags & (EXEC_FLAG_REWIND |
- EXEC_FLAG_BACKWARD |
- EXEC_FLAG_MARK)) != 0;
+ matstate->eflags = (eflags & (EXEC_FLAG_REWIND |
+ EXEC_FLAG_BACKWARD |
+ EXEC_FLAG_MARK));
matstate->eof_underlying = false;
matstate->tuplestorestate = NULL;
void
ExecMaterialMarkPos(MaterialState *node)
{
- Assert(node->randomAccess);
+ Assert(node->eflags & EXEC_FLAG_MARK);
/*
* if we haven't materialized yet, just return.
void
ExecMaterialRestrPos(MaterialState *node)
{
- Assert(node->randomAccess);
+ Assert(node->eflags & EXEC_FLAG_MARK);
/*
* if we haven't materialized yet, just return.
{
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
- if (node->randomAccess)
+ if (node->eflags != 0)
{
/*
* If we haven't materialized yet, just return. If outerplan' chgParam
/*
* If subnode is to be rescanned then we forget previous stored
- * results; we have to re-read the subplan and re-store.
+ * results; we have to re-read the subplan and re-store. Also,
+ * if we told tuplestore it needn't support rescan, we lose and
+ * must re-read. (This last should not happen in common cases;
+ * else our caller lied by not passing EXEC_FLAG_REWIND to us.)
*
* Otherwise we can just rewind and rescan the stored output. The
* state of the subnode does not change.
*/
- if (((PlanState *) node)->lefttree->chgParam != NULL)
+ if (((PlanState *) node)->lefttree->chgParam != NULL ||
+ (node->eflags & EXEC_FLAG_REWIND) == 0)
{
tuplestore_end((Tuplestorestate *) node->tuplestorestate);
node->tuplestorestate = NULL;
+ if (((PlanState *) node)->lefttree->chgParam == NULL)
+ ExecReScan(((PlanState *) node)->lefttree, exprCtxt);
node->eof_underlying = false;
}
else
}
else
{
+ /* Mark before advancing, if wanted */
+ if (node->mj_ExtraMarks)
+ ExecMarkPos(innerPlan);
/* Stay in same state to fetch next inner tuple */
if (doFillInner)
{
* now we get the next inner tuple, if any. If there's none,
* advance to next outer tuple (which may be able to join to
* previously marked tuples).
+ *
+ * NB: must NOT do "extraMarks" here, since we may need to
+ * return to previously marked tuples.
*/
innerTupleSlot = ExecProcNode(innerPlan);
node->mj_InnerTupleSlot = innerTupleSlot;
break;
/*
+ * SKIPOUTER_ADVANCE: advance over an outer tuple that is
+ * known not to join to any inner tuple.
+ *
* Before advancing, we check to see if we must emit an
* outer-join fill tuple for this outer tuple.
*/
break;
/*
+ * SKIPINNER_ADVANCE: advance over an inner tuple that is
+ * known not to join to any outer tuple.
+ *
* Before advancing, we check to see if we must emit an
* outer-join fill tuple for this inner tuple.
*/
return result;
}
+ /* Mark before advancing, if wanted */
+ if (node->mj_ExtraMarks)
+ ExecMarkPos(innerPlan);
+
/*
* now we get the next inner tuple, if any
*/
return result;
}
+ /* Mark before advancing, if wanted */
+ if (node->mj_ExtraMarks)
+ ExecMarkPos(innerPlan);
+
/*
* now we get the next inner tuple, if any
*/
innerPlanState(mergestate) = ExecInitNode(innerPlan(node), estate,
eflags | EXEC_FLAG_MARK);
+ /*
+ * For certain types of inner child nodes, it is advantageous to issue
+ * MARK every time we advance past an inner tuple we will never return
+ * to. For other types, MARK on a tuple we cannot return to is a waste
+ * of cycles. Detect which case applies and set mj_ExtraMarks if we
+ * want to issue "unnecessary" MARK calls.
+ *
+ * Currently, only Material wants the extra MARKs, and it will be helpful
+ * only if eflags doesn't specify REWIND.
+ */
+ if (IsA(innerPlan(node), Material) &&
+ (eflags & EXEC_FLAG_REWIND) == 0)
+ mergestate->mj_ExtraMarks = true;
+ else
+ mergestate->mj_ExtraMarks = false;
+
#define MERGEJOIN_NSLOTS 4
/*
* maxKBytes, we dump all the tuples into a temp file and then read from that
* when needed.
*
- * When the caller requests random access to the data, we write the temp file
+ * When the caller requests backward-scan capability, we write the temp file
* in a format that allows either forward or backward scan. Otherwise, only
- * forward scan is allowed. But rewind and markpos/restorepos are allowed
- * in any case.
+ * forward scan is allowed. Rewind and markpos/restorepos are normally allowed
+ * but can be turned off via tuplestore_set_eflags; turning off both backward
+ * scan and rewind enables truncation of the tuplestore at the mark point
+ * (if any) for minimal memory usage.
*
* Because we allow reading before writing is complete, there are two
* interesting positions in the temp file: the current read position and
#include "postgres.h"
#include "access/heapam.h"
+#include "executor/executor.h"
#include "storage/buffile.h"
#include "utils/memutils.h"
#include "utils/tuplestore.h"
struct Tuplestorestate
{
TupStoreStatus status; /* enumerated value as shown above */
- bool randomAccess; /* did caller request random access? */
+ int eflags; /* capability flags */
bool interXact; /* keep open through transactions? */
long availMem; /* remaining memory available, in bytes */
BufFile *myfile; /* underlying file, or NULL if none */
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
- * If state->randomAccess is true, then the stored representation of the
- * tuple must be followed by another "unsigned int" that is a copy of the
+ * If state->eflags & EXEC_FLAG_BACKWARD, then the stored representation of
+ * the tuple must be followed by another "unsigned int" that is a copy of the
* length --- so the total tape space used is actually sizeof(unsigned int)
* more than the stored length value. This allows read-backwards. When
- * randomAccess is not true, the write/read routines may omit the extra
+ * EXEC_FLAG_BACKWARD is not set, the write/read routines may omit the extra
* length word.
*
* writetup is expected to write both length words as well as the tuple
*/
-static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
+static Tuplestorestate *tuplestore_begin_common(int eflags,
bool interXact,
int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
+static void tuplestore_trim(Tuplestorestate *state, int ntuples);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
* Initialize for a tuple store operation.
*/
static Tuplestorestate *
-tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
+tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
{
Tuplestorestate *state;
state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
state->status = TSS_INMEM;
- state->randomAccess = randomAccess;
+ state->eflags = eflags;
state->interXact = interXact;
state->availMem = maxKBytes * 1024L;
state->myfile = NULL;
Tuplestorestate *
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
{
- Tuplestorestate *state = tuplestore_begin_common(randomAccess,
- interXact,
- maxKBytes);
+ Tuplestorestate *state;
+ int eflags;
+
+ /*
+ * This interpretation of the meaning of randomAccess is compatible
+ * with the pre-8.3 behavior of tuplestores.
+ */
+ eflags = randomAccess ?
+ (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND | EXEC_FLAG_MARK) :
+ (EXEC_FLAG_REWIND | EXEC_FLAG_MARK);
+
+ state = tuplestore_begin_common(eflags, interXact, maxKBytes);
state->copytup = copytup_heap;
state->writetup = writetup_heap;
return state;
}
+/*
+ * tuplestore_set_eflags
+ *
+ * Set capability flags at a finer grain than is allowed by
+ * tuplestore_begin_xxx. This must be called before inserting any data
+ * into the tuplestore.
+ *
+ * eflags is a bitmask following the meanings used for executor node
+ * startup flags (see executor.h). tuplestore pays attention to these bits:
+ * EXEC_FLAG_REWIND need rewind to start
+ * EXEC_FLAG_BACKWARD need backward fetch
+ * EXEC_FLAG_MARK need mark/restore
+ * If tuplestore_set_eflags is not called, REWIND and MARK are allowed,
+ * and BACKWARD is set per "randomAccess" in the tuplestore_begin_xxx call.
+ */
+void
+tuplestore_set_eflags(Tuplestorestate *state, int eflags)
+{
+ Assert(state->status == TSS_INMEM);
+ Assert(state->memtupcount == 0);
+
+ state->eflags = eflags;
+}
+
/*
* tuplestore_end
*
* Fetch the next tuple in either forward or back direction.
* Returns NULL if no more tuples. If should_free is set, the
* caller must pfree the returned tuple when done with it.
+ *
+ * Backward scan is only allowed if randomAccess was set true or
+ * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
*/
static void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
unsigned int tuplen;
void *tup;
- Assert(forward || state->randomAccess);
+ Assert(forward || (state->eflags & EXEC_FLAG_BACKWARD));
switch (state->status)
{
void
tuplestore_rescan(Tuplestorestate *state)
{
+ Assert(state->eflags & EXEC_FLAG_REWIND);
+
switch (state->status)
{
case TSS_INMEM:
void
tuplestore_markpos(Tuplestorestate *state)
{
+ Assert(state->eflags & EXEC_FLAG_MARK);
+
switch (state->status)
{
case TSS_INMEM:
state->markpos_current = state->current;
+ /*
+ * We can truncate the tuplestore if neither backward scan nor
+ * rewind capability are required by the caller. There will
+ * never be a need to back up past the mark point.
+ *
+ * Note: you might think we could remove all the tuples before
+ * "current", since that one is the next to be returned. However,
+ * since tuplestore_gettuple returns a direct pointer to our
+ * internal copy of the tuple, it's likely that the caller has
+ * still got the tuple just before "current" referenced in a slot.
+ * Don't free it yet.
+ */
+ if (!(state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)))
+ tuplestore_trim(state, 1);
break;
case TSS_WRITEFILE:
if (state->eof_reached)
void
tuplestore_restorepos(Tuplestorestate *state)
{
+ Assert(state->eflags & EXEC_FLAG_MARK);
+
switch (state->status)
{
case TSS_INMEM:
}
}
+/*
+ * tuplestore_trim - remove all but ntuples tuples before current
+ */
+static void
+tuplestore_trim(Tuplestorestate *state, int ntuples)
+{
+ int nremove;
+ int i;
+
+ /*
+ * We don't bother trimming temp files since it usually would mean more
+ * work than just letting them sit in kernel buffers until they age out.
+ */
+ if (state->status != TSS_INMEM)
+ return;
+
+ nremove = state->current - ntuples;
+ if (nremove <= 0)
+ return; /* nothing to do */
+ Assert(nremove <= state->memtupcount);
+
+ /* Release no-longer-needed tuples */
+ for (i = 0; i < nremove; i++)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
+ pfree(state->memtuples[i]);
+ }
+
+ /*
+ * Slide the array down and readjust pointers. This may look pretty
+ * stupid, but we expect that there will usually not be very many
+ * tuple-pointers to move, so this isn't that expensive; and it keeps
+ * a lot of other logic simple.
+ *
+ * In fact, in the current usage for merge joins, it's demonstrable that
+ * there will always be exactly one non-removed tuple; so optimize that
+ * case.
+ */
+ if (nremove + 1 == state->memtupcount)
+ state->memtuples[0] = state->memtuples[nremove];
+ else
+ memmove(state->memtuples, state->memtuples + nremove,
+ (state->memtupcount - nremove) * sizeof(void *));
+
+ state->memtupcount -= nremove;
+ state->current -= nremove;
+ state->markpos_current -= nremove;
+}
+
/*
* Tape interface routines
if (BufFileWrite(state->myfile, (void *) tuple, tuplen) != (size_t) tuplen)
elog(ERROR, "write failed");
- if (state->randomAccess) /* need trailing length word? */
+ if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */
if (BufFileWrite(state->myfile, (void *) &tuplen,
sizeof(tuplen)) != sizeof(tuplen))
elog(ERROR, "write failed");
if (BufFileRead(state->myfile, (void *) ((char *) tuple + sizeof(int)),
len - sizeof(int)) != (size_t) (len - sizeof(int)))
elog(ERROR, "unexpected end of data");
- if (state->randomAccess) /* need trailing length word? */
+ if (state->eflags & EXEC_FLAG_BACKWARD) /* need trailing length word? */
if (BufFileRead(state->myfile, (void *) &tuplen,
sizeof(tuplen)) != sizeof(tuplen))
elog(ERROR, "unexpected end of data");