From 809c869c3d151a07395f47be77f4523a932a3603 Mon Sep 17 00:00:00 2001 From: eaydingol Date: Fri, 11 Jul 2025 23:11:19 +0300 Subject: [PATCH] Introduce a method to check if the left join is safe to push-down, use the same method to compute constraints for push --- .../distributed/planner/deparse_shard_query.c | 4 +- .../distributed/planner/recursive_planning.c | 297 ++++++++++++------ src/include/distributed/recursive_planning.h | 6 +- .../expected/recurring_join_pushdown.out | 251 ++++++++++++++- .../regress/sql/recurring_join_pushdown.sql | 36 ++- 5 files changed, 481 insertions(+), 113 deletions(-) diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 311989fef..22d484a40 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -264,7 +264,7 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) { return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); } - ereport(DEBUG5, (errmsg("\t Distributed table from the inner part: %s", innerRte->eref->aliasname))); + ereport(DEBUG5, (errmsg("Distributed table from the inner part of the outer join: %s.", innerRte->eref->aliasname))); RelationShard *relationShard = FindRelationShard(innerRte->relid, relationShardList); @@ -293,7 +293,6 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) Const *constNodeUpperBound = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), shardInterval->maxValue, false, true); Const *constNodeZero = makeConst(INT4OID, -1, InvalidOid, sizeof(int32), Int32GetDatum(0), false, true); - // TOOD: the following is only for hash partitioned tables /* create a function expression node for the hash partition column */ FuncExpr *hashFunction = makeNode(FuncExpr); hashFunction->funcid = cacheEntry->hashFunction->fn_oid; @@ -367,7 +366,6 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) fromExpr->quals = make_and_qual(fromExpr->quals, shardIntervalBoundQuals); } - // TODO: verify this, do we need the recursive call for all nodes? /* We need to continue the recursive walk for the nested join statements.*/ return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index cfd714688..1f4913051 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -644,35 +644,6 @@ RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query, } -/* - * Returns true if the given node is recurring, or the node is a - * JoinExpr that contains a recurring node. -*/ -static bool -JoinExprHasNonRecurringTable(Node *node, Query *query) -{ - if (node == NULL) - { - return false; - } - else if (IsA(node, RangeTblRef)) - { - return IsRTERefRecurring((RangeTblRef *) node, query); - } - else if (IsA(node, JoinExpr)) - { - JoinExpr *joinExpr = (JoinExpr *) node; - - return JoinExprHasNonRecurringTable(joinExpr->larg, query) || - JoinExprHasNonRecurringTable(joinExpr->rarg, query); - } - else - { - return false; - } -} - - /* * RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and * recursively plans all non-recurring (i.e., distributed) rels that that @@ -742,16 +713,6 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, { /* left join */ - /* Recursively plan the right side of the left left join when the following - * conditions are met: - * 1. The left side is recurring - * 2. The right side is not recurring - * 3. Either of the following: - * a. The left side is not a RangeTblRef (i.e., it is not a reference/local table) - * b. The tables in the rigt side are not colocated. - * 5. The left side does not have the distribution column (TODO: CHECK THIS) - */ - if (leftNodeRecurs && !rightNodeRecurs) { if(!CheckPushDownFeasibilityLeftJoin(joinExpr, query)) @@ -768,20 +729,6 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, ereport(DEBUG3, (errmsg("a push down safe left join with recurring left side"))); } } - - /* - * rightNodeRecurs if there is a recurring table in the right side. However, if the right side - * is a join expression, we need to check if it contains a recurring table. If it does, we need to - * recursively plan the right side of the left join. Push-down path does not handle the nested joins - * yet, once we have that, we can remove this check. - */ - else if (leftNodeRecurs && rightNodeRecurs && JoinExprHasNonRecurringTable(rightNode, query)) - { - ereport(DEBUG1, (errmsg("recursively planning right side of the left join " - "since right side is a joinexpr with non-recurring tables"))); - RecursivelyPlanDistributedJoinNode(rightNode, query, - recursivePlanningContext); - } /* * A LEFT JOIN is recurring if the lhs is recurring. @@ -2682,6 +2629,139 @@ bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte) } + +/* +* Check if the given rte is a citus table whose distribution column appears in +* the using clause and if it is hash distributed. If so, return true and set +* the partitionColumnVar to the Var representing the distribution column. If not, it return false. +*/ +bool CheckUsingClauseForRte(RangeTblEntry *rte, List *usingClause, Var **partitionColumnVar) +{ + if (rte->rtekind != RTE_RELATION && rte->rtekind != RTE_FUNCTION) + { + return false; + } + + if (usingClause == NULL) + { + return false; + } + + if(!IsCitusTable(rte->relid)) + { + return false; + } + + /* Get the partition column of the Citus table */ + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); + *partitionColumnVar = cacheEntry->partitionColumn; + char *partitionColumnName = get_attname(rte->relid, (*partitionColumnVar)->varattno, false); // WHERE I GET ERROR FOR FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a); + + /* Check if the partition column is in the using clause */ + ListCell *lc; + foreach(lc, usingClause) + { + char *colname = strVal(lfirst(lc)); + if (strcmp(colname, partitionColumnName) == 0) + { + return true; + } + } + return false; +} + + +/* +* FindPartitionColumnInSubquery iteratively searches for a partition column +* in the subquery's range table entries. If it finds a match, it sets the +* partitionColumnVar to the Var representing the partition column and +* rte to the RangeTblEntry containing the partition column. It returns true +* if it finds the partition column, otherwise it returns false. +*/ +bool FindPartitionColumnInSubquery(Query *query, List *usingClause, Var **partitionColumnVar, RangeTblEntry **rte) +{ + if (query == NULL || query->rtable == NULL || usingClause == NULL) + { + return false; + } + + ListCell *lc; + foreach(lc, query->rtable) + { + RangeTblEntry *rteTmp = (RangeTblEntry *) lfirst(lc); + /* cases for RTE_RELATION and RTE_FUNCTION */ + if(CheckUsingClauseForRte(rteTmp, usingClause, partitionColumnVar)) + { + *rte = rteTmp; + return true; + } + if (rteTmp->rtekind == RTE_SUBQUERY ) + { + if (FindPartitionColumnInSubquery(rteTmp->subquery, usingClause, partitionColumnVar, rte)) + { + return true; + } + } + } + return false; +} + + +/* + * Recursively resolve a Var from a subquery target list to the base Var and RTE + */ +bool ResolveBaseVarFromSubquery(Var *var, Query *query, + Var **baseVar, RangeTblEntry **baseRte) +{ + TargetEntry *tle = get_tle_by_resno(query->targetList, var->varattno); + if (!tle || !IsA(tle->expr, Var)) + return false; + + Var *tleVar = (Var *) tle->expr; + RangeTblEntry *rte = rt_fetch(tleVar->varno, query->rtable); + + if (rte == NULL) + { + return false; + } + + if (rte->rtekind == RTE_RELATION || rte->rtekind == RTE_FUNCTION) + { + *baseVar = tleVar; + *baseRte = rte; + return true; + } + else if (rte->rtekind == RTE_SUBQUERY) + { + return ResolveBaseVarFromSubquery(tleVar, rte->subquery, baseVar, baseRte); + } + + return false; +} + + +/* + * CheckAttrNumAndDistributionTypeForJoinPushdown checks if the given attribute + * number is valid and if the Citus table is hash distributed. If both conditions + * are met, it returns true, otherwise it returns false. +*/ +bool CheckAttrNumAndDistributionTypeForJoinPushdown(int attnum, RangeTblEntry *rte) +{ + if (attnum == InvalidAttrNumber) + { + return false; + } + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); + if(GetCitusTableType(cacheEntry) != HASH_DISTRIBUTED) + { + return false; + } + + return true; +} + + /* * CheckPushDownFeasibilityAndComputeIndexes checks if the given join expression * is a left outer join and if it is feasible to push down the join. If feasible, @@ -2711,57 +2791,60 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, return false; } + /* If the right side is itself a join, we currently do not support pushdown for such cases */ + if (IsA(joinExpr->rarg, JoinExpr)) + { + ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right side is a join expression, pushdown is not supported"))); + return false; + } + RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg; RangeTblEntry *rightTableEntry = rt_fetch(rightTableRef->rtindex, query->rtable); - // Check if the join is performed on the distribution column + /* Check if the join is performed on the distribution column */ if (joinExpr->usingClause) { - if(rightTableEntry->rtekind != RTE_RELATION && rightTableEntry->rtekind != RTE_FUNCTION) + Var *partitionColumnVar = NULL; + if(rightTableEntry->rtekind == RTE_FUNCTION || rightTableEntry->rtekind == RTE_RELATION) { - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: right table is not a relation or function when using clause is present"))); - return false; - } - - if(!IsCitusTable(rightTableEntry->relid)) - { - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: right table is not a Citus table when using clause is present"))); - return false; - } - - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rightTableEntry->relid); - Var *partitionColumnVar = cacheEntry->partitionColumn; - char *partitionColumnName = get_attname(rightTableEntry->relid, partitionColumnVar->varattno, false); - // Here we check if the partition column is in the using clause - ListCell *lc; - foreach(lc, joinExpr->usingClause) - { - char *colname = strVal(lfirst(lc)); - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: checking column %s in using clause", colname))); - // If the column name matches the partition column name - if (strcmp(colname, partitionColumnName) == 0) + if(!CheckUsingClauseForRte(rightTableEntry, joinExpr->usingClause, &partitionColumnVar)) { - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: found partition column in using clause"))); - *distRte = rightTableEntry; - // Get the attribute number for the outer table - *attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar); - if(*attnum != InvalidAttrNumber) - { - return true; - } + return false; + } + *distRte = rightTableEntry; + + /* Get the attribute number for the outer table */ + *attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar); + return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte); + } + else if(rightTableEntry->rtekind == RTE_SUBQUERY) + { + if(FindPartitionColumnInSubquery(rightTableEntry->subquery, joinExpr->usingClause, &partitionColumnVar, distRte)) + { + /* Get the attribute number for the outer table */ + *attnum = GetAttrNumForMatchingColumn(*outerRte, (*distRte)->relid, partitionColumnVar); + return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte); + } + else + { + return false; } } - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: partition column not found in using clause"))); - return false; + else + { + ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right table of kind %d is not supported for pushdown when using clause is present", + rightTableEntry->rtekind))); + return false; + } } - else + else /* join is defined with on clause */ { List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals); if (joinClauseList == NIL) { - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no quals in join clause"))); return false; } + Node *joinClause = NULL; foreach_declared_ptr(joinClause, joinClauseList) { @@ -2779,34 +2862,54 @@ bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, } RangeTblEntry *rte; - int attnumForInner; + Var *innerVar; if (leftColumn->varno == *outerRtIndex) { /* left column is the outer table of the comparison, get right */ rte = rt_fetch(rightColumn->varno, query->rtable); - attnumForInner = rightColumn->varattno; + innerVar = rightColumn; *attnum = leftColumn->varattno; } else if (rightColumn->varno == *outerRtIndex) { /* right column is the outer table of the comparison, get left*/ rte = rt_fetch(leftColumn->varno, query->rtable); - attnumForInner = leftColumn->varattno; + innerVar = leftColumn; *attnum = rightColumn->varattno; } + else + { + continue; + } if (rte && IsCitusTable(rte->relid)) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); - if(attnumForInner == cacheEntry->partitionColumn->varattno) + if(innerVar->varattno == cacheEntry->partitionColumn->varattno) { - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: join on distribution column of %s", - rte->eref->aliasname))); *distRte = rte; - return true; + return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte); } - } + } + else if (rte && rte->rtekind == RTE_SUBQUERY) + { + Var *baseVar = NULL; + RangeTblEntry *baseRte = NULL; + + if (ResolveBaseVarFromSubquery(innerVar, rte->subquery, &baseVar, &baseRte)) + { + if (IsCitusTable(baseRte->relid)) + { + CitusTableCacheEntry *entry = GetCitusTableCacheEntry(baseRte->relid); + if (baseVar->varattno == entry->partitionColumn->varattno) + { + *distRte = baseRte; + return CheckAttrNumAndDistributionTypeForJoinPushdown(*attnum, *distRte); + } + } + } + return false; + } } - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no join clause on distribution column found"))); return false; } diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index e04869273..700f9b001 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -54,6 +54,8 @@ extern void UpdateVarNosInNode(Node *node, Index newVarNo); extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte); extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum); extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query); - - +bool CheckUsingClauseForRte(RangeTblEntry *rte, List *usingClause, Var **partitionColumnVar); +bool FindPartitionColumnInSubquery(Query *query, List *usingClause, Var **partitionColumnVar, RangeTblEntry **rte); +bool ResolveBaseVarFromSubquery(Var *var, Query *query, Var **baseVar, RangeTblEntry **baseRte); +bool CheckAttrNumAndDistributionTypeForJoinPushdown(int attnum, RangeTblEntry *rte); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/recurring_join_pushdown.out b/src/test/regress/expected/recurring_join_pushdown.out index 66f80d727..64df6360e 100644 --- a/src/test/regress/expected/recurring_join_pushdown.out +++ b/src/test/regress/expected/recurring_join_pushdown.out @@ -20,7 +20,7 @@ SELECT create_distributed_table('d1', 'a'); (1 row) -INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20); +INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10); --- For testing, remove before merge CREATE TABLE d1_local(like d1); INSERT INTO d1_local select * from d1; @@ -97,13 +97,13 @@ DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushd DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 USING (b)) count --------------------------------------------------------------------- - 14 + 16 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b); count --------------------------------------------------------------------- - 14 + 16 (1 row) -- Basic test cases with ON syntax @@ -213,13 +213,13 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c DEBUG: Creating router plan count --------------------------------------------------------------------- - 14 + 16 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b; count --------------------------------------------------------------------- - 14 + 16 (1 row) SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b; @@ -243,13 +243,13 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c DEBUG: Creating router plan count --------------------------------------------------------------------- - 11 + 13 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; count --------------------------------------------------------------------- - 11 + 13 (1 row) SET client_min_messages TO DEBUG1; @@ -262,12 +262,245 @@ DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_join_pu DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, d1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) d1_1) d1 ON (((r1.a OPERATOR(pg_catalog.=) d1.a) OR (r1.b OPERATOR(pg_catalog.=) d1.b)))) count --------------------------------------------------------------------- - 26 + 28 (1 row) SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; count --------------------------------------------------------------------- - 26 + 28 +(1 row) + +-- Test join pushdown behavior when the inner part of the join is a subquery +-- Using 'using' syntax +SET client_min_messages TO DEBUG3; +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) USING (a); + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) USING (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) USING (a); + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) WHERE a > 1) USING (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) WHERE a > 1) USING (a); + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 JOIN d1 as d1_1 USING (a)) USING (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 57 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local JOIN d1_local as d1_1 USING (a)) USING (a); + count +--------------------------------------------------------------------- + 57 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (d1 LEFT JOIN d1 as d1_1 USING (a)) USING (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down +DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "d1" to a subquery +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_join_pushdown.d1 WHERE true +DEBUG: recursively planning distributed relation "d1" "d1_1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "d1" "d1_1" to a subquery +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_2 for subquery SELECT a FROM recurring_join_pushdown.d1 d1_1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN ((SELECT d1_2.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_2) d1 LEFT JOIN (SELECT d1_1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_1_1) d1_1 USING (a)) USING (a)) +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 57 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USING (a)) USING (a); + count +--------------------------------------------------------------------- + 57 +(1 row) + +-- Using 'on' syntax +SET client_min_messages TO DEBUG3; +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) AS d1_local ON r1_local.a = d1_local.a; + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) AS d1 ON r1.a = d1.a; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a; + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) AS d1 WHERE a > 1) AS d1 ON r1.a = d1.a; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) AS d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a; + count +--------------------------------------------------------------------- + 15 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a as a, d1.b, d1_1.a AS a_1 FROM d1 LEFT JOIN d1 as d1_1 ON d1.a = d1_1.a) AS d1_2 ON r1.a = d1_2.a; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 57 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_1.a AS a_1 FROM d1_local LEFT JOIN d1_local as d1_1 ON d1_local.a = d1_1.a) AS d1_2 ON r1_local.a = d1_2.a; + count +--------------------------------------------------------------------- + 57 (1 row) diff --git a/src/test/regress/sql/recurring_join_pushdown.sql b/src/test/regress/sql/recurring_join_pushdown.sql index b44b6b192..f580a8074 100644 --- a/src/test/regress/sql/recurring_join_pushdown.sql +++ b/src/test/regress/sql/recurring_join_pushdown.sql @@ -14,7 +14,7 @@ INSERT INTO r1_local select * from r1; CREATE TABLE d1(a int, b int); SELECT create_distributed_table('d1', 'a'); -INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20); +INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (4, 10); --- For testing, remove before merge CREATE TABLE d1_local(like d1); @@ -56,4 +56,36 @@ SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; SET client_min_messages TO DEBUG1; -- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b; -SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; \ No newline at end of file +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; + +-- Test join pushdown behavior when the inner part of the join is a subquery +-- Using 'using' syntax +SET client_min_messages TO DEBUG3; +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) USING (a); +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) USING (a); + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) USING (a); +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) USING (a); + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) WHERE a > 1) USING (a); +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) WHERE a > 1) USING (a); + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 JOIN d1 as d1_1 USING (a)) USING (a); +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local JOIN d1_local as d1_1 USING (a)) USING (a); + +SELECT count(*) FROM r1 LEFT JOIN (d1 LEFT JOIN d1 as d1_1 USING (a)) USING (a); +SELECT count(*) FROM r1_local LEFT JOIN (d1_local LEFT JOIN d1_local as d1_1 USING (a)) USING (a); + +-- Using 'on' syntax +SET client_min_messages TO DEBUG3; +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1) AS d1 ON r1.a = d1.a; +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local) AS d1_local ON r1_local.a = d1_local.a; + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM d1 WHERE a > 1) AS d1 ON r1.a = d1.a; +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a; + +SELECT count(*) FROM r1 LEFT JOIN (SELECT * FROM (SELECT * FROM d1) AS d1 WHERE a > 1) AS d1 ON r1.a = d1.a; +SELECT count(*) FROM r1_local LEFT JOIN (SELECT * FROM (SELECT * FROM d1_local) AS d1_local WHERE a > 1) AS d1_local ON r1_local.a = d1_local.a; + +SELECT count(*) FROM r1 LEFT JOIN (SELECT d1.a as a, d1.b, d1_1.a AS a_1 FROM d1 LEFT JOIN d1 as d1_1 ON d1.a = d1_1.a) AS d1_2 ON r1.a = d1_2.a; +SELECT count(*) FROM r1_local LEFT JOIN (SELECT d1_local.a as a, d1_local.b, d1_1.a AS a_1 FROM d1_local LEFT JOIN d1_local as d1_1 ON d1_local.a = d1_1.a) AS d1_2 ON r1_local.a = d1_2.a;