Introduce a method to check whether a left join is safe to push down, and use the same method to compute the constraints for push-down

leftjoin_push
eaydingol 2025-07-11 23:11:19 +03:00
parent abf51818f6
commit 809c869c3d
5 changed files with 481 additions and 113 deletions

View File

@ -264,7 +264,7 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
{ {
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
} }
ereport(DEBUG5, (errmsg("\t Distributed table from the inner part: %s", innerRte->eref->aliasname))); ereport(DEBUG5, (errmsg("Distributed table from the inner part of the outer join: %s.", innerRte->eref->aliasname)));
RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList); RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList);
@ -293,7 +293,6 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->maxValue, false, true); Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->maxValue, false, true);
Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), Int32GetDatum(0), false, true); Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), Int32GetDatum(0), false, true);
// TODO: the following is only for hash partitioned tables
/* create a function expression node for the hash partition column */ /* create a function expression node for the hash partition column */
FuncExpr *hashFunction = makeNode(FuncExpr); FuncExpr *hashFunction = makeNode(FuncExpr);
hashFunction->funcid = cacheEntry->hashFunction->fn_oid; hashFunction->funcid = cacheEntry->hashFunction->fn_oid;
@ -367,7 +366,6 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals); fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals);
} }
// TODO: verify this, do we need the recursive call for all nodes?
/* We need to continue the recursive walk for the nested join statements.*/ /* We need to continue the recursive walk for the nested join statements.*/
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
} }

View File

@ -644,35 +644,6 @@ RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
} }
/*
 * JoinExprHasNonRecurringTable walks a join tree made of RangeTblRef and
 * JoinExpr nodes.
 *
 * For a RangeTblRef it returns whatever IsRTERefRecurring reports for that
 * reference. For a JoinExpr it returns true as soon as either the left or the
 * right argument yields true. NULL and every other node type yield false.
 *
 * NOTE(review): the function name says "non-recurring", but the leaf case
 * returns the result of IsRTERefRecurring unchanged -- confirm which of the
 * two is intended.
 */
static bool
JoinExprHasNonRecurringTable(Node *node, Query *query)
{
	if (node == NULL)
	{
		return false;
	}

	if (IsA(node, RangeTblRef))
	{
		return IsRTERefRecurring((RangeTblRef *) node, query);
	}

	if (!IsA(node, JoinExpr))
	{
		return false;
	}

	/* check the left subtree first, only descend to the right if needed */
	JoinExpr *nestedJoin = (JoinExpr *) node;
	if (JoinExprHasNonRecurringTable(nestedJoin->larg, query))
	{
		return true;
	}

	return JoinExprHasNonRecurringTable(nestedJoin->rarg, query);
}
/* /*
* RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and * RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and
* recursively plans all non-recurring (i.e., distributed) rels that that * recursively plans all non-recurring (i.e., distributed) rels that that
@ -742,16 +713,6 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
{ {
/* <recurring> left join <distributed> */ /* <recurring> left join <distributed> */
/* Recursively plan the right side of the left join when the following
* conditions are met:
* 1. The left side is recurring
* 2. The right side is not recurring
* 3. Either of the following:
* a. The left side is not a RangeTblRef (i.e., it is not a reference/local table)
* b. The tables in the right side are not colocated.
* 4. The left side does not have the distribution column (TODO: CHECK THIS)
*/
if (leftNodeRecurs && !rightNodeRecurs) if (leftNodeRecurs && !rightNodeRecurs)
{ {
if(!CheckPushDownFeasibilityLeftJoin(joinExpr, query)) if(!CheckPushDownFeasibilityLeftJoin(joinExpr, query))
@ -769,20 +730,6 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
} }
} }
/*
* rightNodeRecurs if there is a recurring table in the right side. However, if the right side
* is a join expression, we need to check if it contains a recurring table. If it does, we need to
* recursively plan the right side of the left join. Push-down path does not handle the nested joins
* yet, once we have that, we can remove this check.
*/
else if (leftNodeRecurs && rightNodeRecurs && JoinExprHasNonRecurringTable(rightNode, query))
{
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since right side is a joinexpr with non-recurring tables")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
}
/* /*
* A LEFT JOIN is recurring if the lhs is recurring. * A LEFT JOIN is recurring if the lhs is recurring.
* Note that we might have converted the rhs into a recurring * Note that we might have converted the rhs into a recurring
@ -2682,6 +2629,139 @@ bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
} }
/*
 * CheckUsingClauseForRte checks whether the given rte is a Citus table whose
 * distribution (partition) column is named in the given USING clause.
 *
 * rte                 range table entry to inspect; only RTE_RELATION and
 *                     RTE_FUNCTION entries are considered
 * usingClause         list of column-name String nodes of a JOIN ... USING
 * partitionColumnVar  output; set to the partition column Var stored in the
 *                     table's cache entry once the rte is known to be a Citus
 *                     table (also when the function goes on to return false)
 *
 * Returns true when a name in usingClause equals the name of the partition
 * column, false otherwise. The distribution method of the table is NOT checked
 * here; see CheckAttrNumAndDistributionTypeForJoinPushdown for that.
 */
bool CheckUsingClauseForRte(RangeTblEntry *rte, List *usingClause, Var **partitionColumnVar)
{
	if (rte->rtekind != RTE_RELATION && rte->rtekind != RTE_FUNCTION)
	{
		return false;
	}

	if (usingClause == NULL)
	{
		return false;
	}

	if (!IsCitusTable(rte->relid))
	{
		return false;
	}

	/* Get the partition column of the Citus table */
	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);
	*partitionColumnVar = cacheEntry->partitionColumn;

	/*
	 * A cache entry without a partition column cannot match any USING column,
	 * and dereferencing it below would crash.
	 * NOTE(review): presumably this is the case for Citus tables that have no
	 * distribution key (e.g. reference tables) -- confirm.
	 */
	if (*partitionColumnVar == NULL)
	{
		return false;
	}

	/* missing_ok is false, so get_attname reports an error for an unknown attribute */
	char *partitionColumnName = get_attname(rte->relid,
											(*partitionColumnVar)->varattno,
											false);

	/* Check if the partition column is in the using clause */
	ListCell *usingCell = NULL;
	foreach(usingCell, usingClause)
	{
		char *usingColumnName = strVal(lfirst(usingCell));
		if (strcmp(usingColumnName, partitionColumnName) == 0)
		{
			return true;
		}
	}

	return false;
}
/*
 * FindPartitionColumnInSubquery searches the range table of the given query,
 * descending recursively into nested subqueries, for a Citus table whose
 * partition column is named in usingClause.
 *
 * Range table entries are visited in order. Each entry is first handed to
 * CheckUsingClauseForRte; if that does not match and the entry is an
 * RTE_SUBQUERY, its subquery is searched before moving on to the next entry.
 *
 * On the first match *rte is set to the matching range table entry,
 * *partitionColumnVar holds the partition column Var reported by
 * CheckUsingClauseForRte, and true is returned. False is returned when query,
 * its range table or usingClause is NULL, or when nothing matches.
 *
 * NOTE(review): the match is made on column names only; this function does
 * not verify that the subquery exposes the partition column under that name
 * in its target list -- confirm that callers do not depend on that.
 */
bool FindPartitionColumnInSubquery(Query *query, List *usingClause, Var **partitionColumnVar, RangeTblEntry **rte)
{
	if (query == NULL || usingClause == NULL || query->rtable == NULL)
	{
		return false;
	}

	ListCell *rteCell = NULL;
	foreach(rteCell, query->rtable)
	{
		RangeTblEntry *candidateRte = (RangeTblEntry *) lfirst(rteCell);

		/* cases for RTE_RELATION and RTE_FUNCTION */
		if (CheckUsingClauseForRte(candidateRte, usingClause, partitionColumnVar))
		{
			*rte = candidateRte;
			return true;
		}

		if (candidateRte->rtekind != RTE_SUBQUERY)
		{
			continue;
		}

		/* depth-first: look inside the nested subquery before the next entry */
		bool foundInNestedQuery =
			FindPartitionColumnInSubquery(candidateRte->subquery, usingClause,
										  partitionColumnVar, rte);
		if (foundInNestedQuery)
		{
			return true;
		}
	}

	return false;
}
/*
 * ResolveBaseVarFromSubquery follows a Var that refers to an output column of
 * the given subquery down to the Var and range table entry it originates from.
 *
 * var      Var whose varattno is the resno of a target entry of query
 * query    the subquery that var points into
 * baseVar  output; the Var found in the innermost target list
 * baseRte  output; the range table entry that baseVar refers to
 *
 * The target entry with resno var->varattno is looked up in query->targetList.
 * If its expression is a plain Var that points at an RTE_RELATION or
 * RTE_FUNCTION entry of this query, the outputs are set and true is returned.
 * If it points at an RTE_SUBQUERY entry, resolution continues recursively in
 * that subquery. In every other case (missing arguments, no such target
 * entry, a non-Var expression, a Var of an outer query level, any other rte
 * kind) false is returned and the outputs are not written.
 *
 * NOTE(review): queries with set operations are not treated specially here;
 * confirm that resolving through such a query's target list is intended.
 */
bool ResolveBaseVarFromSubquery(Var *var, Query *query,
Var **baseVar, RangeTblEntry **baseRte)
{
	if (var == NULL || query == NULL)
	{
		return false;
	}

	TargetEntry *tle = get_tle_by_resno(query->targetList, var->varattno);
	if (!tle || !IsA(tle->expr, Var))
	{
		return false;
	}

	Var *tleVar = (Var *) tle->expr;

	/*
	 * A Var with varlevelsup > 0 belongs to an enclosing query level, so its
	 * varno does not index into this query's range table. Looking it up here
	 * would fetch an unrelated entry and could wrongly report a distributed
	 * table column as the origin.
	 */
	if (tleVar->varlevelsup != 0)
	{
		return false;
	}

	RangeTblEntry *rte = rt_fetch(tleVar->varno, query->rtable);
	if (rte == NULL)
	{
		return false;
	}

	if (rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION)
	{
		*baseVar = tleVar;
		*baseRte = rte;
		return true;
	}
	else if (rte->rtekind == RTE_SUBQUERY)
	{
		/* the column comes from a nested subquery, keep resolving there */
		return ResolveBaseVarFromSubquery(tleVar, rte->subquery, baseVar, baseRte);
	}

	return false;
}
/*
 * CheckAttrNumAndDistributionTypeForJoinPushdown returns true only when
 * attnum is not InvalidAttrNumber and the Citus table behind rte is of type
 * HASH_DISTRIBUTED.
 *
 * The attribute number is tested first; the table's cache entry is looked up
 * only when that test passes.
 */
bool CheckAttrNumAndDistributionTypeForJoinPushdown(int attnum, RangeTblEntry *rte)
{
	/* && short-circuits, so the cache lookup is skipped for an invalid attnum */
	return attnum != InvalidAttrNumber &&
		   GetCitusTableType(GetCitusTableCacheEntry(rte->relid)) == HASH_DISTRIBUTED;
}
/* /*
* CheckPushDownFeasibilityAndComputeIndexes checks if the given join expression * CheckPushDownFeasibilityAndComputeIndexes checks if the given join expression
* is a left outer join and if it is feasible to push down the join. If feasible, * is a left outer join and if it is feasible to push down the join. If feasible,
@ -2711,57 +2791,60 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
return false; return false;
} }
/* If the right side is itself a join, we currently do not support pushdown for such cases */
if (IsA(joinExpr->rarg, JoinExpr))
{
ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right side is a join expression, pushdown is not supported")));
return false;
}
RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg; RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg;
RangeTblEntry *rightTableEntry = rt_fetch(rightTableRef->rtindex, query->rtable); RangeTblEntry *rightTableEntry = rt_fetch(rightTableRef->rtindex, query->rtable);
// Check if the join is performed on the distribution column /* Check if the join is performed on the distribution column */
if (joinExpr->usingClause) if (joinExpr->usingClause)
{ {
if(rightTableEntry->rtekind != RTE_RELATION && rightTableEntry->rtekind != RTE_FUNCTION) Var *partitionColumnVar = NULL;
if(rightTableEntry->rtekind == RTE_FUNCTION || rightTableEntry->rtekind == RTE_RELATION)
{ {
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: right table is not a relation or function when using clause is present"))); if(!CheckUsingClauseForRte(rightTableEntry, joinExpr->usingClause, &partitionColumnVar))
return false;
}
if(!IsCitusTable(rightTableEntry->relid))
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: right table is not a Citus table when using clause is present")));
return false;
}
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rightTableEntry->relid);
Var *partitionColumnVar = cacheEntry->partitionColumn;
char *partitionColumnName = get_attname(rightTableEntry->relid, partitionColumnVar->varattno, false);
// Here we check if the partition column is in the using clause
ListCell *lc;
foreach(lc, joinExpr->usingClause)
{
char *colname = strVal(lfirst(lc));
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: checking column %s in using clause", colname)));
// If the column name matches the partition column name
if (strcmp(colname, partitionColumnName) == 0)
{ {
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: found partition column in using clause"))); return false;
*distRte = rightTableEntry; }
// Get the attribute number for the outer table *distRte = rightTableEntry;
*attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar);
if(*attnum != InvalidAttrNumber) /* Get the attribute number for the outer table */
{ *attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar);
return true; return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte);
} }
else if(rightTableEntry->rtekind == RTE_SUBQUERY)
{
if(FindPartitionColumnInSubquery(rightTableEntry->subquery, joinExpr->usingClause, &partitionColumnVar, distRte))
{
/* Get the attribute number for the outer table */
*attnum = GetAttrNumForMatchingColumn(*outerRte, (*distRte)->relid, partitionColumnVar);
return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte);
}
else
{
return false;
} }
} }
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: partition column not found in using clause"))); else
return false; {
ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right table of kind %d is not supported for pushdown when using clause is present",
rightTableEntry->rtekind)));
return false;
}
} }
else else /* join is defined with on clause */
{ {
List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals); List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals);
if (joinClauseList == NIL) if (joinClauseList == NIL)
{ {
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no quals in join clause")));
return false; return false;
} }
Node *joinClause = NULL; Node *joinClause = NULL;
foreach_declared_ptr(joinClause, joinClauseList) foreach_declared_ptr(joinClause, joinClauseList)
{ {
@ -2779,34 +2862,54 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
} }
RangeTblEntry *rte; RangeTblEntry *rte;
int attnumForInner; Var *innerVar;
if (leftColumn->varno == *outerRtIndex) if (leftColumn->varno == *outerRtIndex)
{ {
/* left column is the outer table of the comparison, get right */ /* left column is the outer table of the comparison, get right */
rte = rt_fetch(rightColumn->varno, query->rtable); rte = rt_fetch(rightColumn->varno, query->rtable);
attnumForInner = rightColumn->varattno; innerVar = rightColumn;
*attnum = leftColumn->varattno; *attnum = leftColumn->varattno;
} }
else if (rightColumn->varno == *outerRtIndex) else if (rightColumn->varno == *outerRtIndex)
{ {
/* right column is the outer table of the comparison, get left*/ /* right column is the outer table of the comparison, get left*/
rte = rt_fetch(leftColumn->varno, query->rtable); rte = rt_fetch(leftColumn->varno, query->rtable);
attnumForInner = leftColumn->varattno; innerVar = leftColumn;
*attnum = rightColumn->varattno; *attnum = rightColumn->varattno;
} }
else
{
continue;
}
if (rte && IsCitusTable(rte->relid)) if (rte && IsCitusTable(rte->relid))
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);
if(attnumForInner == cacheEntry->partitionColumn->varattno) if(innerVar->varattno == cacheEntry->partitionColumn->varattno)
{ {
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: join on distribution column of %s",
rte->eref->aliasname)));
*distRte = rte; *distRte = rte;
return true; return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte);
} }
} }
else if (rte && rte->rtekind == RTE_SUBQUERY)
{
Var *baseVar = NULL;
RangeTblEntry *baseRte = NULL;
if (ResolveBaseVarFromSubquery(innerVar, rte->subquery, &baseVar, &baseRte))
{
if (IsCitusTable(baseRte->relid))
{
CitusTableCacheEntry *entry = GetCitusTableCacheEntry(baseRte->relid);
if (baseVar->varattno == entry->partitionColumn->varattno)
{
*distRte = baseRte;
return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte);
}
}
}
return false;
}
} }
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no join clause on distribution column found")));
return false; return false;
} }

View File

@ -54,6 +54,8 @@ extern void UpdateVarNosInNode(Node *node, Index newVarNo);
extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte); extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte);
extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum); extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum);
extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query); extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query);
/* helpers used while checking whether a left join can be pushed down */
extern bool CheckUsingClauseForRte(RangeTblEntry *rte, List *usingClause, Var **partitionColumnVar);
extern bool FindPartitionColumnInSubquery(Query *query, List *usingClause, Var **partitionColumnVar, RangeTblEntry **rte);
extern bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar, RangeTblEntry **baseRte);
extern bool CheckAttrNumAndDistributionTypeForJoinPushdown(int attnum, RangeTblEntry *rte);
#endif /* RECURSIVE_PLANNING_H */ #endif /* RECURSIVE_PLANNING_H */

View File

@ -20,7 +20,7 @@ SELECT create_distributed_table('d1', 'a');
(1 row) (1 row)
INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20); INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10);
--- For testing, remove before merge --- For testing, remove before merge
CREATE TABLE d1_local(like d1); CREATE TABLE d1_local(like d1);
INSERT INTO d1_local select * from d1; INSERT INTO d1_local select * from d1;
@ -97,13 +97,13 @@ DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushd
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 USING (b)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 USING (b))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
14 16
(1 row) (1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b); SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b);
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
14 16
(1 row) (1 row)
-- Basic test cases with ON syntax -- Basic test cases with ON syntax
@ -213,13 +213,13 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
DEBUG: Creating router plan DEBUG: Creating router plan
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
14 16
(1 row) (1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b; SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
14 16
(1 row) (1 row)
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b; SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b;
@ -243,13 +243,13 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
DEBUG: Creating router plan DEBUG: Creating router plan
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
11 13
(1 row) (1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
11 13
(1 row) (1 row)
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
@ -262,12 +262,245 @@ DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_join_pu
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, d1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) d1_1) d1 ON (((r1.a OPERATOR(pg_catalog.=) d1.a) OR (r1.b OPERATOR(pg_catalog.=) d1.b)))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, d1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) d1_1) d1 ON (((r1.a OPERATOR(pg_catalog.=) d1.a) OR (r1.b OPERATOR(pg_catalog.=) d1.b))))
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
26 28
(1 row) (1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
26 28
(1 row)
-- Test join pushdown behavior when the inner part of the join is a subquery
-- Using 'using' syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
21
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) USING (a);
count
---------------------------------------------------------------------
21
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) USING (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) USING (a);
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) WHERE a > 1) USING (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) WHERE a > 1) USING (a);
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 JOIN d1 as d1_1 USING (a)) USING (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
57
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local JOIN d1_local as d1_1 USING (a)) USING (a);
count
---------------------------------------------------------------------
57
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (d1 LEFT JOIN d1 as d1_1 USING (a)) USING (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down
DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" to a subquery
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_join_pushdown.d1 WHERE true
DEBUG: recursively planning distributed relation "d1" "d1_1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" "d1_1" to a subquery
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_2 for subquery SELECT a FROM recurring_join_pushdown.d1 d1_1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN ((SELECT d1_2.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_2) d1 LEFT JOIN (SELECT d1_1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_1_1) d1_1 USING (a)) USING (a))
DEBUG: Creating router plan
count
---------------------------------------------------------------------
57
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USING (a)) USING (a);
count
---------------------------------------------------------------------
57
(1 row)
-- Using 'on' syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
21
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) AS d1_local ON r1_local.a = d1_local.a;
count
---------------------------------------------------------------------
21
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) AS d1 ON r1.a = d1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a;
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) AS d1 WHERE a > 1) AS d1 ON r1.a = d1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) AS d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a;
count
---------------------------------------------------------------------
15
(1 row)
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a as a, d1.b, d1_1.a AS a_1 FROM d1 LEFT JOIN d1 as d1_1 ON d1.a = d1_1.a) AS d1_2 ON r1.a = d1_2.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
57
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_1.a AS a_1 FROM d1_local LEFT JOIN d1_local as d1_1 ON d1_local.a = d1_1.a) AS d1_2 ON r1_local.a = d1_2.a;
count
---------------------------------------------------------------------
57
(1 row) (1 row)

View File

@ -14,7 +14,7 @@ INSERT INTO r1_local select * from r1;
CREATE TABLE d1(a int, b int); CREATE TABLE d1(a int, b int);
SELECT create_distributed_table('d1', 'a'); SELECT create_distributed_table('d1', 'a');
INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20); INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10);
--- For testing, remove before merge --- For testing, remove before merge
CREATE TABLE d1_local(like d1); CREATE TABLE d1_local(like d1);
@ -57,3 +57,35 @@ SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions -- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b; SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b;
-- Test join pushdown behavior when the inner part of the join is a subquery
-- Using 'using' syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a);
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) USING (a);
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) USING (a);
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) USING (a);
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) WHERE a > 1) USING (a);
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) WHERE a > 1) USING (a);
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 JOIN d1 as d1_1 USING (a)) USING (a);
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local JOIN d1_local as d1_1 USING (a)) USING (a);
SELECT count(*) FROM r1 LEFT JOIN (d1 LEFT JOIN d1 as d1_1 USING (a)) USING (a);
SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USING (a)) USING (a);
-- Using 'on' syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) AS d1_local ON r1_local.a = d1_local.a;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) AS d1 ON r1.a = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a;
SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) AS d1 WHERE a > 1) AS d1 ON r1.a = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) AS d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a;
SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a as a, d1.b, d1_1.a AS a_1 FROM d1 LEFT JOIN d1 as d1_1 ON d1.a = d1_1.a) AS d1_2 ON r1.a = d1_2.a;
SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_1.a AS a_1 FROM d1_local LEFT JOIN d1_local as d1_1 ON d1_local.a = d1_1.a) AS d1_2 ON r1_local.a = d1_2.a;