Refactor the code, add checks for the join clause, add basic test cases.

leftjoin_push
eaydingol 2025-05-28 19:27:25 +03:00
parent 851386f94a
commit abf51818f6
13 changed files with 596 additions and 128 deletions

View File

@ -206,58 +206,10 @@ UpdateTaskQueryString(Query *query, Task *task)
SetTaskQueryIfShouldLazyDeparse(task, query);
}
/*
* Iterates through the FROM clause of the query and checks if there is a join
* expr with a reference and distributed table.
* If there is, it adds the index of the range table entry of the outer
* table in the join clause to the constraintIndexes list. It also sets the
* innerRte to point to the range table entry inner table.
*/
bool ExtractIndexesForConstaints(List *fromList, List *rtable,
int *outerRtIndex, RangeTblEntry **distRte)
{
ereport(DEBUG5, (errmsg("******")));
ListCell *fromExprCell;
// Check the first element of the from clause, the rest is already handled
foreach(fromExprCell, fromList)
{
Node *fromElement = (Node *) lfirst(fromExprCell);
if (IsA(fromElement, JoinExpr))
{
JoinExpr *joinExpr = (JoinExpr *) fromElement;
if(!IS_OUTER_JOIN(joinExpr->jointype))
{
continue;
}
*outerRtIndex = (((RangeTblRef *)joinExpr->larg)->rtindex);
RangeTblEntry *outerRte = rt_fetch(*outerRtIndex, rtable);
if(!IsPushdownSafeForRTEInLeftJoin(outerRte))
{
return false;
}
if (!CheckIfAllCitusRTEsAreColocated((Node *)joinExpr->rarg, rtable, distRte))
{
return false;
}
return true;
}
}
return false;
}
/*
* UpdateWhereClauseForOuterJoin walks over the query tree and appends quals
* to the WHERE clause to filter w.r.to the distribution column of the corresponding shard.
* TODO:
* - Not supported cases should not call this function.
* - Remove the excessive debug messages.
*/
bool
UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
@ -273,17 +225,27 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
}
Query *query = (Query *) node;
if (query->jointree == NULL)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
FromExpr *fromExpr = query->jointree;
if(fromExpr == NULL)
if(fromExpr == NULL || fromExpr->fromlist == NIL)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
// TODO: generalize to the list
Node *firstFromItem = linitial(fromExpr->fromlist);
if (!IsA(firstFromItem, JoinExpr))
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
JoinExpr *joinExpr = (JoinExpr *) firstFromItem;
/*
* We need to find the outer table in the join clause to add the constraints w.r.to the shard
* intervals of the inner table.
@ -293,13 +255,15 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
RangeTblEntry *innerRte = NULL;
RangeTblEntry *outerRte = NULL;
int outerRtIndex = -1;
bool result = ExtractIndexesForConstaints(fromExpr->fromlist, query->rtable, &outerRtIndex, &innerRte);
if (!result)
int attnum;
if(!CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, &outerRte, &innerRte, &attnum))
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
if( attnum == InvalidAttrNumber)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: failed to extract indexes")));
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
ereport(DEBUG5, (errmsg("\t Distributed table from the inner part: %s", innerRte->eref->aliasname)));
@ -311,18 +275,9 @@ UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList)
Var *partitionColumnVar = cacheEntry->partitionColumn;
/*
* we will add constraints for the outer table, we need to find the column in the outer
* table that is comparable to the partition column of the inner table.
* If the column does not exist, we return without modifying the query.
* If the column exists, we create a Var node for the outer table's partition column.
* we will add constraints for the outer table,
* we create a Var node for the outer table's column that is compared with the distribution column.
*/
outerRte = rt_fetch(outerRtIndex, query->rtable);
AttrNumber attnum = GetAttrNumForMatchingColumn(outerRte, relationId, partitionColumnVar);
// TODO: we also have to check that the tables are joined on the partition column.
if( attnum == InvalidAttrNumber)
{
return query_tree_walker(query, UpdateWhereClauseForOuterJoin, relationShardList, 0);
}
Var* outerTablePartitionColumnVar = makeVar(
outerRtIndex, attnum, partitionColumnVar->vartype,

View File

@ -2231,8 +2231,6 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
*/
if (IsInnerTableOfOuterJoin(relationRestriction))
{
ereport(DEBUG1, errmsg("Inner Table of Outer Join %d",
relationRestriction->relationId));
innerTableOfOuterJoin = true;
}

View File

@ -76,6 +76,7 @@
#include "distributed/combine_query_planner.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_planner.h"
#include "distributed/distribution_column.h"
#include "distributed/errormessage.h"
#include "distributed/listutils.h"
#include "distributed/local_distributed_join_planner.h"
@ -753,47 +754,21 @@ RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
if (leftNodeRecurs && !rightNodeRecurs)
{
int outerRtIndex = ((RangeTblRef *) leftNode)->rtindex;
RangeTblEntry *rte = rt_fetch(outerRtIndex, query->rtable);
RangeTblEntry *innerRte = NULL;
bool planned = false;
if (!IsPushdownSafeForRTEInLeftJoin(rte))
if(!CheckPushDownFeasibilityLeftJoin(joinExpr, query))
{
ereport(DEBUG1, (errmsg("recursively planning right side of "
"the left join since the outer side "
"is a recurring rel that is not an RTE")));
"is a recurring rel and it is not "
"feasible to push down")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
planned = true;
}
else if (!CheckIfAllCitusRTEsAreColocated(rightNode, query->rtable, &innerRte))
{
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since tables in the inner side of the left "
"join are not colocated")));
RecursivelyPlanDistributedJoinNode(rightNode, query,
recursivePlanningContext);
planned = true;
}
if(!planned)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(innerRte->relid)
if(GetAttrNumForMatchingColumn(rte, innerRte->relid, cacheEntry->partitionColumn) == InvalidAttrNumber)
{
ereport(DEBUG1, (errmsg("recursively planning right side of the left join "
"since the outer side does not have the distribution column")));
RecursivelyPlanDistributedJoinNode(rightNode, query, recursivePlanningContext);
}
else
{
ereport(DEBUG1, (errmsg("not recursively planning right side of the left join "
"since the outer side is a RangeTblRef and "
"the inner side is colocated with it")));
ereport(DEBUG3, (errmsg("a push down safe left join with recurring left side")));
}
}
}
/*
* rightNodeRecurs if there is a recurring table in the right side. However, if the right side
* is a join expression, we need to check if it contains a recurring table. If it does, we need to
@ -2706,3 +2681,146 @@ bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte)
}
}
/*
* CheckPushDownFeasibilityAndComputeIndexes checks if the given join expression
* is a left outer join and if it is feasible to push down the join. If feasible,
* it computes the outer relation's range table index, the outer relation's
* range table entry, the inner (distributed) relation's range table entry, and the
* attribute number of the partition column in the outer relation.
*/
bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum)
{
if(!IS_OUTER_JOIN(joinExpr->jointype))
{
return false;
}
// TODO: generalize to right joins
if(joinExpr->jointype != JOIN_LEFT)
{
return false;
}
*outerRtIndex = (((RangeTblRef *)joinExpr->larg)->rtindex);
*outerRte = rt_fetch(*outerRtIndex, query->rtable);
if(!IsPushdownSafeForRTEInLeftJoin(*outerRte))
{
return false;
}
RangeTblRef *rightTableRef = (RangeTblRef *) joinExpr->rarg;
RangeTblEntry *rightTableEntry = rt_fetch(rightTableRef->rtindex, query->rtable);
// Check if the join is performed on the distribution column
if (joinExpr->usingClause)
{
if(rightTableEntry->rtekind != RTE_RELATION && rightTableEntry->rtekind != RTE_FUNCTION)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: right table is not a relation or function when using clause is present")));
return false;
}
if(!IsCitusTable(rightTableEntry->relid))
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: right table is not a Citus table when using clause is present")));
return false;
}
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rightTableEntry->relid);
Var *partitionColumnVar = cacheEntry->partitionColumn;
char *partitionColumnName = get_attname(rightTableEntry->relid, partitionColumnVar->varattno, false);
// Here we check if the partition column is in the using clause
ListCell *lc;
foreach(lc, joinExpr->usingClause)
{
char *colname = strVal(lfirst(lc));
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: checking column %s in using clause", colname)));
// If the column name matches the partition column name
if (strcmp(colname, partitionColumnName) == 0)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: found partition column in using clause")));
*distRte = rightTableEntry;
// Get the attribute number for the outer table
*attnum = GetAttrNumForMatchingColumn(*outerRte, rightTableEntry->relid, partitionColumnVar);
if(*attnum != InvalidAttrNumber)
{
return true;
}
}
}
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: partition column not found in using clause")));
return false;
}
else
{
List *joinClauseList = make_ands_implicit((Expr *) joinExpr->quals);
if (joinClauseList == NIL)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no quals in join clause")));
return false;
}
Node *joinClause = NULL;
foreach_declared_ptr(joinClause, joinClauseList)
{
if (!NodeIsEqualsOpExpr(joinClause))
{
continue;
}
OpExpr *joinClauseExpr = castNode(OpExpr, joinClause);
Var *leftColumn = LeftColumnOrNULL(joinClauseExpr);
Var *rightColumn = RightColumnOrNULL(joinClauseExpr);
if (leftColumn == NULL || rightColumn == NULL)
{
continue;
}
RangeTblEntry *rte;
int attnumForInner;
if (leftColumn->varno == *outerRtIndex)
{
/* left column is the outer table of the comparison, get right */
rte = rt_fetch(rightColumn->varno, query->rtable);
attnumForInner = rightColumn->varattno;
*attnum = leftColumn->varattno;
}
else if (rightColumn->varno == *outerRtIndex)
{
/* right column is the outer table of the comparison, get left*/
rte = rt_fetch(leftColumn->varno, query->rtable);
attnumForInner = leftColumn->varattno;
*attnum = rightColumn->varattno;
}
if (rte && IsCitusTable(rte->relid))
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(rte->relid);
if(attnumForInner == cacheEntry->partitionColumn->varattno)
{
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: join on distribution column of %s",
rte->eref->aliasname)));
*distRte = rte;
return true;
}
}
}
ereport(DEBUG5, (errmsg("ExtractIndexesForConstaints: no join clause on distribution column found")));
return false;
}
}
/*
* Initializes input variables to call CheckPushDownFeasibilityAndComputeIndexes.
* See CheckPushDownFeasibilityAndComputeIndexes for more details.
*/
bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query)
{
int outerRtIndex;
RangeTblEntry *outerRte = NULL;
RangeTblEntry *innerRte = NULL;
int attnum;
return CheckPushDownFeasibilityAndComputeIndexes(joinExpr, query, &outerRtIndex, &outerRte, &innerRte, &attnum);
}

View File

@ -282,3 +282,29 @@ ColumnToColumnName(Oid relationId, Node *columnNode)
return columnName;
}
/*
* GetAttrNumForMatchingColumn returns the attribute number for the column
* in the target relation that matches the given Var. If the column does not
* exist or is not comparable, it returns InvalidAttrNumber.
*/
AttrNumber
GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var)
{
char *targetColumnName = get_attname(relid, var->varattno, false);
AttrNumber attnum = get_attnum(rteTarget->relid, targetColumnName);
if (attnum == InvalidAttrNumber)
{
ereport(DEBUG5, (errmsg("Column %s does not exist in relation %s",
targetColumnName, rteTarget->eref->aliasname)));
return InvalidAttrNumber;
}
if(var->vartype != get_atttype(rteTarget->relid, attnum))
{
ereport(DEBUG5, (errmsg("Column %s is not comparable for tables with relids %d and %d",
targetColumnName, rteTarget->relid, relid)));
return InvalidAttrNumber;
}
return attnum;
}

View File

@ -17,6 +17,7 @@
#include "nodes/primnodes.h"
#include "parser/parsetree.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/query_utils.h"
#include "distributed/relation_restriction_equivalence.h"
@ -241,7 +242,7 @@ ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context)
* representative table.
*/
bool
CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte, List **citusRelids)
{
ExtractRangeTableIdsContext context;
List *idList = NIL;
@ -250,7 +251,6 @@ CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
ExtractRangeTableIds(node, &context);
RangeTblEntry *rteTmp;
List *citusRelids = NIL;
ListCell *lc = NULL;
foreach(lc, idList)
@ -258,7 +258,7 @@ CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
rteTmp = (RangeTblEntry *) lfirst(lc);
if (IsCitusTable(rteTmp->relid))
{
citusRelids = lappend_int(citusRelids, rteTmp->relid);
*citusRelids = lappend_int(*citusRelids, rteTmp->relid);
*rte = rteTmp; // set the value of rte, a representative table
}
}
@ -272,27 +272,4 @@ CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte)
return true;
}
/*
* GetAttrNumForMatchingColumn returns the attribute number for the column
* in the target relation that matches the given Var. If the column does not
* exist or is not comparable, it returns InvalidAttrNumber.
*/
AttrNumber
GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var)
{
char *targetColumnName = get_attname(relid, var->varattno, false);
AttrNumber attnum = get_attnum(rteTarget->relid, targetColumnName);
if (attnum == InvalidAttrNumber)
{
ereport(DEBUG5, (errmsg("Column %s does not exist in relation %s",
targetColumnName, rteTarget->eref->aliasname)));
return InvalidAttrNumber;
}
if(var->vartype != get_atttype(rteTarget->relid, attnum))
{
ereport(DEBUG5, (errmsg("Column %s is not comparable for tables with relids %d and %d",
targetColumnName, rteTarget->relid, relid)));
return InvalidAttrNumber;
}
return attnum;
}

View File

@ -23,7 +23,6 @@
#include "distributed/query_utils.h"
bool ExtractIndexesForConstaints(List *fromList, List *rtable, int *outerRtIndex, RangeTblEntry **distRte);
extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern bool UpdateWhereClauseForOuterJoin(Node *node, List *relationShardList);

View File

@ -25,5 +25,6 @@ extern Var * BuildDistributionKeyFromColumnName(Oid relationId,
extern char * ColumnToColumnName(Oid relationId, Node *columnNode);
extern Oid ColumnTypeIdForRelationColumnName(Oid relationId, char *columnName);
extern void EnsureValidDistributionColumn(Oid relationId, char *columnName);
extern AttrNumber GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var);
#endif /* DISTRIBUTION_COLUMN_H */

View File

@ -46,6 +46,5 @@ extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList);
extern bool ExtractRangeTableIds(Node *node, ExtractRangeTableIdsContext *context);
extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte);
AttrNumber GetAttrNumForMatchingColumn(RangeTblEntry *rteTarget, Oid relid, Var *var);
extern bool CheckIfAllCitusRTEsAreColocated(Node *node, List *rtable, RangeTblEntry **rte, List **citusRelids);
#endif /* QUERY_UTILS_H */

View File

@ -52,6 +52,8 @@ extern bool IsRelationLocalTableOrMatView(Oid relationId);
extern bool ContainsReferencesToOuterQuery(Query *query);
extern void UpdateVarNosInNode(Node *node, Index newVarNo);
extern bool IsPushdownSafeForRTEInLeftJoin(RangeTblEntry *rte);
extern bool CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum);
extern bool CheckPushDownFeasibilityLeftJoin(JoinExpr *joinExpr, Query *query);
#endif /* RECURSIVE_PLANNING_H */

View File

@ -0,0 +1,273 @@
CREATE SCHEMA recurring_join_pushdown;
SET search_path TO recurring_join_pushdown;
SET citus.next_shard_id TO 1520000;
SET citus.shard_count TO 4;
CREATE TABLE r1(a int, b int);
SELECT create_reference_table('r1');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO r1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (3, 20), (10, 1), (10, 2);
--- For testing, remove before merge
CREATE TABLE r1_local(like r1);
INSERT INTO r1_local select * from r1;
CREATE TABLE d1(a int, b int);
SELECT create_distributed_table('d1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20);
--- For testing, remove before merge
CREATE TABLE d1_local(like d1);
INSERT INTO d1_local select * from d1;
SET client_min_messages TO DEBUG3;
-- Basic test cases
-- Test that the join is pushed down to the worker nodes, using "using" syntax
SELECT count(*) FROM r1 LEFT JOIN d1 using (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
21
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a);
count
---------------------------------------------------------------------
21
(1 row)
SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
a | b
---------------------------------------------------------------------
1 | 10
1 | 11
1 | 20
2 | 10
2 | 12
2 | 20
3 | 20
10 | 1
10 | 2
(9 rows)
SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2;
a | b
---------------------------------------------------------------------
1 | 10
1 | 11
1 | 20
2 | 10
2 | 12
2 | 20
3 | 20
10 | 1
10 | 2
(9 rows)
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 USING (b);
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down
DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 USING (b))
count
---------------------------------------------------------------------
14
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b);
count
---------------------------------------------------------------------
14
(1 row)
-- Basic test cases with ON syntax
-- Test that the join is pushed down to the worker nodes, using "on" syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
21
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a;
count
---------------------------------------------------------------------
21
(1 row)
SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
a | b | a | b
---------------------------------------------------------------------
1 | 10 | 1 | 10
1 | 11 | 1 | 11
1 | 20 | 1 | 20
2 | 10 | 2 | 10
2 | 12 | 2 | 12
2 | 20 | 2 | 20
3 | 20 | |
10 | 1 | |
10 | 2 | |
(9 rows)
SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_local.b = d1_local.b ORDER BY 1, 2;
a | b | a | b
---------------------------------------------------------------------
1 | 10 | 1 | 10
1 | 11 | 1 | 11
1 | 20 | 1 | 20
2 | 10 | 2 | 10
2 | 12 | 2 | 12
2 | 20 | 2 | 20
3 | 20 | |
10 | 1 | |
10 | 2 | |
(9 rows)
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: a push down safe left join with recurring left side
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
count
---------------------------------------------------------------------
13
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a;
count
---------------------------------------------------------------------
13
(1 row)
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down
DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" to a subquery
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 ON ((r1.b OPERATOR(pg_catalog.=) d1.b)))
DEBUG: Creating router plan
count
---------------------------------------------------------------------
14
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b;
count
---------------------------------------------------------------------
14
(1 row)
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b;
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down
DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" to a subquery
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT b FROM recurring_join_pushdown.d1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT NULL::integer AS a, d1_1.b FROM (SELECT intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer)) d1_1) d1 ON ((r1.a OPERATOR(pg_catalog.=) d1.b)))
DEBUG: Creating router plan
count
---------------------------------------------------------------------
11
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b;
count
---------------------------------------------------------------------
11
(1 row)
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b;
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel and it is not feasible to push down
DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a, b FROM recurring_join_pushdown.d1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, d1_1.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) d1_1) d1 ON (((r1.a OPERATOR(pg_catalog.=) d1.a) OR (r1.b OPERATOR(pg_catalog.=) d1.b))))
count
---------------------------------------------------------------------
26
(1 row)
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b;
count
---------------------------------------------------------------------
26
(1 row)

View File

@ -226,6 +226,7 @@ test: local_table_join
test: local_dist_join_mixed
test: citus_local_dist_joins
test: recurring_outer_join
test: recurring_join_pushdown
test: query_single_shard_table
test: insert_select_single_shard_table
test: pg_dump

View File

@ -0,0 +1,59 @@
CREATE SCHEMA recurring_join_pushdown;
SET search_path TO recurring_join_pushdown;
SET citus.next_shard_id TO 1520000;
SET citus.shard_count TO 4;
CREATE TABLE r1(a int, b int);
SELECT create_reference_table('r1');
INSERT INTO r1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20), (3, 20), (10, 1), (10, 2);
--- For testing, remove before merge
CREATE TABLE r1_local(like r1);
INSERT INTO r1_local select * from r1;
CREATE TABLE d1(a int, b int);
SELECT create_distributed_table('d1', 'a');
INSERT INTO d1 VALUES (1,10), (1,11), (1,20), (2,10), (2,12), (2, 20);
--- For testing, remove before merge
CREATE TABLE d1_local(like d1);
INSERT INTO d1_local select * from d1;
SET client_min_messages TO DEBUG3;
-- Basic test cases
-- Test that the join is pushed down to the worker nodes, using "using" syntax
SELECT count(*) FROM r1 LEFT JOIN d1 using (a);
SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a);
SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2;
SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2;
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 USING (b);
SELECT count(*) FROM r1_local LEFT JOIN d1_local USING (b);
-- Basic test cases with ON syntax
-- Test that the join is pushed down to the worker nodes, using "on" syntax
SET client_min_messages TO DEBUG3;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a;
SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2;
SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_local.b = d1_local.b ORDER BY 1, 2;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a;
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b;
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b;

View File

@ -24,6 +24,9 @@ INSERT INTO dist_1 VALUES
(7, 41),
(7, 42);
CREATE TABLE dist_1_local(LIKE dist_1);
INSERT INTO dist_1_local SELECT * FROM dist_1;
CREATE TABLE dist_2_columnar(LIKE dist_1) USING columnar;
INSERT INTO dist_2_columnar SELECT * FROM dist_1;
SELECT create_distributed_table('dist_2_columnar', 'a');
@ -35,6 +38,28 @@ CREATE TABLE dist_3_partitioned_p3 PARTITION OF dist_3_partitioned FOR VALUES FR
SELECT create_distributed_table('dist_3_partitioned', 'a');
INSERT INTO dist_3_partitioned SELECT * FROM dist_1;
CREATE TABLE dist_4 (a int, b int);
SELECT create_distributed_table('dist_4', 'a');
INSERT INTO dist_4 VALUES
(1, 100),
(1, 101),
(1, 300),
(2, 20),
(2, 21),
(2, 400),
(2, 23),
(3, 102),
(3, 301),
(3, 300),
(3, null),
(3, 34),
(7, 40),
(7, null),
(7, 11);
CREATE TABLE dist_4_local(LIKE dist_4);
INSERT INTO dist_4_local SELECT * FROM dist_4;
CREATE TABLE ref_1 (a int, b int);
SELECT create_reference_table('ref_1');
INSERT INTO ref_1 VALUES
@ -54,6 +79,33 @@ INSERT INTO ref_1 VALUES
(null, 401),
(null, 402);
CREATE TABLE ref_1_local(LIKE ref_1);
INSERT INTO ref_1_local SELECT * FROM ref_1;
--- We create a second reference table that does not have the distribution column
CREATE TABLE ref_2 (a2 int, b int);
SELECT create_reference_table('ref_2');
INSERT INTO ref_2 VALUES
(1, null),
(1, 100),
(1, 11),
(null, 102),
(2, 200),
(2, 21),
(null, 202),
(2, 203),
(4, 300),
(4, 301),
(null, 302),
(4, 303),
(4, 304),
(null, 400),
(null, 401),
(null, 402);
CREATE TABLE ref_2_local(LIKE ref_2);
INSERT INTO ref_2_local SELECT * FROM ref_2;
CREATE TABLE local_1 (a int, b int);
INSERT INTO local_1 VALUES
(null, 1000),
@ -95,8 +147,16 @@ ALTER TABLE dist_5_with_pkey ADD CONSTRAINT pkey_1 PRIMARY KEY (a);
--
SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a);
SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a);
SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_1 USING (a,b);
SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_1_local USING (a,b);
SELECT COUNT(*) FROM ref_1 LEFT JOIN dist_4 USING (b);
SELECT COUNT(*) FROM ref_1_local LEFT JOIN dist_4_local USING (b);
SELECT * FROM ref_2 LEFT JOIN dist_4 USING (b) ORDER BY b, a2, a;
SELECT * FROM ref_2_local LEFT JOIN dist_4_local USING (b) ORDER BY b, a2, a;
SELECT COUNT(*) FROM dist_1 RIGHT JOIN ref_1 USING (a);