fix check for outer rel to take mapped varno's into account

moonshot/custom-path
Nils Dijk 2021-11-10 16:51:58 +01:00
parent 2a67d0057d
commit 071cae2f6d
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
3 changed files with 60 additions and 35 deletions

View File

@ -608,11 +608,6 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
scanState->finishedPreScan = false; scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false; scanState->finishedRemoteScan = false;
int numParams = NumberOfParameters((Node *) scan->custom_exprs);
scanState->numParameters = numParams;
scanState->paramValues = palloc0(scanState->numParameters * sizeof(Datum));
scanState->paramNulls = palloc0(scanState->numParameters * sizeof(bool));
return (Node *) scanState; return (Node *) scanState;
} }
@ -721,21 +716,6 @@ CitusReScan(CustomScanState *node)
EState *executorState = ScanStateGetExecutorState(scanState); EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info; ParamListInfo paramListInfo = executorState->es_param_list_info;
/* for all the changed parameters, store them locally so we can use them during query dispatch */
int x = -1;
while ((x = bms_next_member(node->ss.ps.chgParam, x)) >= 0)
{
/* make sure the parameter that changed is within bounds */
Assert(scanState->numParameters > x);
/* copy the parameter information into local state*/
ParamExecData paramExecData = executorState->es_param_exec_vals[x];
scanState->paramValues[x] = paramExecData.value;
scanState->paramNulls[x] = paramExecData.isnull;
ereport(NOTICE, (errmsg("changed parameter: %d", x)));
}
if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved) if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

View File

@ -66,7 +66,7 @@ static CustomPath * WrapTableAccessWithDistributedUnion(Path *originalPath, uint
Expr *partitionValue, Oid Expr *partitionValue, Oid
sampleRelid, List *subPaths); sampleRelid, List *subPaths);
static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, static Query * GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist,
List *clauses); List *clauses, Index **varnoMapping);
static List * ShardIntervalListToRelationShardList(List *shardIntervalList); static List * ShardIntervalListToRelationShardList(List *shardIntervalList);
static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath); static List * OptimizeJoinPath(PlannerInfo *root, Path *originalPath);
static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath); static List * BroadcastOuterJoinPath(PlannerInfo *root, Path *originalPath);
@ -148,8 +148,15 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId,
} }
typedef struct TransformVarToParamExternMutatorContext
{
PlannerInfo *root;
Index *varnoMapping;
} TransformVarToParamExternMutatorContext;
static Node * static Node *
TransformVarToParamExternMutator(Node *node, PlannerInfo *root) TransformVarToParamExternMutator(Node *node, TransformVarToParamExternMutatorContext *context)
{ {
if (node == NULL) if (node == NULL)
{ {
@ -160,13 +167,20 @@ TransformVarToParamExternMutator(Node *node, PlannerInfo *root)
{ {
Var *var = castNode(Var, node); Var *var = castNode(Var, node);
Index originalVarNo = context->varnoMapping[var->varno];
if (originalVarNo == 0)
{
/* no mapping was required */
originalVarNo = var->varno;
}
Assert(originalVarNo > 0);
/* If not to be replaced, we can just return the Var unmodified */ /* If not to be replaced, we can just return the Var unmodified */
if (!bms_is_member(var->varno, root->curOuterRels)) if (!bms_is_member(originalVarNo, context->root->curOuterRels))
{ {
return (Node *) var; return (Node *) var;
} }
Param *paramExec = replace_nestloop_param_var(root, var); Param *paramExec = replace_nestloop_param_var(context->root, var);
/* TODO: figure out which Var's to replace by which parameters*/ /* TODO: figure out which Var's to replace by which parameters*/
/* TODO: hack - insert param 1 for now */ /* TODO: hack - insert param 1 for now */
@ -188,12 +202,24 @@ TransformVarToParamExternMutator(Node *node, PlannerInfo *root)
return (Node *) query_tree_mutator((Query *) node, return (Node *) query_tree_mutator((Query *) node,
TransformVarToParamExternMutator, TransformVarToParamExternMutator,
(void *) root, (void *) context,
0); 0);
} }
return expression_tree_mutator(node, TransformVarToParamExternMutator, return expression_tree_mutator(node, TransformVarToParamExternMutator,
(void *) root); (void *) context);
}
static Query *
TransformVarToParamExtern(Query *query, PlannerInfo *root, Index *varnoMapping)
{
TransformVarToParamExternMutatorContext context = {
root,
varnoMapping
};
return castNode(Query, TransformVarToParamExternMutator((Node *) query, &context));
} }
@ -212,7 +238,8 @@ CreateDistributedUnionPlan(PlannerInfo *root,
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses); Index *varnoMapping = NULL; /* store mapping back for outerrel checks */
Query *q = GetQueryFromPath(root, distUnion->worker_path, tlist, clauses, &varnoMapping);
/* /*
* Assume shards are colocated, any shard should suffice for now to find the initial * Assume shards are colocated, any shard should suffice for now to find the initial
@ -234,7 +261,7 @@ CreateDistributedUnionPlan(PlannerInfo *root,
UpdateRelationToShardNames((Node *) qc, relationShardList); UpdateRelationToShardNames((Node *) qc, relationShardList);
/* transform Var's for other varno's to parameters */ /* transform Var's for other varno's to parameters */
qc = castNode(Query, TransformVarToParamExternMutator((Node *) qc, root)); qc = TransformVarToParamExtern(qc, root, varnoMapping);
StringInfoData buf; StringInfoData buf;
initStringInfo(&buf); initStringInfo(&buf);
@ -294,8 +321,8 @@ CreateDistributedUnionPlan(PlannerInfo *root,
/* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
clauses = extract_actual_clauses(clauses, false); clauses = extract_actual_clauses(clauses, false);
plan->scan.plan.qual = clauses; // plan->scan.plan.qual = clauses;
plan->custom_exprs = clauses; // plan->custom_exprs = clauses;
return (Plan *) plan; return (Plan *) plan;
} }
@ -1414,8 +1441,14 @@ ApplyPathToQuery(PlannerInfo *root, Query *query, Path *path, PathQueryInfo *inf
} }
/*
* when varnoMapping is set it stores an array of varno's in the new query to the original
* varno's of the source query. This can later be used to understand if the var's used in
* this query come from an outer rel in a nested loop.
*/
static Query * static Query *
GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses) GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses,
Index **varnoMapping)
{ {
PathQueryInfo info = { 0 }; PathQueryInfo info = { 0 };
info.varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size); info.varno_mapping = palloc0(sizeof(Index) * root->simple_rel_array_size);
@ -1447,6 +1480,22 @@ GetQueryFromPath(PlannerInfo *root, Path *path, List *tlist, List *clauses)
} }
q->jointree->quals = VarNoMutator(q->jointree->quals, info.varno_mapping); q->jointree->quals = VarNoMutator(q->jointree->quals, info.varno_mapping);
if (varnoMapping)
{
/* export the reverse varno mapping */
int mappingSize = list_length(q->rtable);
*varnoMapping = palloc0(sizeof(Index) * root->simple_rel_array_size);
for (int i = 0; i < root->simple_rel_array_size; i++)
{
Index varno = info.varno_mapping[i];
if (varno == 0)
{
continue;
}
(*varnoMapping)[varno] = i;
}
}
return q; return q;
} }

View File

@ -27,10 +27,6 @@ typedef struct CitusScanState
MultiExecutorType executorType; /* distributed executor type */ MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */ bool finishedRemoteScan; /* flag to check if remote scan is finished */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */
int numParameters; /* number of parameters passed into node per rescan */
Datum *paramValues; /* array of last passed parameter values */
bool *paramNulls; /* array of last passed parameter null flags*/
} CitusScanState; } CitusScanState;