From abf51818f6d8cdfef4e1cddfbba01d2418f9e99a Mon Sep 17 00:00:00 2001 From: eaydingol Date: Wed, 28 May 2025 19:27:25 +0300 Subject: [PATCH] Refactor the code, add checks for the join clause, add basic test cases. --- .../distributed/planner/deparse_shard_query.c | 83 ++---- .../planner/multi_physical_planner.c | 2 - .../distributed/planner/recursive_planning.c | 182 ++++++++++-- .../distributed/utils/distribution_column.c | 26 ++ src/backend/distributed/utils/query_utils.c | 31 +- src/include/distributed/deparse_shard_query.h | 1 - src/include/distributed/distribution_column.h | 1 + src/include/distributed/query_utils.h | 3 +- src/include/distributed/recursive_planning.h | 2 + .../expected/recurring_join_pushdown.out | 273 ++++++++++++++++++ src/test/regress/multi_1_schedule | 1 + .../regress/sql/recurring_join_pushdown.sql | 59 ++++ src/test/regress/sql/recurring_outer_join.sql | 60 ++++ 13 files changed, 596 insertions(+), 128 deletions(-) create mode 100644 src/test/regress/expected/recurring_join_pushdown.out create mode 100644 src/test/regress/sql/recurring_join_pushdown.sql diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 5511c402c..311989fef 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -206,58 +206,10 @@ UpdateTaskQueryString(Query *query, Task *task) SetTaskQueryIfShouldLazyDeparse(task, query); } -/* - * Iterates through the FROM clause of the query and checks if there is a join - * expr with a reference and distributed table. - * If there is, it adds the index of the range table entry of the outer - * table in the join clause to the constraintIndexes list. It also sets the - * innerRte to point to the range table entry inner table. -*/ -bool ExtractIndexesForConstaints(List *fromList, List *rtable, - int *outerRtIndex, RangeTblEntry **distRte) -{ - ereport(DEBUG5, (errmsg("******"))); - ListCell *fromExprCell; - - // Check the first element of the from clause, the rest is already handled - foreach(fromExprCell, fromList) - { - Node *fromElement = (Node *) lfirst(fromExprCell); - - if (IsA(fromElement, JoinExpr)) - { - JoinExpr *joinExpr = (JoinExpr *) fromElement; - if(!IS_OUTER_JOIN(joinExpr->jointype)) - { - continue; - } - *outerRtIndex = (((RangeTblRef *)joinExpr->larg)->rtindex); - RangeTblEntry *outerRte = rt_fetch(*outerRtIndex, rtable); - - if(!IsPushdownSafeForRTEInLeftJoin(outerRte)) - { - return false; - } - - if (!CheckIfAllCitusRTEsAreColocated((Node *)joinExpr->rarg, rtable, distRte)) - { - return false; - } - - return true; - } - } - - return false; -} - /* * UpdateWhereClauseForOuterJoin walks over the query tree and appends quals * to the WHERE clause to filter w.r.to the distribution column of the corresponding shard. - * TODO: - * - Not supported cases should not call this function. - * - Remove the excessive debug messages. 
*/ bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) @@ -273,17 +225,27 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) } Query *query = (Query *) node; + if (query->jointree == NULL) { return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); } FromExpr *fromExpr = query->jointree; - if(fromExpr == NULL) + if(fromExpr == NULL || fromExpr->fromlist == NIL) { return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); } + // TODO: generalize to the list + Node *firstFromItem = linitial(fromExpr->fromlist); + if (!IsA(firstFromItem, JoinExpr)) + { + return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); + } + + JoinExpr *joinExpr = (JoinExpr *) firstFromItem; + /* * We need to find the outer table in the join clause to add the constraints w.r.to the shard * intervals of the inner table. @@ -293,13 +255,15 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) RangeTblEntry *innerRte = NULL; RangeTblEntry *outerRte = NULL; int outerRtIndex = -1; - bool result = ExtractIndexesForConstaints(fromExpr->fromlist, query->rtable, &outerRtIndex, &innerRte); - if (!result) + int attnum; + if(!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, &outerRte, &innerRte, &attnum)) + { + return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); + } + if( attnum == InvalidAttrNumber) { - ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: failed to extract indexes"))); return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); } - ereport(DEBUG5, (errmsg("\t Distributed table from the inner part: %s", innerRte->eref->aliasname))); @@ -311,18 +275,9 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList) Var *partitionColumnVar = cacheEntry->partitionColumn; /* - * we will add constraints for the outer table, we need to find the column in the outer - * table that is comparable to the partition column of the inner table. - * If the column does not exist, we return without modifying the query. - * If the column exists, we create a Var node for the outer table's partition column. + * we will add constraints for the outer table, + * we create a Var node for the outer table's column that is compared with the distribution column. */ - outerRte = rt_fetch(outerRtIndex, query->rtable); - AttrNumber attnum = GetAttrNumForMatchingColumn(outerRte, relationId, partitionColumnVar); - // TODO: we also have to check that the tables are joined on the partition column. 
- if( attnum == InvalidAttrNumber) - { - return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0); - } Var* outerTablePartitionColumnVar = makeVar( outerRtIndex, attnum, partitionColumnVar->vartype, diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index ddea8df87..800d2f3fe 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2231,8 +2231,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, */ if (IsInnerTableOfOuterJoin(relationRestriction)) { - ereport(DEBUG1, errmsg("Inner Table of Outer Join %d", - relationRestriction->relationId)); innerTableOfOuterJoin = true; } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index b1268b073..cfd714688 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -76,6 +76,7 @@ #include "distributed/combine_query_planner.h" #include "distributed/commands/multi_copy.h" #include "distributed/distributed_planner.h" +#include "distributed/distribution_column.h" #include "distributed/errormessage.h" #include "distributed/listutils.h" #include "distributed/local_distributed_join_planner.h" @@ -753,47 +754,21 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, if (leftNodeRecurs && !rightNodeRecurs) { - int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex; - RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable); - RangeTblEntry *innerRte = NULL; - bool planned = false; - if (!IsPushdownSafeForRTEInLeftJoin(rte)) + if(!CheckPushDownFeasibilityLeftJoin(joinExpr, query)) { ereport(DEBUG1, (errmsg("recursively planning right side of " "the left join since the outer side " - "is a recurring rel that is not an RTE"))); + "is a recurring rel and it is not " + "feasible to push down"))); RecursivelyPlanDistributedJoinNode(rightNode, query, recursivePlanningContext); - planned = true; } - else if (!CheckIfAllCitusRTEsAreColocated(rightNode, query->rtable, &innerRte)) + else { - ereport(DEBUG1, (errmsg("recursively planning right side of the left join " - "since tables in the inner side of the left " - "join are not colocated"))); - RecursivelyPlanDistributedJoinNode(rightNode, query, - recursivePlanningContext); - planned = true; + ereport(DEBUG3, (errmsg("a push down safe left join with recurring left side"))); } - - if(!planned) - { - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(innerRte->relid) - if(GetAttrNumForMatchingColumn(rte, innerRte->relid, cacheEntry->partitionColumn) == InvalidAttrNumber) - { - ereport(DEBUG1, (errmsg("recursively planning right side of the left join " - "since the outer side does not have the distribution column"))); - RecursivelyPlanDistributedJoinNode(rightNode, query, recursivePlanningContext); - } - else - { - ereport(DEBUG1, (errmsg("not recursively planning right side of the left join " - "since the outer side is a RangeTblRef and " - "the inner side is colocated with it"))); - } - } - } + /* * rightNodeRecurs if there is a recurring table in the right side. However, if the right side * is a join expression, we need to check if it contains a recurring table. 
If it does, we need to
@@ -2706,3 +2681,146 @@ bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
 	}
 }
 
+
+/*
+ * CheckPushDownFeasibilityAndComputeIndexes checks whether the given join expression
+ * is a left outer join that is feasible to push down. If it is, the function
+ * computes the outer relation's range table index, the outer relation's
+ * range table entry, the inner (distributed) relation's range table entry, and the
+ * attribute number, in the outer relation, of the column that is compared with the
+ * inner relation's distribution column.
+ */
+bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum)
+{
+	if (!IS_OUTER_JOIN(joinExpr->jointype))
+	{
+		return false;
+	}
+
+	/* TODO: generalize to right joins */
+	if (joinExpr->jointype != JOIN_LEFT)
+	{
+		return false;
+	}
+
+	*outerRtIndex = (((RangeTblRef *) joinExpr->larg)->rtindex);
+	*outerRte = rt_fetch(*outerRtIndex, query->rtable);
+
+	if (!IsPushdownSafeForRTEInLeftJoin(*outerRte))
+	{
+		return false;
+	}
+
+	RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg;
+	RangeTblEntry *rightTableEntry = rt_fetch(rightTableRef->rtindex, query->rtable);
+
+	/* check that the join is performed on the distribution column */
+	if (joinExpr->usingClause)
+	{
+		if (rightTableEntry->rtekind != RTE_RELATION && rightTableEntry->rtekind != RTE_FUNCTION)
+		{
+			ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right table is not a relation or function when using clause is present")));
+			return false;
+		}
+
+		if (!IsCitusTable(rightTableEntry->relid))
+		{
+			ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: right table is not a Citus table when using clause is present")));
+			return false;
+		}
+
+		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rightTableEntry->relid);
+		Var *partitionColumnVar = cacheEntry->partitionColumn;
+		char *partitionColumnName = get_attname(rightTableEntry->relid, partitionColumnVar->varattno, false);
+
+		/* check whether the partition column appears in the USING clause */
+		ListCell *lc;
+		foreach(lc, joinExpr->usingClause)
+		{
+			char *colname = strVal(lfirst(lc));
+			ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: checking column %s in using clause", colname)));
+
+			/* the column name matches the partition column name */
+			if (strcmp(colname, partitionColumnName) == 0)
+			{
+				ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: found partition column in using clause")));
+				*distRte = rightTableEntry;
+
+				/* get the attribute number of the matching column in the outer table */
+				*attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar);
+				if (*attnum != InvalidAttrNumber)
+				{
+					return true;
+				}
+			}
+		}
+		ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: partition column not found in using clause")));
+		return false;
+	}
+	else
+	{
+		List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals);
+		if (joinClauseList == NIL)
+		{
+			ereport(DEBUG5, (errmsg("CheckPushDownFeasibilityAndComputeIndexes: no quals in join clause")));
+			return false;
+		}
+		Node *joinClause = NULL;
+		foreach_declared_ptr(joinClause, joinClauseList)
+		{
+			if (!NodeIsEqualsOpExpr(joinClause))
+			{
+				continue;
+			}
+			OpExpr *joinClauseExpr = castNode(OpExpr, joinClause);
+
+			Var *leftColumn = LeftColumnOrNULL(joinClauseExpr);
+			Var *rightColumn = RightColumnOrNULL(joinClauseExpr);
+			if (leftColumn == NULL || rightColumn == NULL)
+			{
+				continue;
+			}
+
+			/* initialize, since neither side of the equality may reference the outer rel */
+			RangeTblEntry *rte = NULL;
+			int attnumForInner = InvalidAttrNumber;
+			if (leftColumn->varno == *outerRtIndex)
+			{
+				/* left column is the outer table of the
comparison, get right */ + rte = rt_fetch(rightColumn->varno, query->rtable); + attnumForInner = rightColumn->varattno; + *attnum = leftColumn->varattno; + } + else if (rightColumn->varno == *outerRtIndex) + { + /* right column is the outer table of the comparison, get left*/ + rte = rt_fetch(leftColumn->varno, query->rtable); + attnumForInner = leftColumn->varattno; + *attnum = rightColumn->varattno; + } + if (rte && IsCitusTable(rte->relid)) + { + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid); + if(attnumForInner == cacheEntry->partitionColumn->varattno) + { + ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: join on distribution column of %s", + rte->eref->aliasname))); + *distRte = rte; + return true; + } + } + } + ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no join clause on distribution column found"))); + return false; + } + +} + +/* + * Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes. + * See CheckPushDownFeasibilityAndComputeIndexes for more details. +*/ +bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query) +{ + int outerRtIndex; + RangeTblEntry *outerRte = NULL; + RangeTblEntry *innerRte = NULL; + int attnum; + return CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, &outerRte, &innerRte, &attnum); +} \ No newline at end of file diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c index 5927be612..181a2a734 100644 --- a/src/backend/distributed/utils/distribution_column.c +++ b/src/backend/distributed/utils/distribution_column.c @@ -282,3 +282,29 @@ ColumnToColumnName(Oid relationId, Node *columnNode) return columnName; } + + +/* + * GetAttrNumForMatchingColumn returns the attribute number for the column + * in the target relation that matches the given Var. If the column does not + * exist or is not comparable, it returns InvalidAttrNumber. + */ +AttrNumber +GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var) +{ + char *targetColumnName = get_attname(relid, var->varattno, false); + AttrNumber attnum = get_attnum(rteTarget->relid, targetColumnName); + if (attnum == InvalidAttrNumber) + { + ereport(DEBUG5, (errmsg("Column %s does not exist in relation %s", + targetColumnName, rteTarget->eref->aliasname))); + return InvalidAttrNumber; + } + if(var->vartype != get_atttype(rteTarget->relid, attnum)) + { + ereport(DEBUG5, (errmsg("Column %s is not comparable for tables with relids %d and %d", + targetColumnName, rteTarget->relid, relid))); + return InvalidAttrNumber; + } + return attnum; +} diff --git a/src/backend/distributed/utils/query_utils.c b/src/backend/distributed/utils/query_utils.c index e4267aea1..aa6f76659 100644 --- a/src/backend/distributed/utils/query_utils.c +++ b/src/backend/distributed/utils/query_utils.c @@ -17,6 +17,7 @@ #include "nodes/primnodes.h" #include "parser/parsetree.h" +#include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/query_utils.h" #include "distributed/relation_restriction_equivalence.h" @@ -241,7 +242,7 @@ ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context) * representative table. 
*/ bool -CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) +CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte, List **citusRelids) { ExtractRangeTableIdsContext context; List *idList = NIL; @@ -250,7 +251,6 @@ CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) ExtractRangeTableIds(node, &context); RangeTblEntry *rteTmp; - List *citusRelids = NIL; ListCell *lc = NULL; foreach(lc, idList) @@ -258,7 +258,7 @@ CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) rteTmp = (RangeTblEntry *) lfirst(lc); if (IsCitusTable(rteTmp->relid)) { - citusRelids = lappend_int(citusRelids, rteTmp->relid); + *citusRelids = lappend_int(*citusRelids, rteTmp->relid); *rte = rteTmp; // set the value of rte, a representative table } } @@ -272,27 +272,4 @@ CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte) return true; } -/* - * GetAttrNumForMatchingColumn returns the attribute number for the column - * in the target relation that matches the given Var. If the column does not - * exist or is not comparable, it returns InvalidAttrNumber. - */ -AttrNumber -GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var) -{ - char *targetColumnName = get_attname(relid, var->varattno, false); - AttrNumber attnum = get_attnum(rteTarget->relid, targetColumnName); - if (attnum == InvalidAttrNumber) - { - ereport(DEBUG5, (errmsg("Column %s does not exist in relation %s", - targetColumnName, rteTarget->eref->aliasname))); - return InvalidAttrNumber; - } - if(var->vartype != get_atttype(rteTarget->relid, attnum)) - { - ereport(DEBUG5, (errmsg("Column %s is not comparable for tables with relids %d and %d", - targetColumnName, rteTarget->relid, relid))); - return InvalidAttrNumber; - } - return attnum; -} + diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index 1a5423711..30aef5865 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -23,7 +23,6 @@ #include "distributed/query_utils.h" -bool ExtractIndexesForConstaints(List *fromList, List *rtable, int *outerRtIndex, RangeTblEntry **distRte); extern void RebuildQueryStrings(Job *workerJob); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList); diff --git a/src/include/distributed/distribution_column.h b/src/include/distributed/distribution_column.h index a7ec6a593..58679fa47 100644 --- a/src/include/distributed/distribution_column.h +++ b/src/include/distributed/distribution_column.h @@ -25,5 +25,6 @@ extern Var * BuildDistributionKeyFromColumnName(Oid relationId, extern char * ColumnToColumnName(Oid relationId, Node *columnNode); extern Oid ColumnTypeIdForRelationColumnName(Oid relationId, char *columnName); extern void EnsureValidDistributionColumn(Oid relationId, char *columnName); +extern AttrNumber GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var); #endif /* DISTRIBUTION_COLUMN_H */ diff --git a/src/include/distributed/query_utils.h b/src/include/distributed/query_utils.h index 680f2bfc2..c3e20d14e 100644 --- a/src/include/distributed/query_utils.h +++ b/src/include/distributed/query_utils.h @@ -46,6 +46,5 @@ extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList); extern bool 
ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context); -extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte); -AttrNumber GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var); +extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte, List **citusRelids); #endif /* QUERY_UTILS_H */ diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index b7fad123d..e04869273 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -52,6 +52,8 @@ extern bool IsRelationLocalTableOrMatView(Oid relationId); extern bool ContainsReferencesToOuterQuery(Query *query); extern void UpdateVarNosInNode(Node *node, Index newVarNo); extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte); +extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum); +extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/recurring_join_pushdown.out b/src/test/regress/expected/recurring_join_pushdown.out new file mode 100644 index 000000000..66f80d727 --- /dev/null +++ b/src/test/regress/expected/recurring_join_pushdown.out @@ -0,0 +1,273 @@ +CREATE SCHEMA recurring_join_pushdown; +SET search_path TO recurring_join_pushdown; +SET citus.next_shard_id TO 1520000; +SET citus.shard_count TO 4; +CREATE TABLE r1(a int, b int); +SELECT create_reference_table('r1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO r1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (3, 20), (10, 1), (10, 2); +--- For testing, remove before merge +CREATE TABLE r1_local(like r1); +INSERT INTO r1_local select * from r1; +CREATE TABLE d1(a int, b int); +SELECT create_distributed_table('d1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20); +--- For testing, remove before merge +CREATE TABLE d1_local(like d1); +INSERT INTO d1_local select * from d1; +SET client_min_messages TO DEBUG3; +-- Basic test cases +-- Test that the join is pushed down to the worker nodes, using "using" syntax +SELECT count(*) FROM r1 LEFT JOIN d1 using (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a); + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring 
left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + a | b +--------------------------------------------------------------------- + 1 | 10 + 1 | 11 + 1 | 20 + 2 | 10 + 2 | 12 + 2 | 20 + 3 | 20 + 10 | 1 + 10 | 2 +(9 rows) + +SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2; + a | b +--------------------------------------------------------------------- + 1 | 10 + 1 | 11 + 1 | 20 + 2 | 10 + 2 | 12 + 2 | 20 + 3 | 20 + 10 | 1 + 10 | 2 +(9 rows) + +SET client_min_messages TO DEBUG1; +-- Test that the join is not pushed down when joined on a non-distributed column +SELECT count(*) FROM r1 LEFT JOIN d1 USING (b); +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down +DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "d1" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 USING (b)) + count +--------------------------------------------------------------------- + 14 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b); + count +--------------------------------------------------------------------- + 14 +(1 row) + +-- Basic test cases with ON syntax +-- Test that the join is pushed down to the worker nodes, using "on" syntax +SET client_min_messages TO DEBUG3; +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a; + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + a | b | a | b +--------------------------------------------------------------------- + 1 | 10 | 1 | 10 + 1 | 11 | 1 | 11 + 1 | 20 | 1 | 20 + 2 | 10 | 2 | 
10 + 2 | 12 | 2 | 12 + 2 | 20 | 2 | 20 + 3 | 20 | | + 10 | 1 | | + 10 | 2 | | +(9 rows) + +SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_local.b = d1_local.b ORDER BY 1, 2; + a | b | a | b +--------------------------------------------------------------------- + 1 | 10 | 1 | 10 + 1 | 11 | 1 | 11 + 1 | 20 | 1 | 20 + 2 | 10 | 2 | 10 + 2 | 12 | 2 | 12 + 2 | 20 | 2 | 20 + 3 | 20 | | + 10 | 1 | | + 10 | 2 | | +(9 rows) + +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: a push down safe left join with recurring left side +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx + count +--------------------------------------------------------------------- + 13 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a; + count +--------------------------------------------------------------------- + 13 +(1 row) + +-- Test that the join is not pushed down when joined on a non-distributed column +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down +DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "d1" to a subquery +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 ON ((r1.b OPERATOR(pg_catalog.=) d1.b))) +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 14 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b; + count +--------------------------------------------------------------------- + 14 +(1 row) + +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b; +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down +DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: 
Wrapping relation "d1" to a subquery +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.b))) +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 11 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; + count +--------------------------------------------------------------------- + 11 +(1 row) + +SET client_min_messages TO DEBUG1; +-- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b; +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down +DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "d1" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_join_pushdown.d1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, d1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) d1_1) d1 ON (((r1.a OPERATOR(pg_catalog.=) d1.a) OR (r1.b OPERATOR(pg_catalog.=) d1.b)))) + count +--------------------------------------------------------------------- + 26 +(1 row) + +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; + count +--------------------------------------------------------------------- + 26 +(1 row) + diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 39a9e4070..4be4a1cd0 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -226,6 +226,7 @@ test: local_table_join test: local_dist_join_mixed test: citus_local_dist_joins test: recurring_outer_join +test: recurring_join_pushdown test: query_single_shard_table test: insert_select_single_shard_table test: pg_dump diff --git a/src/test/regress/sql/recurring_join_pushdown.sql b/src/test/regress/sql/recurring_join_pushdown.sql new file mode 100644 index 000000000..b44b6b192 --- /dev/null +++ b/src/test/regress/sql/recurring_join_pushdown.sql @@ -0,0 +1,59 @@ +CREATE SCHEMA recurring_join_pushdown; +SET search_path TO recurring_join_pushdown; + +SET citus.next_shard_id TO 1520000; +SET citus.shard_count TO 4; + +CREATE TABLE r1(a int, b int); +SELECT create_reference_table('r1'); +INSERT INTO r1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (3, 20), 
(10, 1), (10, 2); + +--- For testing, remove before merge +CREATE TABLE r1_local(like r1); +INSERT INTO r1_local select * from r1; + +CREATE TABLE d1(a int, b int); +SELECT create_distributed_table('d1', 'a'); +INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20); + +--- For testing, remove before merge +CREATE TABLE d1_local(like d1); +INSERT INTO d1_local select * from d1; + + +SET client_min_messages TO DEBUG3; + +-- Basic test cases +-- Test that the join is pushed down to the worker nodes, using "using" syntax +SELECT count(*) FROM r1 LEFT JOIN d1 using (a); +SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a); + +SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2; +SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2; + +SET client_min_messages TO DEBUG1; +-- Test that the join is not pushed down when joined on a non-distributed column +SELECT count(*) FROM r1 LEFT JOIN d1 USING (b); +SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b); + +-- Basic test cases with ON syntax +-- Test that the join is pushed down to the worker nodes, using "on" syntax +SET client_min_messages TO DEBUG3; +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a; +SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2; +SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_local.b = d1_local.b ORDER BY 1, 2; + + +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a; +-- Test that the join is not pushed down when joined on a non-distributed column +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b; +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; + +SET client_min_messages TO DEBUG1; +-- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions +SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; \ No newline at end of file diff --git a/src/test/regress/sql/recurring_outer_join.sql b/src/test/regress/sql/recurring_outer_join.sql index d33309817..8fd1c4340 100644 --- a/src/test/regress/sql/recurring_outer_join.sql +++ b/src/test/regress/sql/recurring_outer_join.sql @@ -24,6 +24,9 @@ INSERT INTO dist_1 VALUES (7, 41), (7, 42); +CREATE TABLE dist_1_local(LIKE dist_1); +INSERT INTO dist_1_local SELECT * FROM dist_1; + CREATE TABLE dist_2_columnar(LIKE dist_1) USING columnar; INSERT INTO dist_2_columnar SELECT * FROM dist_1; SELECT create_distributed_table('dist_2_columnar', 'a'); @@ -35,6 +38,28 @@ CREATE TABLE dist_3_partitioned_p3 PARTITION OF dist_3_partitioned FOR VALUES FR SELECT create_distributed_table('dist_3_partitioned', 'a'); INSERT INTO dist_3_partitioned SELECT * FROM dist_1; +CREATE TABLE dist_4 (a int, b int); +SELECT create_distributed_table('dist_4', 'a'); +INSERT INTO dist_4 VALUES +(1, 100), +(1, 101), +(1, 300), +(2, 20), +(2, 21), +(2, 400), +(2, 23), +(3, 102), +(3, 301), +(3, 300), +(3, null), +(3, 34), +(7, 40), +(7, null), +(7, 11); + +CREATE TABLE dist_4_local(LIKE dist_4); +INSERT INTO dist_4_local SELECT * FROM dist_4; + CREATE TABLE ref_1 (a int, b int); SELECT create_reference_table('ref_1'); INSERT INTO ref_1 VALUES 
@@ -54,6 +79,33 @@ INSERT INTO ref_1 VALUES (null, 401), (null, 402); +CREATE TABLE ref_1_local(LIKE ref_1); +INSERT INTO ref_1_local SELECT * FROM ref_1; + +--- We create a second reference table that does not have the distribution column +CREATE TABLE ref_2 (a2 int, b int); +SELECT create_reference_table('ref_2'); +INSERT INTO ref_2 VALUES +(1, null), +(1, 100), +(1, 11), +(null, 102), +(2, 200), +(2, 21), +(null, 202), +(2, 203), +(4, 300), +(4, 301), +(null, 302), +(4, 303), +(4, 304), +(null, 400), +(null, 401), +(null, 402); + +CREATE TABLE ref_2_local(LIKE ref_2); +INSERT INTO ref_2_local SELECT * FROM ref_2; + CREATE TABLE local_1 (a int, b int); INSERT INTO local_1 VALUES (null, 1000), @@ -95,8 +147,16 @@ ALTER TABLE dist_5_with_pkey ADD CONSTRAINT pkey_1 PRIMARY KEY (a); -- SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a); +SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a); SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a,b); +SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a,b); + +SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_4 USING (b); +SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_4_local USING (b); + +SELECT * FROM ref_2 LEFT JOIN dist_4 USING (b) ORDER BY b, a2, a; +SELECT * FROM ref_2_local LEFT JOIN dist_4_local USING (b) ORDER BY b, a2, a; SELECT COUNT(*) FROM dist_1 RIGHT JOIN ref_1 USING (a);
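The join shapes exercised by the new recurring_join_pushdown test can be summarized as follows. This recap only restates the expected output above; it assumes the reference table r1 and the table d1 hash-distributed on column a, exactly as created in that test file.

-- Pushed down: the recurring outer rel is joined to d1 on d1's distribution
-- column, either through USING or a plain equality in the ON clause.
SELECT count(*) FROM r1 LEFT JOIN d1 USING (a);
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a;

-- Recursively planned: the join condition does not equate an outer column with
-- d1's distribution column, or combines conditions with OR.
SELECT count(*) FROM r1 LEFT JOIN d1 USING (b);
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b;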
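For the pushed-down cases, UpdateWhereClauseForOuterJoin constrains the outer (recurring) side per shard, so each worker only emits the outer rows whose join key belongs to that shard. The sketch below shows one plausible shape of such a worker query; the shard names, the hash range, and the use of worker_hash() are illustrative assumptions, not the exact text the patch deparses.

-- Illustration only: a hypothetical worker query for one of d1's four shards,
-- covering hash range [-2147483648, -1073741825]; shard names are made up.
SELECT count(*)
FROM recurring_join_pushdown.r1_1520000 r1
    LEFT JOIN recurring_join_pushdown.d1_1520001 d1 USING (a)
WHERE worker_hash(r1.a) >= -2147483648 AND worker_hash(r1.a) <= -1073741825;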