From 503484cf8872c63025f3ad456ae8ae41c6be1de9 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Wed, 25 Dec 2024 19:01:07 +0500 Subject: [PATCH] [Bug Fix] Incorrect Query Results with Distributed Query Plans This PR addresses two reported issues: citusdata#7698 and citusdata#7697. The Problem: Both issues involve incorrect query results when a query references both local and distributed tables, and includes a WHERE EXISTS clause on the local table. For example: SELECT ... WHERE EXISTS (SELECT * FROM local_table); In such cases, the WHERE EXISTS clause typically generates an InitPlan or "one-time filter," which determines whether the rest of the plan's output qualifies for the result. If this InitPlan relies on the contents of a local table, it must be executed locally on the coordinator. However, the planner's decisions regarding whether to convert local or distributed tables into intermediate results fail to account for the references within the InitPlan. This results in an incorrect query execution plan and, subsequently, incorrect data. The Fix: This PR ensures that when the standard planner (standard_planner) generates an InitPlan in the PlannedStmt, we check the executor parameters (PARAM nodes) in the join qualifiers for relations referenced by the InitPlan. If such references exist, distributed table references are converted to intermediate results rather than local tables. This adjustment ensures that local tables used in the InitPlan remain intact and behave as expected. This fix prevents incorrect query results in cases involving mixed local and distributed tables with WHERE EXISTS clauses and improves the accuracy of distributed query planning. --- .../planner/local_distributed_join_planner.c | 90 ++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index a6502bf43..9f4f440c1 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -87,6 +87,7 @@ #include "optimizer/optimizer.h" #include "optimizer/planner.h" #include "optimizer/prep.h" +#include "optimizer/restrictinfo.h" #include "parser/parse_relation.h" #include "parser/parsetree.h" #include "utils/builtins.h" @@ -135,6 +136,7 @@ typedef struct RangeTableEntryDetails RangeTblEntry *rangeTableEntry; List *requiredAttributeNumbers; bool hasConstantFilterOnUniqueColumn; + bool hasDependencyOnInitPlanParam; #if PG_VERSION_NUM >= PG_VERSION_16 RTEPermissionInfo *perminfo; #endif @@ -175,6 +177,10 @@ typedef enum ConversionChoice static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, RelationRestriction *relationRestriction); + +static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction); + static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext, List *rangeTableList, @@ -290,7 +296,11 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL) { - return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + /* If Local table is referenced by the InitPlan that it kind of a One time filter, + * In that case we should refrain from converting the local tables. + */ + return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED) { @@ -314,7 +324,8 @@ GetConversionChoice(ConversionCandidates *conversionCandidates, } else { - return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; + return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ? + CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES; } } } @@ -383,6 +394,78 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList) } +/* + * HasDependencyOnInitPlanParam + * + * This function returns true if the given rangeTableEntry has a dependency + * on an InitPlan parameter. + */ +static bool +HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry, + RelationRestriction *relationRestriction) +{ + List* whereClauseList; + List* initPlanParamIDs = NIL; + ListCell *lc = NULL; + + /* + * Bail out if the plan does not contain the initPlan or if relationRestriction + * does not contain the joininfo + */ + if (rangeTableEntry == NULL || relationRestriction == NULL) + return false; + if (relationRestriction->relOptInfo->joininfo == NULL) + return false; + if (relationRestriction->plannerInfo->init_plans == NULL) + return false; + + /* + * collect all param Ids referenced by the InitPlan + */ + foreach(lc, relationRestriction->plannerInfo->init_plans) + { + Node *plan = (Node *) lfirst(lc); + + if (IsA(plan, SubPlan)) + { + SubPlan *subplan = (SubPlan *) plan; + if (subplan->setParam != NIL) + { + initPlanParamIDs = list_concat_unique_int(initPlanParamIDs, subplan->setParam); + + } + } + } + if (initPlanParamIDs == NIL) + return false; + return false; + /* + * Check if any parameter in the join conditions (join info) for this relation + * is referenced by the initPlan. This is important to ensure that the query + * planner correctly handles dependencies on parameters set by the initPlan. + */ + /* check if any parameter in the join info for this relation is referenced by the initPlan */ + whereClauseList = extract_actual_clauses(relationRestriction->relOptInfo->joininfo, true); + + foreach(lc, whereClauseList) + { + Node *clause = (Node *) lfirst(lc); + + if (IsA(clause, Param)) + { + Param *param = (Param *) clause; + if (param->paramkind == PARAM_EXEC) + { + if (list_member_int(initPlanParamIDs, param->paramid)) + { + return true; + } + } + } + } + return false; +} + /* * HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant * filter on a unique column. @@ -581,6 +664,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext, RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext); rangeTableEntryDetails->hasConstantFilterOnUniqueColumn = HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction); + rangeTableEntryDetails->hasDependencyOnInitPlanParam = + HasDependencyOnInitPlanParam(rangeTableEntry, relationRestriction); + #if PG_VERSION_NUM >= PG_VERSION_16 rangeTableEntryDetails->perminfo = NULL; if (rangeTableEntry->perminfoindex)