diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index f6c5b80a7..9c3713270 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -608,11 +608,6 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->finishedPreScan = false; scanState->finishedRemoteScan = false; - int numParams = NumberOfParameters((Node *) scan->custom_exprs); - scanState->numParameters = numParams; - scanState->paramValues = palloc0(scanState->numParameters * sizeof(Datum)); - scanState->paramNulls = palloc0(scanState->numParameters * sizeof(bool)); - return (Node *) scanState; } @@ -721,21 +716,6 @@ CitusReScan(CustomScanState *node) EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; - /* for all the changed parameters, store them locally so we can use them during query dispatch */ - int x = -1; - while ((x = bms_next_member(node->ss.ps.chgParam, x)) >= 0) - { - /* make sure the parameter that changed is within bounds */ - Assert(scanState->numParameters > x); - - /* copy the parameter information into local state*/ - ParamExecData paramExecData = executorState->es_param_exec_vals[x]; - scanState->paramValues[x] = paramExecData.value; - scanState->paramNulls[x] = paramExecData.isnull; - - ereport(NOTICE, (errmsg("changed parameter: %d", x))); - } - if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), diff --git a/src/backend/distributed/planner/path_based_planner.c b/src/backend/distributed/planner/path_based_planner.c index 939ee00e8..abc37fbf8 100644 --- a/src/backend/distributed/planner/path_based_planner.c +++ b/src/backend/distributed/planner/path_based_planner.c @@ -66,7 +66,7 @@ static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint Expr *partitionValue, Oid sampleRelid, List *subPaths); static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, - List *clauses); + List *clauses, Index **varnoMapping); static List * ShardIntervalListToRelationShardList(List *shardIntervalList); static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath); static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath); @@ -148,8 +148,15 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId, } +typedef struct TransformVarToParamExternMutatorContext +{ + PlannerInfo *root; + Index *varnoMapping; +} TransformVarToParamExternMutatorContext; + + static Node * -TransformVarToParamExternMutator(Node *node, PlannerInfo *root) +TransformVarToParamExternMutator(Node *node, TransformVarToParamExternMutatorContext *context) { if (node == NULL) { @@ -160,13 +167,20 @@ TransformVarToParamExternMutator(Node *node, PlannerInfo *root) { Var *var = castNode(Var, node); + Index originalVarNo = context->varnoMapping[var->varno]; + if (originalVarNo == 0) + { + /* no mapping was required */ + originalVarNo = var->varno; + } + Assert(originalVarNo > 0); /* If not to be replaced, we can just return the Var unmodified */ - if (!bms_is_member(var->varno, root->curOuterRels)) + if (!bms_is_member(originalVarNo, context->root->curOuterRels)) { return (Node *) var; } - Param *paramExec = replace_nestloop_param_var(root, var); + Param *paramExec = replace_nestloop_param_var(context->root, var); /* TODO: figure out which Var's to replace by which parameters*/ /* TODO: hack - insert param 1 for now */ @@ -188,12 +202,24 @@ TransformVarToParamExternMutator(Node *node, PlannerInfo *root) return (Node *) query_tree_mutator((Query *) node, TransformVarToParamExternMutator, - (void *) root, + (void *) context, 0); } return expression_tree_mutator(node, TransformVarToParamExternMutator, - (void *) root); + (void *) context); +} + + +static Query * +TransformVarToParamExtern(Query *query, PlannerInfo *root, Index *varnoMapping) +{ + TransformVarToParamExternMutatorContext context = { + root, + varnoMapping + }; + + return castNode(Query, TransformVarToParamExternMutator((Node *) query, &context)); } @@ -212,7 +238,8 @@ CreateDistributedUnionPlan(PlannerInfo *root, ShardInterval *shardInterval = NULL; - Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses); + Index *varnoMapping = NULL; /* store mapping back for outerrel checks */ + Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses, &varnoMapping); /* * Assume shards are colocated, any shard should suffice for now to find the initial @@ -234,7 +261,7 @@ CreateDistributedUnionPlan(PlannerInfo *root, UpdateRelationToShardNames((Node *) qc, relationShardList); /* transform Var's for other varno's to parameters */ - qc = castNode(Query, TransformVarToParamExternMutator((Node *) qc, root)); + qc = TransformVarToParamExtern(qc, root, varnoMapping); StringInfoData buf; initStringInfo(&buf); @@ -294,8 +321,8 @@ CreateDistributedUnionPlan(PlannerInfo *root, /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ clauses = extract_actual_clauses(clauses, false); - plan->scan.plan.qual = clauses; - plan->custom_exprs = clauses; +// plan->scan.plan.qual = clauses; +// plan->custom_exprs = clauses; return (Plan *) plan; } @@ -1414,8 +1441,14 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf } +/* + * when varnoMapping is set it stores an array of varno's in the new query to the original + * varno's of the source query. This can later be used to understand if the var's used in + * this query come from an outer rel in a nested loop. + */ static Query * -GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses) +GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses, + Index **varnoMapping) { PathQueryInfo info = { 0 }; info.varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size); @@ -1447,6 +1480,22 @@ GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses) } q->jointree->quals = VarNoMutator(q->jointree->quals, info.varno_mapping); + if (varnoMapping) + { + /* export the reverse varno mapping */ + int mappingSize = list_length(q->rtable); + *varnoMapping = palloc0(sizeof(Index) * root->simple_rel_array_size); + for (int i = 0; i < root->simple_rel_array_size; i++) + { + Index varno = info.varno_mapping[i]; + if (varno == 0) + { + continue; + } + (*varnoMapping)[varno] = i; + } + } + return q; } diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 64666e6be..92301fceb 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -27,10 +27,6 @@ typedef struct CitusScanState MultiExecutorType executorType; /* distributed executor type */ bool finishedRemoteScan; /* flag to check if remote scan is finished */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ - - int numParameters; /* number of parameters passed into node per rescan */ - Datum *paramValues; /* array of last passed parameter values */ - bool *paramNulls; /* array of last passed parameter null flags*/ } CitusScanState;