[Bug Fix] Incorrect Query Results with Distributed Query Plans

This PR addresses two reported issues: citusdata#7698 and citusdata#7697.

The Problem:
Both issues involve incorrect query results when a query references both local
and distributed tables, and includes a WHERE EXISTS clause on the local table.

For example:
SELECT ...
WHERE EXISTS (SELECT * FROM local_table);

In such cases, the WHERE EXISTS clause typically generates an InitPlan
or "one-time filter," which determines whether the rest of the plan's output
qualifies for the result. If this InitPlan relies on the contents of a
local table, it must be executed locally on the coordinator. However, the
planner's decisions regarding whether to convert local or distributed tables
into intermediate results fail to account for the references within the InitPlan.
This results in an incorrect query execution plan and, subsequently, incorrect data.

The Fix:
This PR ensures that when the standard planner (standard_planner) generates an
InitPlan in the PlannedStmt, we check the join qualifiers of each relation for
executor parameters (PARAM_EXEC nodes) that are set by the InitPlan. If such
references exist, the distributed tables are converted to intermediate results
instead of the local tables. This adjustment ensures that local tables used in
the InitPlan remain intact and behave as expected.

This fix prevents incorrect query results in cases involving mixed local and
distributed tables with WHERE EXISTS clauses and improves the accuracy of
distributed query planning.
pull/7809/head
Muhammad Usama 2024-12-25 19:01:07 +05:00 committed by Muhammad Usama
parent 73411915a4
commit 503484cf88
1 changed files with 88 additions and 2 deletions

View File

@ -87,6 +87,7 @@
#include "optimizer/optimizer.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
@ -135,6 +136,7 @@ typedef struct RangeTableEntryDetails
RangeTblEntry *rangeTableEntry;
List *requiredAttributeNumbers;
bool hasConstantFilterOnUniqueColumn;
bool hasDependencyOnInitPlanParam;
#if PG_VERSION_NUM >= PG_VERSION_16
RTEPermissionInfo *perminfo;
#endif
@ -175,6 +177,10 @@ typedef enum ConversionChoice
static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction);
static bool HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction);
static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList,
@ -290,7 +296,11 @@ GetConversionChoice(ConversionCandidates *conversionCandidates,
if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
{
return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
/*
 * If the local table depends on an InitPlan parameter (a kind of one-time
 * filter), we should refrain from converting the local tables.
 */
return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ?
CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
}
else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
{
@ -314,7 +324,8 @@ GetConversionChoice(ConversionCandidates *conversionCandidates,
}
else
{
return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
return localRTECandidate && (!localRTECandidate->hasDependencyOnInitPlanParam) ?
CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
}
}
}
@ -383,6 +394,78 @@ ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList)
}
/*
 * HasDependencyOnInitPlanParam
 *
 * Returns true if one of the join clauses recorded for the relation
 * (relationRestriction->relOptInfo->joininfo) is a bare PARAM_EXEC Param
 * whose paramid is set by one of the InitPlans of the planner info
 * (relationRestriction->plannerInfo->init_plans).
 *
 * Returns false when either argument is NULL, when the relation has no
 * joininfo, when there are no InitPlans, when no InitPlan sets a parameter,
 * or when no join clause matches.
 *
 * Only clauses that are themselves Param nodes are considered; a Param
 * nested inside a larger expression is not detected.
 */
static bool
HasDependencyOnInitPlanParam(RangeTblEntry *rangeTableEntry,
							 RelationRestriction *relationRestriction)
{
	List *initPlanParamIDs = NIL;
	ListCell *lc = NULL;

	/*
	 * Bail out if the plan does not contain an InitPlan or if the
	 * relationRestriction does not contain any joininfo.
	 */
	if (rangeTableEntry == NULL || relationRestriction == NULL)
	{
		return false;
	}

	if (relationRestriction->relOptInfo->joininfo == NULL)
	{
		return false;
	}

	if (relationRestriction->plannerInfo->init_plans == NULL)
	{
		return false;
	}

	/* collect all param ids that are set by the InitPlans */
	foreach(lc, relationRestriction->plannerInfo->init_plans)
	{
		Node *plan = (Node *) lfirst(lc);

		if (IsA(plan, SubPlan))
		{
			SubPlan *subplan = (SubPlan *) plan;

			if (subplan->setParam != NIL)
			{
				initPlanParamIDs = list_concat_unique_int(initPlanParamIDs,
														  subplan->setParam);
			}
		}
	}

	/* no InitPlan sets a parameter, so nothing can depend on one */
	if (initPlanParamIDs == NIL)
	{
		return false;
	}

	/*
	 * Check whether any join clause of this relation is a parameter set by
	 * an InitPlan. Passing true to extract_actual_clauses() selects the
	 * pseudoconstant clauses of the joininfo list.
	 */
	List *whereClauseList =
		extract_actual_clauses(relationRestriction->relOptInfo->joininfo, true);

	foreach(lc, whereClauseList)
	{
		Node *clause = (Node *) lfirst(lc);

		if (IsA(clause, Param))
		{
			Param *param = (Param *) clause;

			if (param->paramkind == PARAM_EXEC &&
				list_member_int(initPlanParamIDs, param->paramid))
			{
				return true;
			}
		}
	}

	return false;
}
/*
* HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant
* filter on a unique column.
@ -581,6 +664,9 @@ CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext);
rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction);
rangeTableEntryDetails->hasDependencyOnInitPlanParam =
HasDependencyOnInitPlanParam(rangeTableEntry, relationRestriction);
#if PG_VERSION_NUM >= PG_VERSION_16
rangeTableEntryDetails->perminfo = NULL;
if (rangeTableEntry->perminfoindex)