From 4ab250a8d35780b11149e30464ade0bed559ac9c Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 13 Mar 2017 20:22:10 -0700 Subject: [PATCH] WIP: Opcode based agg evaluation. Author: Reviewed-By: Discussion: https://postgr.es/m/ Backpatch: --- src/backend/executor/execCompileExpr.c | 366 +++++++++++++++++++++++++ src/backend/executor/execExpr.c | 246 +++++++++++++++++ src/backend/executor/execInterpExpr.c | 204 ++++++++++++++ src/backend/executor/nodeAgg.c | 304 +++----------------- src/include/executor/execExpr.h | 52 ++++ src/include/executor/executor.h | 2 +- src/include/executor/nodeAgg.h | 273 ++++++++++++++++++ src/include/nodes/execnodes.h | 3 + 8 files changed, 1183 insertions(+), 267 deletions(-) diff --git a/src/backend/executor/execCompileExpr.c b/src/backend/executor/execCompileExpr.c index 3477a0f088..bc1169c9a9 100644 --- a/src/backend/executor/execCompileExpr.c +++ b/src/backend/executor/execCompileExpr.c @@ -23,6 +23,7 @@ #include "catalog/objectaccess.h" #include "catalog/pg_type.h" #include "executor/execdebug.h" +#include "executor/nodeAgg.h" #include "executor/nodeSubplan.h" #include "executor/execExpr.h" #include "funcapi.h" @@ -47,6 +48,7 @@ bool jit_expressions = false; static LLVMTypeRef StructExprState; static LLVMTypeRef StructExprContext; +static LLVMTypeRef StructAggStatePerGroupData; static LLVMTypeRef TypePGFunction; static void @@ -107,6 +109,18 @@ create_types(void) LLVMStructSetBody(StructExprContext, members, lengthof(members), false); } + { + LLVMTypeRef members[3]; + + members[0] = TypeSizeT; + members[1] = LLVMInt8Type(); + members[2] = LLVMInt8Type(); + + StructAggStatePerGroupData = LLVMStructCreateNamed(LLVMGetGlobalContext(), + "struct.AggStatePerGroupData"); + LLVMStructSetBody(StructAggStatePerGroupData, members, lengthof(members), false); + } + { LLVMTypeRef params[1]; params[0] = LLVMPointerType(TypeSizeT, 0); /* FIXME: define fmgrinfo properly */ @@ -184,6 +198,23 @@ create_EvalArrayRefCheckSubscript(LLVMModuleRef 
mod)
 	return fn;
 }
 
+static LLVMValueRef
+create_ExecAggInitGroup(LLVMModuleRef mod)
+{
+	LLVMTypeRef sig;
+	LLVMValueRef fn;
+	LLVMTypeRef param_types[3];
+
+	param_types[0] = LLVMPointerType(TypeSizeT, 0);
+	param_types[1] = LLVMPointerType(TypeSizeT, 0);
+	param_types[2] = LLVMPointerType(StructAggStatePerGroupData, 0);
+
+	sig = LLVMFunctionType(LLVMVoidType(), param_types, lengthof(param_types), 0);
+	fn = LLVMAddFunction(mod, "ExecAggInitGroup", sig);
+
+	return fn;
+}
+
 void
 ExecInstantiateCompiledExpr(ExprState *state, PlanState *parent)
 {
@@ -229,6 +260,7 @@ ExecInstantiateCompiledExpr(ExprState *state, PlanState *parent)
 	LLVMValueRef l_EvalGroupingFunc = NULL;
 	LLVMValueRef l_EvalSubPlan = NULL;
 	LLVMValueRef l_EvalAlternativeSubPlan = NULL;
+	LLVMValueRef l_AggInitGroup = NULL;
 
 	/* state itself */
 	LLVMValueRef v_state;
@@ -2320,6 +2352,342 @@ ExecInstantiateCompiledExpr(ExprState *state, PlanState *parent)
 					break;
 				}
 
+			case EEO_AGG_FILTER:
+				{
+					LLVMValueRef v_resnull, v_resvalue;
+					LLVMValueRef v_filtered;
+
+					v_resnull = LLVMBuildLoad(builder, v_resnullp, "");
+					v_resvalue = LLVMBuildLoad(builder, v_resvaluep, "");
+
+					/* filtered out if the filter result is NULL or false (0) */
+					v_filtered = LLVMBuildOr(
+						builder,
+						LLVMBuildICmp(
+							builder, LLVMIntEQ, v_resnull,
+							LLVMConstInt(LLVMInt8Type(), 1, false), ""),
+						LLVMBuildICmp(
+							builder, LLVMIntEQ, v_resvalue,
+							LLVMConstInt(TypeSizeT, 0, false), ""),
+						"");
+
+					LLVMBuildCondBr(
+						builder,
+						v_filtered,
+						opblocks[op->d.agg_filter.jumpfalse],
+						opblocks[i + 1]);
+
+					break;
+				}
+
+			case EEO_AGG_STRICT_INPUT_CHECK:
+				{
+					int nargs = op->d.agg_strict_input_check.nargs;
+					bool *nulls = op->d.agg_strict_input_check.nulls;
+					int argno;
+
+					LLVMValueRef v_nullp;
+					LLVMBasicBlockRef *b_checknulls;
+
+					/* address of the null-flag array is emitted as a constant */
+					v_nullp = LLVMBuildIntToPtr(
+						builder,
+						LLVMConstInt(TypeSizeT, (uintptr_t) nulls, false),
+						LLVMPointerType(LLVMInt8Type(), 0),
+						"v_nullp");
+
+					/* create blocks for checking args */
+					b_checknulls = palloc(sizeof(LLVMBasicBlockRef) * nargs);
+					for (argno = 0; argno <
nargs; argno++) + { + b_checknulls[argno] = LLVMInsertBasicBlock(opblocks[i + 1], "check-null"); + } + + LLVMBuildBr(builder, b_checknulls[0]); + + /* strict function, check for NULL args */ + for (argno = 0; argno < nargs; argno++) + { + LLVMValueRef v_argno = LLVMConstInt(LLVMInt32Type(), argno, false); + LLVMValueRef v_argisnull; + LLVMBasicBlockRef b_argnotnull; + + LLVMPositionBuilderAtEnd(builder, b_checknulls[argno]); + + if (argno + 1 == nargs) + b_argnotnull = opblocks[i + 1]; + else + b_argnotnull = b_checknulls[argno + 1]; + + v_argisnull = LLVMBuildLoad( + builder, + LLVMBuildGEP( + builder, v_nullp, &v_argno, 1, ""), + ""); + + LLVMBuildCondBr( + builder, + LLVMBuildICmp(builder, LLVMIntEQ, v_argisnull, + LLVMConstInt(LLVMInt8Type(), 1, false), ""), + opblocks[op->d.agg_strict_input_check.jumpnull], + b_argnotnull); + } + + break; + } + + case EEO_AGG_INIT_TRANS: + { + AggState *aggstate; + AggStatePerTrans pertrans; + + LLVMValueRef v_aggstatep; + LLVMValueRef v_pertransp; + LLVMValueRef v_allpergroupsp; + LLVMValueRef v_pergroupp; + LLVMValueRef v_pergroupoff; + LLVMValueRef v_notransvalue; + + LLVMBasicBlockRef b_init; + + aggstate = op->d.agg_init_trans.aggstate; + pertrans = op->d.agg_init_trans.pertrans; + + v_aggstatep = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) aggstate, false), + LLVMPointerType(TypeSizeT, 0), + ""); + + v_pertransp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) pertrans, false), + LLVMPointerType(TypeSizeT, 0), + ""); + + v_allpergroupsp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) &aggstate->current_peragg, false), + LLVMPointerType(LLVMPointerType(StructAggStatePerGroupData, 0), 0), + "aggstate.allpergroups"); + + v_pergroupoff = LLVMConstInt(LLVMInt32Type(), op->d.agg_init_trans.pergroupoff, 0); + + v_pergroupp = LLVMBuildGEP( + builder, + LLVMBuildLoad( + builder, + v_allpergroupsp, + ""), + &v_pergroupoff, 1, ""); + + v_notransvalue = LLVMBuildLoad( + 
builder, + LLVMBuildStructGEP( + builder, v_pergroupp, 2, "notransvalue"), + "" + ); + + b_init = LLVMInsertBasicBlock(opblocks[i + 1], "inittrans"); + + LLVMBuildCondBr( + builder, + LLVMBuildICmp(builder, LLVMIntEQ, v_notransvalue, + LLVMConstInt(LLVMInt8Type(), 1, false), ""), + b_init, + opblocks[i + 1]); + + LLVMPositionBuilderAtEnd(builder, b_init); + if (!l_AggInitGroup) + l_AggInitGroup = create_ExecAggInitGroup(mod); + + { + LLVMValueRef params[3]; + + params[0] = v_aggstatep; + params[1] = v_pertransp; + params[2] = v_pergroupp; + + LLVMBuildCall( + builder, + l_AggInitGroup, + params, lengthof(params), + ""); + } + LLVMBuildBr(builder, opblocks[op->d.agg_init_trans.jumpnull]); + + break; + } + + case EEO_AGG_STRICT_TRANS_CHECK: + { + LLVMBuildBr( + builder, + opblocks[i + 1]); + break; + } + + case EEO_AGG_PLAIN_TRANS: + { + AggState *aggstate; + AggStatePerTrans pertrans; + FunctionCallInfo fcinfo; + + LLVMValueRef v_fcinfo; + LLVMValueRef v_fn_addr; + LLVMValueRef v_fcinfo_isnullp; + LLVMValueRef v_fcinfo_isnull; + LLVMValueRef v_argp, v_argnullp; + + LLVMValueRef v_arg0p; + LLVMValueRef v_argnull0p; + + LLVMValueRef v_transvaluep; + LLVMValueRef v_transnullp; + + LLVMValueRef v_pergroupoff; + LLVMValueRef v_setno; + + LLVMValueRef v_allpergroupsp; + LLVMValueRef v_current_setp; + LLVMValueRef v_current_pertransp; + + LLVMValueRef v_pertransp; + + LLVMValueRef v_pergroupp; + LLVMValueRef v_argno; + + + LLVMValueRef v_retval; + + aggstate = op->d.agg_plain_trans.aggstate; + pertrans = op->d.agg_plain_trans.pertrans; + + fcinfo = &pertrans->transfn_fcinfo; + + v_fcinfo = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) fcinfo, false), + LLVMPointerType(TypeSizeT, 0), + "v_fcinfo"); + + v_fn_addr = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) fcinfo->flinfo->fn_addr, false), + LLVMPointerType(TypePGFunction, 0), + "v_fn_addr"); + + v_fcinfo_isnullp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) 
&fcinfo->isnull, false), + LLVMPointerType(LLVMInt8Type(), 0), + "v_fcinfo_isnull"); + + v_argnullp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) fcinfo->argnull, false), + LLVMPointerType(LLVMInt8Type(), 0), + "v_argnullp"); + + v_argp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (uintptr_t) fcinfo->arg, false), + LLVMPointerType(TypeSizeT, 0), + "v_arg"); + + v_pergroupoff = LLVMConstInt(LLVMInt32Type(), op->d.agg_plain_trans.pergroupoff, 0); + + v_setno = LLVMConstInt(LLVMInt32Type(), op->d.agg_plain_trans.current_set, 0); + + v_pertransp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) pertrans, false), + LLVMPointerType(TypeSizeT, 0), + ""); + + v_current_setp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) &aggstate->current_set, false), + LLVMPointerType(LLVMInt32Type(), 0), + "aggstate.current_set"); + v_current_pertransp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) &aggstate->curpertrans, false), + LLVMPointerType(LLVMPointerType(TypeSizeT, 0), 0), + "aggstate.curpertrans"); + v_allpergroupsp = LLVMBuildIntToPtr( + builder, + LLVMConstInt(TypeSizeT, (intptr_t) &aggstate->current_peragg, false), + LLVMPointerType(LLVMPointerType(StructAggStatePerGroupData, 0), 0), + "aggstate.allpergroups"); + + v_pergroupp = LLVMBuildGEP( + builder, + LLVMBuildLoad( + builder, + v_allpergroupsp, + ""), + &v_pergroupoff, 1, ""); + + /* set aggstate globals */ + LLVMBuildStore(builder, v_setno, v_current_setp); + LLVMBuildStore(builder, v_pertransp, v_current_pertransp); + + /* store transvalue in fcinfo->arg/argnull[0] */ + v_argno = LLVMConstInt(LLVMInt32Type(), 0, false); + v_arg0p = LLVMBuildGEP(builder, v_argp, &v_argno, 1, ""); + v_argnull0p = LLVMBuildGEP(builder, v_argnullp, &v_argno, 1, ""); + + v_transvaluep = LLVMBuildStructGEP( + builder, v_pergroupp, 0, "transvaluep"); + v_transnullp = LLVMBuildStructGEP( + builder, v_pergroupp, 1, "transnullp"); + + 
LLVMBuildStore( + builder, + LLVMBuildLoad( + builder, + v_transvaluep, + "transvalue"), + v_arg0p); + + LLVMBuildStore( + builder, + LLVMBuildLoad( + builder, + v_transnullp, + "transnull"), + v_argnull0p); + + /* reset fcinfo->isnull */ + LLVMBuildStore( + builder, + LLVMConstInt(LLVMInt8Type(), 0, false), + v_fcinfo_isnullp); + /* call function */ + v_retval = LLVMBuildCall( + builder, v_fn_addr, &v_fcinfo, 1, "funccall_trans"); + + v_fcinfo_isnull = LLVMBuildLoad( + builder, v_fcinfo_isnullp, ""); + + /* retrieve trans value */ + LLVMBuildStore( + builder, + v_retval, + v_transvaluep); + LLVMBuildStore( + builder, + v_fcinfo_isnull, + v_transnullp); + + LLVMBuildBr(builder, opblocks[i + 1]); + + break; + } + + case EEO_AGG_ORDERED_TRANS_DATUM: + case EEO_AGG_ORDERED_TRANS_TUPLE: case EEO_FUNCEXPR_FUSAGE: case EEO_FUNCEXPR_STRICT_FUSAGE: diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index a043c8d06e..57402e0f62 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -69,6 +69,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/date.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/timestamp.h" @@ -2273,6 +2274,251 @@ ExecInitArrayRef(ExprEvalStep *scratch, ArrayRef *aref, PlanState *parent, } } +ExprState * +ExecInitAggTrans(List *transinfos, PlanState *parent) +{ + ExprState *state = makeNode(ExprState); + ListCell *lc; + List *exprList = NIL; + ExprEvalStep scratch; + int pergroupoff = 0; + + scratch.resvalue = &state->resvalue; + scratch.resnull = &state->resnull; + +#if defined(USE_THREADED_COMPUTED_GOTO) + if (dispatch_table == NULL) + dispatch_table = (void **) DatumGetPointer(ExecEvalExpr(NULL, NULL, NULL)); +#endif + + /* + * First figure out which slots we're going to need. 
Out of expediency + * build one list for all expressions and then use existing code :( + */ + foreach (lc, transinfos) + { + AggTransBuildInfo *transinfo = (AggTransBuildInfo *) lfirst(lc); + + exprList = lappend(exprList, transinfo->pertrans->aggref->aggdirectargs); + exprList = lappend(exprList, transinfo->pertrans->aggref->args); + exprList = lappend(exprList, transinfo->pertrans->aggref->aggorder); + exprList = lappend(exprList, transinfo->pertrans->aggref->aggdistinct); + exprList = lappend(exprList, transinfo->pertrans->aggref->aggfilter); + } + ExecInitExprSlots(state, (Node *) exprList); + + /* + * Emit instructions for each transition value / grouping set combination. + */ + foreach (lc, transinfos) + { + AggTransBuildInfo *transinfo = (AggTransBuildInfo *) lfirst(lc); + AggStatePerTrans pertrans = transinfo->pertrans; + int numInputs = pertrans->numInputs; + int argno; + int setno; + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + ListCell *arg, *bail; + List *adjust_bailout = NIL; + bool *strictnulls = NULL; + + /* + * If filter present, emit. Do so before evaluating the input, to + * avoid potentially unneeded computations. + */ + if (pertrans->aggref->aggfilter) + { + /* evaluate filter expression */ + ExecInitExprRec(pertrans->aggref->aggfilter, parent, state, + &state->resvalue, + &state->resnull); + /* and jump out if false */ + scratch.opcode = EEO_AGG_FILTER; + scratch.d.agg_filter.jumpfalse = -1; /* adjust later */ + ExprEvalPushStep(state, &scratch); + adjust_bailout = lappend_int(adjust_bailout, + state->steps_len - 1); + } + + /* + * Evaluate aggregate input into the user of that information. 
+ */ + argno = 0; + if (pertrans->numSortCols == 0) + { + strictnulls = fcinfo->argnull + 1; + + foreach (arg, pertrans->aggref->args) + { + TargetEntry *source_tle = (TargetEntry *) lfirst(arg); + + /* Start from 1, since the 0th arg will be the transition value */ + ExecInitExprRec(source_tle->expr, parent, state, + &fcinfo->arg[argno + 1], + &fcinfo->argnull[argno + 1]); + argno++; + } + } + else if (pertrans->numInputs == 1) + { + TargetEntry *source_tle = + (TargetEntry *) linitial(pertrans->aggref->args); + Assert(list_length(pertrans->aggref->args) == 1); + + ExecInitExprRec(source_tle->expr, parent, state, + &state->resvalue, + &state->resnull); + strictnulls = &state->resnull; + argno++; + } + else + { + Datum *values = pertrans->sortslot->tts_values; + bool *nulls = pertrans->sortslot->tts_isnull; + + strictnulls = nulls; + + foreach (arg, pertrans->aggref->args) + { + TargetEntry *source_tle = (TargetEntry *) lfirst(arg); + + ExecInitExprRec(source_tle->expr, parent, state, + &values[argno], &nulls[argno]); + argno++; + } + } + Assert(numInputs == argno); + + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. This is true for both plain and + * sorted/distinct aggregates. 
+		 */
+		if (fcinfo->flinfo->fn_strict && pertrans->numTransInputs > 0)
+		{
+			scratch.opcode = EEO_AGG_STRICT_INPUT_CHECK;
+			scratch.d.agg_strict_input_check.nulls = strictnulls;
+			scratch.d.agg_strict_input_check.jumpnull = -1; /* adjust later */
+			scratch.d.agg_strict_input_check.nargs = pertrans->numTransInputs; /* transfn args only, not sort-only columns */
+			ExprEvalPushStep(state, &scratch);
+			adjust_bailout = lappend_int(adjust_bailout,
+										 state->steps_len - 1);
+		}
+
+		/* and call transition function (once for each grouping set) */
+		for (setno = 0; setno < transinfo->numGroupingSets; setno++)
+		{
+			int adjust_init_jumpnull = -1;
+			int adjust_strict_jumpnull = -1;
+
+			/*
+			 * If the initial value for the transition state doesn't exist in the
+			 * pg_aggregate table then we will let the first non-NULL value
+			 * returned from the outer procNode become the initial value. (This is
+			 * useful for aggregates like max() and min().) The noTransValue flag
+			 * signals that we still need to do this.
+			 */
+			if (pertrans->numSortCols == 0 &&
+				fcinfo->flinfo->fn_strict &&
+				pertrans->initValueIsNull)
+			{
+				scratch.opcode = EEO_AGG_INIT_TRANS;
+				scratch.d.agg_init_trans.aggstate = transinfo->aggstate;
+				scratch.d.agg_init_trans.pertrans = pertrans;
+				scratch.d.agg_init_trans.pergroupoff = pergroupoff;
+				scratch.d.agg_init_trans.jumpnull = -1; /* adjust later */
+				ExprEvalPushStep(state, &scratch);
+
+				adjust_init_jumpnull = state->steps_len - 1;
+			}
+
+			if (pertrans->numSortCols == 0 &&
+				fcinfo->flinfo->fn_strict)
+			{
+				scratch.opcode = EEO_AGG_STRICT_TRANS_CHECK;
+				scratch.d.agg_strict_trans_check.aggstate = transinfo->aggstate;
+				scratch.d.agg_strict_trans_check.pergroupoff = pergroupoff;
+				scratch.d.agg_strict_trans_check.jumpnull = -1; /* adjust later */
+				ExprEvalPushStep(state, &scratch);
+
+				/*
+				 * Note, we don't push into adjust_bailout here - those jump
+				 * to the end of all transition value computations.
+ */ + adjust_strict_jumpnull = state->steps_len - 1; + } + + if (pertrans->numSortCols == 0) + { + scratch.opcode = EEO_AGG_PLAIN_TRANS; + scratch.d.agg_plain_trans.aggstate = transinfo->aggstate; + scratch.d.agg_plain_trans.pertrans = pertrans; + scratch.d.agg_plain_trans.current_set = setno; + scratch.d.agg_plain_trans.pergroupoff = pergroupoff; + ExprEvalPushStep(state, &scratch); + } + else if (pertrans->numInputs == 1) + { + scratch.opcode = EEO_AGG_ORDERED_TRANS_DATUM; + scratch.d.agg_ordered_trans.aggstate = transinfo->aggstate; + scratch.d.agg_ordered_trans.pertrans = pertrans; + scratch.d.agg_ordered_trans.current_set = setno; + scratch.d.agg_ordered_trans.pergroupoff = pergroupoff; + ExprEvalPushStep(state, &scratch); + } + else + { + scratch.opcode = EEO_AGG_ORDERED_TRANS_TUPLE; + scratch.d.agg_ordered_trans.aggstate = transinfo->aggstate; + scratch.d.agg_ordered_trans.pertrans = pertrans; + scratch.d.agg_ordered_trans.current_set = setno; + scratch.d.agg_ordered_trans.pergroupoff = pergroupoff; + ExprEvalPushStep(state, &scratch); + } + + if (adjust_init_jumpnull != -1 ) + { + ExprEvalStep *as = &state->steps[adjust_init_jumpnull]; + Assert(as->d.agg_init_trans.jumpnull == -1); + as->d.agg_init_trans.jumpnull = state->steps_len; + } + + if (adjust_strict_jumpnull != -1 ) + { + ExprEvalStep *as = &state->steps[adjust_strict_jumpnull]; + Assert(as->d.agg_strict_trans_check.jumpnull == -1); + as->d.agg_strict_trans_check.jumpnull = state->steps_len; + } + + pergroupoff++; + } + + /* adjust early bail out jump target(s) */ + foreach (bail, adjust_bailout) + { + ExprEvalStep *as = &state->steps[lfirst_int(bail)]; + if (as->opcode == EEO_AGG_FILTER) + { + Assert(as->d.agg_filter.jumpfalse == -1); + as->d.agg_filter.jumpfalse = state->steps_len; + } + else if (as->opcode == EEO_AGG_STRICT_INPUT_CHECK) + { + Assert(as->d.agg_strict_input_check.jumpnull == -1); + as->d.agg_strict_input_check.jumpnull = state->steps_len; + } + } + } + + scratch.resvalue = NULL; 
+	scratch.resnull = NULL;
+	scratch.opcode = EEO_DONE;
+	ExprEvalPushStep(state, &scratch);
+
+	ExecInstantiateExpr(state, parent);
+
+	return state;
+}
 
 /*
  * Helper for preparing ArrayRef expressions for evaluation: is expr a nested
diff --git a/src/backend/executor/execInterpExpr.c b/src/backend/executor/execInterpExpr.c
index d1d714bf85..e9f8be877d 100644
--- a/src/backend/executor/execInterpExpr.c
+++ b/src/backend/executor/execInterpExpr.c
@@ -59,12 +59,14 @@
 #include "executor/nodeSubplan.h"
 #include "executor/execExpr.h"
 #include "funcapi.h"
+#include "utils/memutils.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parsetree.h"
 #include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/date.h"
+#include "utils/datum.h"
 #include "utils/lsyscache.h"
 #include "utils/timestamp.h"
 #include "utils/typcache.h"
@@ -297,6 +299,13 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
 		&&CASE_EEO_WINDOW_FUNC,
 		&&CASE_EEO_SUBPLAN,
 		&&CASE_EEO_ALTERNATIVE_SUBPLAN,
+		&&CASE_EEO_AGG_FILTER,
+		&&CASE_EEO_AGG_STRICT_INPUT_CHECK,
+		&&CASE_EEO_AGG_INIT_TRANS,
+		&&CASE_EEO_AGG_STRICT_TRANS_CHECK,
+		&&CASE_EEO_AGG_PLAIN_TRANS,
+		&&CASE_EEO_AGG_ORDERED_TRANS_DATUM,
+		&&CASE_EEO_AGG_ORDERED_TRANS_TUPLE,
 		&&CASE_EEO_LAST
 	};
 
@@ -1442,12 +1451,195 @@ starteval:
 			EEO_DISPATCH(op);
 		}
 
+		EEO_CASE(EEO_AGG_FILTER)
+		{
+			/* filter result is NULL or false: jump past this aggregate's steps */
+			if (*op->resnull || !DatumGetBool(*op->resvalue))
+			{
+				Assert(op->d.agg_filter.jumpfalse != -1);
+				op = &state->steps[op->d.agg_filter.jumpfalse];
+				EEO_DISPATCH_DIRECT(op);
+			}
+			else
+				EEO_DISPATCH(op);
+		}
+
+		EEO_CASE(EEO_AGG_STRICT_INPUT_CHECK)
+		{
+			int argno;
+			bool *nulls = op->d.agg_strict_input_check.nulls;
+
+			Assert(op->d.agg_strict_input_check.jumpnull != -1);
+
+			/* any NULL input: jump to jumpnull instead of continuing */
+			for (argno = 0; argno < op->d.agg_strict_input_check.nargs; argno++)
+			{
+				if (nulls[argno])
+				{
+					op = &state->steps[op->d.agg_strict_input_check.jumpnull];
+					EEO_DISPATCH_DIRECT(op);
+				}
+			}
+			EEO_DISPATCH(op);
+		}
+
+		EEO_CASE(EEO_AGG_INIT_TRANS)
+		{
+			AggState *aggstate;
+			AggStatePerGroup allpergroups;
+			AggStatePerGroup pergroup;
+
+			aggstate = op->d.agg_init_trans.aggstate;
+			allpergroups = aggstate->current_peragg;
+			pergroup = &allpergroups[op->d.agg_init_trans.pergroupoff];
+
+			/*
+			 * No transition value yet: ExecAggInitGroup() seeds it from the
+			 * first input, and we jump to jumpnull rather than falling through
+			 * to the transition function call.
+			 *
+			 * NOTE(review): ExecAggInitGroup() copies into
+			 * aggcontexts[aggstate->current_set], but current_set is only
+			 * assigned by EEO_AGG_PLAIN_TRANS, not by this step -- confirm
+			 * this is correct when there are several grouping sets.
+			 */
+			if (pergroup->noTransValue)
+			{
+				AggStatePerTrans pertrans = op->d.agg_init_trans.pertrans;
+
+				ExecAggInitGroup(aggstate, pertrans, pergroup);
+
+				op = &state->steps[op->d.agg_init_trans.jumpnull];
+				EEO_DISPATCH_DIRECT(op);
+			}
+
+			EEO_DISPATCH(op);
+		}
+
+		EEO_CASE(EEO_AGG_STRICT_TRANS_CHECK)
+		{
+			AggState *aggstate;
+			AggStatePerGroup allpergroups;
+			AggStatePerGroup pergroup;
+
+			aggstate = op->d.agg_strict_trans_check.aggstate;
+			allpergroups = aggstate->current_peragg;
+			pergroup = &allpergroups[op->d.agg_strict_trans_check.pergroupoff];
+
+			Assert(op->d.agg_strict_trans_check.jumpnull != -1);
+
+			if (pergroup->transValueIsNull)
+			{
+				/* strict transfn: keep the NULL transition value, skip the call */
+				op = &state->steps[op->d.agg_strict_trans_check.jumpnull];
+				EEO_DISPATCH_DIRECT(op);
+			}
+			EEO_DISPATCH(op);
+		}
+
+		EEO_CASE(EEO_AGG_PLAIN_TRANS)
+		{
+			AggState *aggstate;
+			AggStatePerTrans pertrans;
+			AggStatePerGroup allpergroups;
+			AggStatePerGroup pergroup;
+			FunctionCallInfo fcinfo;
+			MemoryContext oldContext;
+			Datum newVal;
+
+			aggstate = op->d.agg_plain_trans.aggstate;
+			pertrans = op->d.agg_plain_trans.pertrans;
+			allpergroups = aggstate->current_peragg;
+			pergroup = &allpergroups[op->d.agg_plain_trans.pergroupoff];
+
+			fcinfo = &pertrans->transfn_fcinfo;
+
+			oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
+
+			aggstate->current_set = op->d.agg_plain_trans.current_set;
+			/* set up aggstate->curpertrans for AggGetAggref() */
+			aggstate->curpertrans = pertrans;
+
+			fcinfo->arg[0] = pergroup->transValue;
+			fcinfo->argnull[0] = pergroup->transValueIsNull;
+			fcinfo->isnull = false; /* just in case transfn doesn't set it */
+
+			newVal = FunctionCallInvoke(fcinfo);
+
+			/*
+			 * If pass-by-ref datatype, must copy the new value into aggcontext and
+			 * free the prior transValue. But if transfn returned a pointer to its
+			 * first input, we don't need to do anything. Also, if transfn returned a
+			 * pointer to a R/W expanded object that is already a child of the
+			 * aggcontext, assume we can adopt that value without copying it.
+			 */
+			if (!pertrans->transtypeByVal &&
+				DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue))
+			{
+				if (!fcinfo->isnull)
+				{
+					MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory);
+					if (DatumIsReadWriteExpandedObject(newVal,
+													   false,
+													   pertrans->transtypeLen) &&
+						MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
+						/* do nothing */ ;
+					else
+						newVal = datumCopy(newVal,
+										   pertrans->transtypeByVal,
+										   pertrans->transtypeLen);
+				}
+				if (!pergroup->transValueIsNull)
+				{
+					if (DatumIsReadWriteExpandedObject(pergroup->transValue,
+													   false,
+													   pertrans->transtypeLen))
+						DeleteExpandedObject(pergroup->transValue);
+					else
+						pfree(DatumGetPointer(pergroup->transValue));
+				}
+			}
+
+
+			pergroup->transValue = newVal;
+			pergroup->transValueIsNull = fcinfo->isnull;
+
+			MemoryContextSwitchTo(oldContext);
+
+			EEO_DISPATCH(op);
+		}
+
+		EEO_CASE(EEO_AGG_ORDERED_TRANS_DATUM)
+		{
+			AggStatePerTrans pertrans = op->d.agg_ordered_trans.pertrans;
+			int setno = op->d.agg_ordered_trans.current_set;
+
+			/* hand the evaluated input (resvalue/resnull) to this set's tuplesort */
+			tuplesort_putdatum(pertrans->sortstates[setno],
+							   *op->resvalue, *op->resnull);
+			EEO_DISPATCH(op);
+		}
+
+		EEO_CASE(EEO_AGG_ORDERED_TRANS_TUPLE)
+		{
+			AggStatePerTrans pertrans = op->d.agg_ordered_trans.pertrans;
+			int setno = op->d.agg_ordered_trans.current_set;
+
+			/* mark numInputs columns of sortslot valid, then add it to the sort */
+			ExecClearTuple(pertrans->sortslot);
+			pertrans->sortslot->tts_nvalid = pertrans->numInputs;
+			ExecStoreVirtualTuple(pertrans->sortslot);
+			tuplesort_puttupleslot(pertrans->sortstates[setno], pertrans->sortslot);
+
+			EEO_DISPATCH(op);
+		}
+
 		EEO_CASE(EEO_LAST)
 		{
 			/* unreachable */
 			Assert(false);
 			goto out;
 		}
+	}
 
 out:
 	*isnull = state->resnull;
@@ -3244,6 +3436,31 @@
ExecEvalWholeRowVar(ExprState *state, ExprEvalStep *op, ExprContext *econtext) *op->resvalue = PointerGetDatum(dtuple); } +void +ExecAggInitGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup) +{ + FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + MemoryContext oldContext; + + /* + * transValue has not been initialized. This is the first non-NULL + * input value. We use it as the initial value for transValue. (We + * already checked that the agg's input type is binary-compatible + * with its transtype, so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We + * do not need to pfree the old transValue, since it's NULL. + */ + oldContext = MemoryContextSwitchTo( + aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + pergroup->transValue = datumCopy(fcinfo->arg[1], + pertrans->transtypeByVal, + pertrans->transtypeLen); + pergroup->transValueIsNull = false; + pergroup->noTransValue = false; + MemoryContextSwitchTo(oldContext); +} + /* * Callback function to release a tupdesc refcount at expression tree shutdown */ diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 93ba676f59..11e110f6bd 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -175,272 +175,6 @@ #include "utils/datum.h" -/* - * AggStatePerTransData - per aggregate state value information - * - * Working state for updating the aggregate's state value, by calling the - * transition function with an input row. This struct does not store the - * information needed to produce the final aggregate result from the transition - * state, that's stored in AggStatePerAggData instead. This separation allows - * multiple aggregate results to be produced from a single state value. 
- */ -typedef struct AggStatePerTransData -{ - /* - * These values are set up during ExecInitAgg() and do not change - * thereafter: - */ - - /* - * Link to an Aggref expr this state value is for. - * - * There can be multiple Aggref's sharing the same state value, as long as - * the inputs and transition function are identical. This points to the - * first one of them. - */ - Aggref *aggref; - - /* - * Nominal number of arguments for aggregate function. For plain aggs, - * this excludes any ORDER BY expressions. For ordered-set aggs, this - * counts both the direct and aggregated (ORDER BY) arguments. - */ - int numArguments; - - /* - * Number of aggregated input columns. This includes ORDER BY expressions - * in both the plain-agg and ordered-set cases. Ordered-set direct args - * are not counted, though. - */ - int numInputs; - - /* offset of input columns in AggState->evalslot */ - int inputoff; - - /* - * Number of aggregated input columns to pass to the transfn. This - * includes the ORDER BY columns for ordered-set aggs, but not for plain - * aggs. (This doesn't count the transition state value!) - */ - int numTransInputs; - - /* Oid of the state transition or combine function */ - Oid transfn_oid; - - /* Oid of the serialization function or InvalidOid */ - Oid serialfn_oid; - - /* Oid of the deserialization function or InvalidOid */ - Oid deserialfn_oid; - - /* Oid of state value's datatype */ - Oid aggtranstype; - - /* ExprStates of the FILTER and argument expressions. */ - ExprState *aggfilter; /* state of FILTER expression, if any */ - List *aggdirectargs; /* states of direct-argument expressions */ - - /* - * fmgr lookup data for transition function or combine function. Note in - * particular that the fn_strict flag is kept here. 
- */ - FmgrInfo transfn; - - /* fmgr lookup data for serialization function */ - FmgrInfo serialfn; - - /* fmgr lookup data for deserialization function */ - FmgrInfo deserialfn; - - /* Input collation derived for aggregate */ - Oid aggCollation; - - /* number of sorting columns */ - int numSortCols; - - /* number of sorting columns to consider in DISTINCT comparisons */ - /* (this is either zero or the same as numSortCols) */ - int numDistinctCols; - - /* deconstructed sorting information (arrays of length numSortCols) */ - AttrNumber *sortColIdx; - Oid *sortOperators; - Oid *sortCollations; - bool *sortNullsFirst; - - /* - * fmgr lookup data for input columns' equality operators --- only - * set/used when aggregate has DISTINCT flag. Note that these are in - * order of sort column index, not parameter index. - */ - FmgrInfo *equalfns; /* array of length numDistinctCols */ - - /* - * initial value from pg_aggregate entry - */ - Datum initValue; - bool initValueIsNull; - - /* - * We need the len and byval info for the agg's input and transition data - * types in order to know how to copy/delete values. - * - * Note that the info for the input type is used only when handling - * DISTINCT aggs with just one argument, so there is only one input type. - */ - int16 inputtypeLen, - transtypeLen; - bool inputtypeByVal, - transtypeByVal; - - /* - * Stuff for evaluation of aggregate inputs in cases where the aggregate - * requires sorted input. The arguments themselves will be evaluated via - * AggState->evalslot/evalproj for all aggregates at once, but we only - * want to sort the relevant columns for individual aggregates. - */ - TupleDesc sortdesc; /* descriptor of input tuples */ - - /* - * Slots for holding the evaluated input arguments. These are set up - * during ExecInitAgg() and then used for each input row requiring - * processing besides what's done in AggState->evalproj. 
- */ - TupleTableSlot *sortslot; /* current input tuple */ - TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ - - /* - * These values are working state that is initialized at the start of an - * input tuple group and updated for each input tuple. - * - * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input - * values straight to the transition function. If it's DISTINCT or - * requires ORDER BY, we pass the input values into a Tuplesort object; - * then at completion of the input tuple group, we scan the sorted values, - * eliminate duplicates if needed, and run the transition function on the - * rest. - * - * We need a separate tuplesort for each grouping set. - */ - - Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ - - /* - * This field is a pre-initialized FunctionCallInfo struct used for - * calling this aggregate's transfn. We save a few cycles per row by not - * re-initializing the unchanging fields; which isn't much, but it seems - * worth the extra space consumption. - */ - FunctionCallInfoData transfn_fcinfo; - - /* Likewise for serialization and deserialization functions */ - FunctionCallInfoData serialfn_fcinfo; - - FunctionCallInfoData deserialfn_fcinfo; -} AggStatePerTransData; - -/* - * AggStatePerAggData - per-aggregate information - * - * This contains the information needed to call the final function, to produce - * a final aggregate result from the state value. If there are multiple - * identical Aggrefs in the query, they can all share the same per-agg data. - * - * These values are set up during ExecInitAgg() and do not change thereafter. - */ -typedef struct AggStatePerAggData -{ - /* - * Link to an Aggref expr this state value is for. - * - * There can be multiple identical Aggref's sharing the same per-agg. This - * points to the first one of them. 
- */ - Aggref *aggref; - - /* index to the state value which this agg should use */ - int transno; - - /* Optional Oid of final function (may be InvalidOid) */ - Oid finalfn_oid; - - /* - * fmgr lookup data for final function --- only valid when finalfn_oid oid - * is not InvalidOid. - */ - FmgrInfo finalfn; - - /* - * Number of arguments to pass to the finalfn. This is always at least 1 - * (the transition state value) plus any ordered-set direct args. If the - * finalfn wants extra args then we pass nulls corresponding to the - * aggregated input columns. - */ - int numFinalArgs; - - /* - * We need the len and byval info for the agg's result data type in order - * to know how to copy/delete values. - */ - int16 resulttypeLen; - bool resulttypeByVal; - -} AggStatePerAggData; - -/* - * AggStatePerGroupData - per-aggregate-per-group working state - * - * These values are working state that is initialized at the start of - * an input tuple group and updated for each input tuple. - * - * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these - * structs (pointed to by aggstate->pergroup); we re-use the array for - * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the - * hash table contains an array of these structs for each tuple group. - * - * Logically, the sortstate field belongs in this struct, but we do not - * keep it here for space reasons: we don't support DISTINCT aggregates - * in AGG_HASHED mode, so there's no reason to use up a pointer field - * in every entry of the hashtable. - */ -typedef struct AggStatePerGroupData -{ - Datum transValue; /* current transition value */ - bool transValueIsNull; - - bool noTransValue; /* true if transValue not set yet */ - - /* - * Note: noTransValue initially has the same value as transValueIsNull, - * and if true both are cleared to false at the same time. 
They are not - * the same though: if transfn later returns a NULL, we want to keep that - * NULL and not auto-replace it with a later input value. Only the first - * non-NULL input will be auto-substituted. - */ -} AggStatePerGroupData; - -/* - * AggStatePerPhaseData - per-grouping-set-phase state - * - * Grouping sets are divided into "phases", where a single phase can be - * processed in one pass over the input. If there is more than one phase, then - * at the end of input from the current phase, state is reset and another pass - * taken over the data which has been re-sorted in the mean time. - * - * Accordingly, each phase specifies a list of grouping sets and group clause - * information, plus each phase after the first also has a sort order. - */ -typedef struct AggStatePerPhaseData -{ - int numsets; /* number of grouping sets (or 0) */ - int *gset_lengths; /* lengths of grouping sets */ - Bitmapset **grouped_cols; /* column groupings for rollup */ - FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ - Agg *aggnode; /* Agg node for phase data */ - Sort *sortnode; /* Sort node for input ordering for phase */ -} AggStatePerPhaseData; - - static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); static void initialize_aggregates(AggState *aggstate, @@ -846,6 +580,18 @@ advance_transition_function(AggState *aggstate, static inline void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) { +//#define NEW_AGG +#ifdef NEW_AGG + if (aggstate->evaltrans) + { + bool isnull; + aggstate->current_peragg = pergroup; + ExecEvalExprSwitchContext(aggstate->evaltrans, aggstate->tmpcontext, + &isnull); + aggstate->current_peragg = NULL; + return; + } +#else int transno; int setno = 0; int numGroupingSets = Max(aggstate->phase->numsets, 1); @@ -856,6 +602,16 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) AggStatePerTrans pertrans; AggStatePerGroup pergroupstate = 
&pergroup[0]; + if (aggstate->evaltrans) + { + bool isnull; + aggstate->current_peragg = pergroup; + ExecEvalExprSwitchContext(aggstate->evaltrans, aggstate->tmpcontext, + &isnull); + aggstate->current_peragg = NULL; + return; + } + /* compute input for all aggregates */ if (aggstate->evalproj) aggstate->evalslot = ExecProject(aggstate->evalproj); @@ -961,6 +717,7 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) } } } +#endif } /* @@ -3045,6 +2802,21 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) NULL); ExecSetSlotDescriptor(aggstate->evalslot, aggstate->evaldesc); + /* build expression doing all the transition stuff at once */ + combined_inputeval = NIL; + for (transno = 0, pertrans = &pertransstates[0]; + transno < aggstate->numtrans; transno++, pertrans++) + { + AggTransBuildInfo *build = palloc0(sizeof(AggTransBuildInfo)); + + build->aggstate = aggstate; + build->pertrans = pertrans; + build->numGroupingSets = Max(aggstate->phase->numsets, 1); + combined_inputeval = lappend(combined_inputeval, build); + } + + aggstate->evaltrans = ExecInitAggTrans(combined_inputeval, &aggstate->ss.ps); + return aggstate; } diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 7843d77e6f..dba38137c5 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -15,6 +15,7 @@ #define EXEC_EXPR_H #include "nodes/execnodes.h" +#include "executor/nodeAgg.h" struct ArrayRefState; @@ -99,6 +100,13 @@ typedef enum ExprEvalOp EEO_WINDOW_FUNC, EEO_SUBPLAN, EEO_ALTERNATIVE_SUBPLAN, + EEO_AGG_FILTER, + EEO_AGG_STRICT_INPUT_CHECK, + EEO_AGG_INIT_TRANS, + EEO_AGG_STRICT_TRANS_CHECK, + EEO_AGG_PLAIN_TRANS, + EEO_AGG_ORDERED_TRANS_DATUM, + EEO_AGG_ORDERED_TRANS_TUPLE, EEO_LAST } ExprEvalOp; @@ -386,6 +394,49 @@ typedef struct ExprEvalStep { int last_var; } fetch; + + struct + { + int jumpfalse; + } agg_filter; + + struct + { + bool *nulls; + int nargs; + int jumpnull; + } agg_strict_input_check; + + struct + { + 
AggState *aggstate; + AggStatePerTrans pertrans; + int pergroupoff; + int jumpnull; + } agg_init_trans; + + struct + { + AggState *aggstate; + int pergroupoff; + int jumpnull; + } agg_strict_trans_check; + + struct + { + AggState *aggstate; + AggStatePerTrans pertrans; + int current_set; + int pergroupoff; + } agg_plain_trans; + + struct + { + AggState *aggstate; + AggStatePerTrans pertrans; + int current_set; + int pergroupoff; + } agg_ordered_trans; } d; } ExprEvalStep; @@ -453,5 +504,6 @@ extern void ExecEvalGroupingFunc(ExprState *state, ExprEvalStep *op); extern void ExecEvalSubPlan(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalAlternativeSubPlan(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalWholeRowVar(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecAggInitGroup(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroup); #endif /* EXEC_EXPR_H */ diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index c8def4ceff..aac87c6b87 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -275,7 +275,7 @@ extern ExprState *ExecPrepareQual(List *qual, EState *estate); extern ExprState *ExecPrepareCheck(List *qual, EState *estate); extern ExprState *ExecInitQual(List *qual, PlanState *parent); extern ExprState *ExecInitCheck(List *qual, PlanState *parent); - +extern ExprState *ExecInitAggTrans(List *transinfos, PlanState *parent); extern bool ExecCheck(ExprState *state, ExprContext *context); extern int ExecTargetListLength(List *targetlist); extern int ExecCleanTargetListLength(List *targetlist); diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index d2fee52e12..a0d5c4a105 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -16,6 +16,279 @@ #include "nodes/execnodes.h" + +/* + * AggStatePerTransData - per aggregate state value information + * + 
* Working state for updating the aggregate's state value, by calling the + * transition function with an input row. This struct does not store the + * information needed to produce the final aggregate result from the transition + * state, that's stored in AggStatePerAggData instead. This separation allows + * multiple aggregate results to be produced from a single state value. + */ +typedef struct AggStatePerTransData +{ + /* + * These values are set up during ExecInitAgg() and do not change + * thereafter: + */ + + /* + * Link to an Aggref expr this state value is for. + * + * There can be multiple Aggref's sharing the same state value, as long as + * the inputs and transition function are identical. This points to the + * first one of them. + */ + Aggref *aggref; + + /* + * Nominal number of arguments for aggregate function. For plain aggs, + * this excludes any ORDER BY expressions. For ordered-set aggs, this + * counts both the direct and aggregated (ORDER BY) arguments. + */ + int numArguments; + + /* + * Number of aggregated input columns. This includes ORDER BY expressions + * in both the plain-agg and ordered-set cases. Ordered-set direct args + * are not counted, though. + */ + int numInputs; + + /* offset of input columns in AggState->evalslot */ + int inputoff; + + /* + * Number of aggregated input columns to pass to the transfn. This + * includes the ORDER BY columns for ordered-set aggs, but not for plain + * aggs. (This doesn't count the transition state value!) + */ + int numTransInputs; + + /* Oid of the state transition or combine function */ + Oid transfn_oid; + + /* Oid of the serialization function or InvalidOid */ + Oid serialfn_oid; + + /* Oid of the deserialization function or InvalidOid */ + Oid deserialfn_oid; + + /* Oid of state value's datatype */ + Oid aggtranstype; + + /* ExprStates of the FILTER and argument expressions. 
*/ + ExprState *aggfilter; /* state of FILTER expression, if any */ + List *aggdirectargs; /* states of direct-argument expressions */ + + /* + * fmgr lookup data for transition function or combine function. Note in + * particular that the fn_strict flag is kept here. + */ + FmgrInfo transfn; + + /* fmgr lookup data for serialization function */ + FmgrInfo serialfn; + + /* fmgr lookup data for deserialization function */ + FmgrInfo deserialfn; + + /* Input collation derived for aggregate */ + Oid aggCollation; + + /* number of sorting columns */ + int numSortCols; + + /* number of sorting columns to consider in DISTINCT comparisons */ + /* (this is either zero or the same as numSortCols) */ + int numDistinctCols; + + /* deconstructed sorting information (arrays of length numSortCols) */ + AttrNumber *sortColIdx; + Oid *sortOperators; + Oid *sortCollations; + bool *sortNullsFirst; + + /* + * fmgr lookup data for input columns' equality operators --- only + * set/used when aggregate has DISTINCT flag. Note that these are in + * order of sort column index, not parameter index. + */ + FmgrInfo *equalfns; /* array of length numDistinctCols */ + + /* + * initial value from pg_aggregate entry + */ + Datum initValue; + bool initValueIsNull; + + /* + * We need the len and byval info for the agg's input and transition data + * types in order to know how to copy/delete values. + * + * Note that the info for the input type is used only when handling + * DISTINCT aggs with just one argument, so there is only one input type. + */ + int16 inputtypeLen, + transtypeLen; + bool inputtypeByVal, + transtypeByVal; + + /* + * Stuff for evaluation of aggregate inputs in cases where the aggregate + * requires sorted input. The arguments themselves will be evaluated via + * AggState->evalslot/evalproj for all aggregates at once, but we only + * want to sort the relevant columns for individual aggregates. 
+ */ + TupleDesc sortdesc; /* descriptor of input tuples */ + + /* + * Slots for holding the evaluated input arguments. These are set up + * during ExecInitAgg() and then used for each input row requiring + * processing besides what's done in AggState->evalproj. + */ + TupleTableSlot *sortslot; /* current input tuple */ + TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ + + /* + * These values are working state that is initialized at the start of an + * input tuple group and updated for each input tuple. + * + * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input + * values straight to the transition function. If it's DISTINCT or + * requires ORDER BY, we pass the input values into a Tuplesort object; + * then at completion of the input tuple group, we scan the sorted values, + * eliminate duplicates if needed, and run the transition function on the + * rest. + * + * We need a separate tuplesort for each grouping set. + */ + + Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ + + /* + * This field is a pre-initialized FunctionCallInfo struct used for + * calling this aggregate's transfn. We save a few cycles per row by not + * re-initializing the unchanging fields; which isn't much, but it seems + * worth the extra space consumption. + */ + FunctionCallInfoData transfn_fcinfo; + + /* Likewise for serialization and deserialization functions */ + FunctionCallInfoData serialfn_fcinfo; + + FunctionCallInfoData deserialfn_fcinfo; +} AggStatePerTransData; + +/* + * AggStatePerAggData - per-aggregate information + * + * This contains the information needed to call the final function, to produce + * a final aggregate result from the state value. If there are multiple + * identical Aggrefs in the query, they can all share the same per-agg data. + * + * These values are set up during ExecInitAgg() and do not change thereafter. 
+ */ +typedef struct AggStatePerAggData +{ + /* + * Link to an Aggref expr this state value is for. + * + * There can be multiple identical Aggref's sharing the same per-agg. This + * points to the first one of them. + */ + Aggref *aggref; + + /* index to the state value which this agg should use */ + int transno; + + /* Optional Oid of final function (may be InvalidOid) */ + Oid finalfn_oid; + + /* + * fmgr lookup data for final function --- only valid when finalfn_oid + * is not InvalidOid. + */ + FmgrInfo finalfn; + + /* + * Number of arguments to pass to the finalfn. This is always at least 1 + * (the transition state value) plus any ordered-set direct args. If the + * finalfn wants extra args then we pass nulls corresponding to the + * aggregated input columns. + */ + int numFinalArgs; + + /* + * We need the len and byval info for the agg's result data type in order + * to know how to copy/delete values. + */ + int16 resulttypeLen; + bool resulttypeByVal; + +} AggStatePerAggData; + +/* + * AggStatePerGroupData - per-aggregate-per-group working state + * + * These values are working state that is initialized at the start of + * an input tuple group and updated for each input tuple. + * + * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these + * structs (pointed to by aggstate->pergroup); we re-use the array for + * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the + * hash table contains an array of these structs for each tuple group. + * + * Logically, the sortstate field belongs in this struct, but we do not + * keep it here for space reasons: we don't support DISTINCT aggregates + * in AGG_HASHED mode, so there's no reason to use up a pointer field + * in every entry of the hashtable.
+ */ +typedef struct AggStatePerGroupData +{ + Datum transValue; /* current transition value */ + bool transValueIsNull; + + bool noTransValue; /* true if transValue not set yet */ + + /* + * Note: noTransValue initially has the same value as transValueIsNull, + * and if true both are cleared to false at the same time. They are not + * the same though: if transfn later returns a NULL, we want to keep that + * NULL and not auto-replace it with a later input value. Only the first + * non-NULL input will be auto-substituted. + */ +} AggStatePerGroupData; + +/* + * AggStatePerPhaseData - per-grouping-set-phase state + * + * Grouping sets are divided into "phases", where a single phase can be + * processed in one pass over the input. If there is more than one phase, then + * at the end of input from the current phase, state is reset and another pass + * taken over the data which has been re-sorted in the mean time. + * + * Accordingly, each phase specifies a list of grouping sets and group clause + * information, plus each phase after the first also has a sort order. + */ +typedef struct AggStatePerPhaseData +{ + int numsets; /* number of grouping sets (or 0) */ + int *gset_lengths; /* lengths of grouping sets */ + Bitmapset **grouped_cols; /* column groupings for rollup */ + FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ + Agg *aggnode; /* Agg node for phase data */ + Sort *sortnode; /* Sort node for input ordering for phase */ +} AggStatePerPhaseData; + +typedef struct AggTransBuildInfo +{ + AggState *aggstate; + AggStatePerTrans pertrans; + int numGroupingSets; /* number of grouping sets, at least 1
*/ +} AggTransBuildInfo; + extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags); extern TupleTableSlot *ExecAgg(AggState *node); extern void ExecEndAgg(AggState *node); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index b08ba73749..a6a1675e54 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1677,6 +1677,7 @@ typedef struct AggState bool agg_done; /* indicates completion of Agg scan */ int projected_set; /* The last projected grouping set */ int current_set; /* The current grouping set being evaluated */ + AggStatePerGroup current_peragg; /* The group values being evaluated */ Bitmapset *grouped_cols; /* grouped cols in current projection */ List *all_grouped_cols; /* list of all grouped cols in DESC * order */ @@ -1702,6 +1703,8 @@ typedef struct AggState TupleTableSlot *evalslot; /* slot for agg inputs */ ProjectionInfo *evalproj; /* projection machinery */ TupleDesc evaldesc; /* descriptor of input tuples */ + ExprState *evaltrans; /* evaluation of transition functions */ + } AggState; /* ---------------- -- 2.39.5