From 2a67d0057dc71375eae2ae80e84a2b8e24f798fd Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 9 Nov 2021 19:12:09 +0100 Subject: [PATCH] implement parameterized inner loops --- .../distributed/executor/adaptive_executor.c | 117 ++++++++++++++++-- .../distributed/executor/citus_custom_scan.c | 49 +++++++- .../distributed/planner/path_based_planner.c | 53 ++++++++ src/include/distributed/citus_custom_scan.h | 4 + 4 files changed, 210 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 0e9d96fd5..c7a0aaee2 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -209,6 +209,8 @@ typedef struct DistributedExecution /* Parameters for parameterized plans. Can be NULL. */ ParamListInfo paramListInfo; + ParamExecData *paramExecData; + List *paramExecTypes; /* list of workers involved in the execution */ List *workerList; @@ -611,6 +613,8 @@ typedef struct TaskPlacementExecution static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, ParamListInfo paramListInfo, + ParamExecData *paramExecData, + List *paramExecTypes, int targetPoolSize, TupleDestination * defaultTupleDest, @@ -821,6 +825,8 @@ AdaptiveExecutor(CitusScanState *scanState) distributedPlan->modLevel, taskList, paramListInfo, + executorState->es_param_exec_vals, + executorState->es_plannedstmt->paramExecTypes, targetPoolSize, defaultTupleDest, &xactProperties, @@ -1016,7 +1022,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) DistributedExecution *execution = CreateDistributedExecution( executionParams->modLevel, executionParams->taskList, - paramListInfo, executionParams->targetPoolSize, + paramListInfo, NULL, NIL, executionParams->targetPoolSize, defaultTupleDest, &executionParams->xactProperties, executionParams->jobIdList, executionParams->localExecutionSupported); @@ -1085,6 +1091,8 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, ParamListInfo paramListInfo, + ParamExecData *paramExecData, + List *paramExecTypes, int targetPoolSize, TupleDestination *defaultTupleDest, TransactionProperties *xactProperties, List *jobIdList, bool localExecutionSupported) @@ -1101,6 +1109,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->remoteTaskList = NIL; execution->paramListInfo = paramListInfo; + execution->paramExecData = paramExecData; + execution->paramExecTypes = paramExecTypes; execution->workerList = NIL; execution->sessionList = NIL; execution->targetPoolSize = targetPoolSize; @@ -4355,23 +4365,101 @@ SendNextQuery(TaskPlacementExecution *placementExecution, bool binaryResults = shardCommandExecution->binaryResults; Task *task = shardCommandExecution->task; ParamListInfo paramListInfo = execution->paramListInfo; + ParamExecData *paramExecData = execution->paramExecData; + List *paramExecTypes = execution->paramExecTypes; int querySent = 0; uint32 queryIndex = placementExecution->queryIndex; Assert(queryIndex < task->queryCount); char *queryString = TaskQueryStringAtIndex(task, queryIndex); - if (paramListInfo != NULL && !task->parametersInQueryStringResolved) + bool hasParameters = (paramListInfo != NULL) + || (paramExecData != NULL && list_length(paramExecTypes) > 0); + if (hasParameters && !task->parametersInQueryStringResolved) { - int parameterCount = paramListInfo->numParams; - Oid *parameterTypes = NULL; - const char **parameterValues = NULL; + int parameterCount = 0; + if (paramListInfo != NULL) + { + parameterCount += paramListInfo->numParams; + } + parameterCount += list_length(paramExecTypes); - /* force evaluation of bound params */ - paramListInfo = copyParamList(paramListInfo); - ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes, - ¶meterValues); + Oid *parameterTypes = palloc0(parameterCount * sizeof(Oid)); + const char **parameterValues = palloc0(parameterCount * sizeof(char *)); + + Index parameterIndex = 0; + if (paramListInfo) + { + /* force evaluation of bound params */ + paramListInfo = copyParamList(paramListInfo); + + ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes, + ¶meterValues); + parameterIndex += paramListInfo->numParams; + } + + Oid execParamType = 0; + const bool useOriginalCustomTypeOids = false; + Index execParamIndex = 0; + foreach_oid(execParamType, paramExecTypes) + { + + /* + * Use 0 for data types where the oid values can be different on + * the coordinator and worker nodes. Therefore, the worker nodes can + * infer the correct oid. + */ + if (execParamType >= FirstNormalObjectId && !useOriginalCustomTypeOids) + { + parameterTypes[parameterIndex] = 0; + } + else + { + parameterTypes[parameterIndex] = execParamType; + } + + /* + * If the parameter is not referenced / used (ptype == 0) and + * would otherwise have errored out inside standard_planner()), + * don't pass a value to the remote side, and pass text oid to prevent + * undetermined data type errors on workers. + */ + if (execParamType == 0) + { + parameterValues[parameterIndex] = NULL; + parameterTypes[parameterIndex] = TEXTOID; + + execParamIndex++; + parameterIndex++; + continue; + } + + /* + * If the parameter is NULL then we preserve its type, but + * don't need to evaluate its value. + */ + if (paramExecData[execParamIndex].isnull) + { + parameterValues[parameterIndex] = NULL; + + execParamIndex++; + parameterIndex++; + continue; + } + + Oid typeOutputFunctionId = 0; + bool variableLengthType = 0; + getTypeOutputInfo(execParamType, &typeOutputFunctionId, + &variableLengthType); + + parameterValues[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, + paramExecData[execParamIndex].value); + + execParamIndex++; + parameterIndex++; + } + querySent = SendRemoteCommandParams(connection, queryString, parameterCount, parameterTypes, parameterValues, binaryResults); @@ -5538,8 +5626,15 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo, { int parameterCount = paramListInfo->numParams; - *parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid)); - *parameterValues = (const char **) palloc0(parameterCount * sizeof(char *)); + if (*parameterTypes == NULL) + { + *parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid)); + } + + if (*parameterValues == NULL) + { + *parameterValues = (const char **) palloc0(parameterCount * sizeof(char *)); + } /* get parameter types and values */ for (int parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index a06f060f3..f6c5b80a7 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -28,16 +28,17 @@ #include "distributed/local_executor.h" #include "distributed/local_plan_cache.h" #include "distributed/multi_executor.h" -#include "distributed/multi_server_executor.h" #include "distributed/multi_router_planner.h" +#include "distributed/multi_server_executor.h" #include "distributed/query_stats.h" #include "distributed/subplan_execution.h" #include "distributed/worker_log_messages.h" #include "distributed/worker_protocol.h" #include "executor/executor.h" #include "nodes/makefuncs.h" -#include "optimizer/optimizer.h" +#include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" +#include "optimizer/optimizer.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -565,6 +566,30 @@ RegenerateTaskForFasthPathQuery(Job *workerJob) } +static bool +NumberOfParametersWalker(Node *node, int *count) +{ + if (node == NULL) + { + return false; + } + else if (IsA(node, Param)) + { + *count = *count + 1; + return false; + } + return expression_tree_walker(node, NumberOfParametersWalker, count); +} + + +static int +NumberOfParameters(Node *expr) +{ + int count = 0; + NumberOfParametersWalker(expr, &count); + return count; +} + /* * AdaptiveExecutorCreateScan creates the scan state for the adaptive executor. */ @@ -583,6 +608,11 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->finishedPreScan = false; scanState->finishedRemoteScan = false; + int numParams = NumberOfParameters((Node *) scan->custom_exprs); + scanState->numParameters = numParams; + scanState->paramValues = palloc0(scanState->numParameters * sizeof(Datum)); + scanState->paramNulls = palloc0(scanState->numParameters * sizeof(bool)); + return (Node *) scanState; } @@ -691,6 +721,21 @@ CitusReScan(CustomScanState *node) EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; + /* for all the changed parameters, store them locally so we can use them during query dispatch */ + int x = -1; + while ((x = bms_next_member(node->ss.ps.chgParam, x)) >= 0) + { + /* make sure the parameter that changed is within bounds */ + Assert(scanState->numParameters > x); + + /* copy the parameter information into local state*/ + ParamExecData paramExecData = executorState->es_param_exec_vals[x]; + scanState->paramValues[x] = paramExecData.value; + scanState->paramNulls[x] = paramExecData.isnull; + + ereport(NOTICE, (errmsg("changed parameter: %d", x))); + } + if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index a6a8490d0..939ee00e8 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -22,6 +22,7 @@ #include "nodes/pathnodes.h" #include "nodes/pg_list.h" #include "nodes/plannodes.h" +#include "optimizer/paramassign.h" #include "optimizer/pathnode.h" #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" @@ -147,6 +148,55 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, } +static Node * +TransformVarToParamExternMutator(Node *node, PlannerInfo *root) +{ + if (node == NULL) + { + return NULL; + } + + if (IsA(node, Var)) + { + Var *var = castNode(Var, node); + + /* If not to be replaced, we can just return the Var unmodified */ + if (!bms_is_member(var->varno, root->curOuterRels)) + { + return (Node *) var; + } + + Param *paramExec = replace_nestloop_param_var(root, var); + + /* TODO: figure out which Var's to replace by which parameters*/ + /* TODO: hack - insert param 1 for now */ + Param *paramExtern = makeNode(Param); + paramExtern->paramkind = PARAM_EXTERN; + + /* Exec is 0-index, Extern is 1-indexed */ + paramExtern->paramid = paramExec->paramid + 1; + + paramExtern->paramtype = paramExec->paramtype; + paramExtern->paramtypmod = paramExec->paramtypmod; + paramExtern->paramcollid = paramExec->paramcollid; + paramExtern->location = paramExec->location; + + return (Node *) paramExtern; + } + else if (IsA(node, Query)) + { + + return (Node *) query_tree_mutator((Query *) node, + TransformVarToParamExternMutator, + (void *) root, + 0); + } + + return expression_tree_mutator(node, TransformVarToParamExternMutator, + (void *) root); +} + + static Plan * CreateDistributedUnionPlan(PlannerInfo *root, RelOptInfo *rel, @@ -183,6 +233,9 @@ CreateDistributedUnionPlan(PlannerInfo *root, Query *qc = copyObject(q); UpdateRelationToShardNames((Node *) qc, relationShardList); + /* transform Var's for other varno's to parameters */ + qc = castNode(Query, TransformVarToParamExternMutator((Node *) qc, root)); + StringInfoData buf; initStringInfo(&buf); pg_get_query_def(qc, &buf); diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 92301fceb..64666e6be 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -27,6 +27,10 @@ typedef struct CitusScanState MultiExecutorType executorType; /* distributed executor type */ bool finishedRemoteScan; /* flag to check if remote scan is finished */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ + + int numParameters; /* number of parameters passed into node per rescan */ + Datum *paramValues; /* array of last passed parameter values */ + bool *paramNulls; /* array of last passed parameter null flags*/ } CitusScanState;