diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 4b439a528..219600906 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -79,9 +79,12 @@ static bool IsRecurringRTE(RangeTblEntry *rangeTableEntry, static bool IsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType); static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType); static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree); -static void FlattenJoinVars(List *columnList, Query *queryTree); +static List * FlattenJoinVars(List *columnList, Query *queryTree); static void UpdateVarMappingsForExtendedOpNode(List *columnList, + List *flattenedColumnList, List *subqueryTargetEntryList); +static void UpdateColumnToMatchingTargetEntry(Var *column, Node *flattenedExpr, + List *targetEntryList); static MultiTable * MultiSubqueryPushdownTable(Query *subquery); static List * CreateSubqueryTargetEntryList(List *columnList); static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, @@ -1413,6 +1416,7 @@ SubqueryPushdownMultiNodeTree(Query *queryTree) { List *targetEntryList = queryTree->targetList; List *columnList = NIL; + List *flattenedExprList = NIL; List *targetColumnList = NIL; MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect); MultiTable *subqueryNode = NULL; @@ -1472,25 +1476,26 @@ SubqueryPushdownMultiNodeTree(Query *queryTree) */ /* - * uniqueColumnList contains all columns returned by subquery. Subquery target + * columnList contains all columns returned by subquery. Subquery target * entry list, subquery range table entry's column name list are derived from - * uniqueColumnList. Columns mentioned in multiProject node and multiExtendedOp - * node are indexed with their respective position in uniqueColumnList. + * columnList. Columns mentioned in multiProject node and multiExtendedOp + * node are indexed with their respective position in columnList. */ targetColumnList = pull_var_clause_default((Node *) targetEntryList); havingClauseColumnList = pull_var_clause_default(queryTree->havingQual); columnList = list_concat(targetColumnList, havingClauseColumnList); - FlattenJoinVars(columnList, queryTree); + flattenedExprList = FlattenJoinVars(columnList, queryTree); /* create a target entry for each unique column */ - subqueryTargetEntryList = CreateSubqueryTargetEntryList(columnList); + subqueryTargetEntryList = CreateSubqueryTargetEntryList(flattenedExprList); /* * Update varno/varattno fields of columns in columnList to * point to corresponding target entry in subquery target entry list. */ - UpdateVarMappingsForExtendedOpNode(columnList, subqueryTargetEntryList); + UpdateVarMappingsForExtendedOpNode(columnList, flattenedExprList, + subqueryTargetEntryList); /* new query only has target entries, join tree, and rtable*/ pushedDownQuery = makeNode(Query); @@ -1553,7 +1558,8 @@ SubqueryPushdownMultiNodeTree(Query *queryTree) /* * FlattenJoinVars iterates over provided columnList to identify * Var's that are referenced from join RTE, and reverts back to their - * original RTEs. + * original RTEs. Then, returns a new list with reverted types. Note that, + * length of the original list and created list must be equal. * * This is required because Postgres allows columns to be referenced using * a join alias. Therefore the same column from a table could be referenced @@ -1568,12 +1574,16 @@ SubqueryPushdownMultiNodeTree(Query *queryTree) * Only exception is that, if a join is given an alias name, we do not want to * flatten those var's. If we do, deparsing fails since it expects to see a join * alias, and cannot access the RTE in the join tree by their names. + * + * Also note that in case of full outer joins, a column could be flattened to a + * coalesce expression if the column appears in the USING clause. */ -static void +static List * FlattenJoinVars(List *columnList, Query *queryTree) { ListCell *columnCell = NULL; List *rteList = queryTree->rtable; + List *flattenedExprList = NIL; foreach(columnCell, columnList) { @@ -1595,7 +1605,7 @@ FlattenJoinVars(List *columnList, Query *queryTree) columnRte = rt_fetch(column->varno, rteList); if (columnRte->rtekind == RTE_JOIN && columnRte->alias == NULL) { - Var *normalizedVar = NULL; + Node *normalizedNode = NULL; if (root == NULL) { @@ -1605,15 +1615,18 @@ FlattenJoinVars(List *columnList, Query *queryTree) root->hasJoinRTEs = true; } - normalizedVar = (Var *) flatten_join_alias_vars(root, (Node *) column); - - /* - * We need to copy values over existing one to make sure it is updated on - * respective places. - */ - memcpy(column, normalizedVar, sizeof(Var)); + normalizedNode = strip_implicit_coercions(flatten_join_alias_vars(root, + (Node *) + column)); + flattenedExprList = lappend(flattenedExprList, copyObject(normalizedNode)); + } + else + { + flattenedExprList = lappend(flattenedExprList, copyObject(column)); } } + + return flattenedExprList; } @@ -1622,28 +1635,28 @@ FlattenJoinVars(List *columnList, Query *queryTree) * in the column list and returns the target entry list. */ static List * -CreateSubqueryTargetEntryList(List *columnList) +CreateSubqueryTargetEntryList(List *exprList) { AttrNumber resNo = 1; - ListCell *columnCell = NULL; - List *uniqueColumnList = NIL; + ListCell *exprCell = NULL; + List *uniqueExprList = NIL; List *subqueryTargetEntryList = NIL; - foreach(columnCell, columnList) + foreach(exprCell, exprList) { - Var *column = (Var *) lfirst(columnCell); - uniqueColumnList = list_append_unique(uniqueColumnList, copyObject(column)); + Node *expr = (Node *) lfirst(exprCell); + uniqueExprList = list_append_unique(uniqueExprList, expr); } - foreach(columnCell, uniqueColumnList) + foreach(exprCell, uniqueExprList) { - Var *column = (Var *) lfirst(columnCell); + Node *expr = (Node *) lfirst(exprCell); TargetEntry *newTargetEntry = makeNode(TargetEntry); - StringInfo columnNameString = makeStringInfo(); + StringInfo exprNameString = makeStringInfo(); - newTargetEntry->expr = (Expr *) copyObject(column); - appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT, resNo); - newTargetEntry->resname = columnNameString->data; + newTargetEntry->expr = (Expr *) copyObject(expr); + appendStringInfo(exprNameString, WORKER_COLUMN_FORMAT, resNo); + newTargetEntry->resname = exprNameString->data; newTargetEntry->resjunk = false; newTargetEntry->resno = resNo; @@ -1661,28 +1674,89 @@ CreateSubqueryTargetEntryList(List *columnList) * list. */ static void -UpdateVarMappingsForExtendedOpNode(List *columnList, List *subqueryTargetEntryList) +UpdateVarMappingsForExtendedOpNode(List *columnList, List *flattenedExprList, + List *subqueryTargetEntryList) { ListCell *columnCell = NULL; - foreach(columnCell, columnList) + ListCell *flattenedExprCell = NULL; + + Assert(list_length(columnList) == list_length(flattenedExprList)); + + forboth(columnCell, columnList, flattenedExprCell, flattenedExprList) { Var *columnOnTheExtendedNode = (Var *) lfirst(columnCell); - ListCell *targetEntryCell = NULL; - foreach(targetEntryCell, subqueryTargetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - Var *targetColumn = NULL; + Node *flattenedExpr = (Node *) lfirst(flattenedExprCell); - Assert(IsA(targetEntry->expr, Var)); - targetColumn = (Var *) targetEntry->expr; - if (columnOnTheExtendedNode->varno == targetColumn->varno && - columnOnTheExtendedNode->varattno == targetColumn->varattno) + /* + * As an optimization, subqueryTargetEntryList only consists of + * distinct elements. In other words, any duplicate entries in the + * target list consolidated into a single element to prevent pulling + * unnecessary data from the worker nodes (e.g. SELECT a,a,a,b,b,b FROM x; + * is turned into SELECT a,b FROM x_102008). + * + * Thus, at this point we should iterate on the subqueryTargetEntryList + * and ensure that the column on the extended op node points to the + * correct target entry. + */ + UpdateColumnToMatchingTargetEntry(columnOnTheExtendedNode, flattenedExpr, + subqueryTargetEntryList); + } +} + + +/* + * UpdateColumnToMatchingTargetEntry sets the variable of given column entry to + * the matching entry of the targetEntryList. Since data type of the column can + * be different from the types of the elements of targetEntryList, we use flattenedExpr. + */ +static void +UpdateColumnToMatchingTargetEntry(Var *column, Node *flattenedExpr, List *targetEntryList) +{ + ListCell *targetEntryCell = NULL; + + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + if (IsA(targetEntry->expr, Var)) + { + Var *targetEntryVar = (Var *) targetEntry->expr; + + if (IsA(flattenedExpr, Var) && equal(flattenedExpr, targetEntryVar)) { - columnOnTheExtendedNode->varno = 1; - columnOnTheExtendedNode->varattno = targetEntry->resno; + column->varno = 1; + column->varattno = targetEntry->resno; break; } } + else if (IsA(targetEntry->expr, CoalesceExpr)) + { + /* + * flatten_join_alias_vars() flattens full oter joins' columns that is + * in the USING part into COALESCE(left_col, right_col) + */ + CoalesceExpr *targetCoalesceExpr = (CoalesceExpr *) targetEntry->expr; + + if (IsA(flattenedExpr, CoalesceExpr) && equal(flattenedExpr, + targetCoalesceExpr)) + { + Oid expressionType = exprType(flattenedExpr); + int32 expressionTypmod = exprTypmod(flattenedExpr); + Oid expressionCollation = exprCollation(flattenedExpr); + + column->varno = 1; + column->varattno = targetEntry->resno; + column->vartype = expressionType; + column->vartypmod = expressionTypmod; + column->varcollid = expressionCollation; + break; + } + } + else + { + elog(ERROR, "unrecognized node type on the target list: %d", + nodeTag(targetEntry->expr)); + } } } diff --git a/src/test/regress/expected/full_join.out b/src/test/regress/expected/full_join.out new file mode 100644 index 000000000..522376d78 --- /dev/null +++ b/src/test/regress/expected/full_join.out @@ -0,0 +1,250 @@ +-- +-- Full join with subquery pushdown support +-- +SET citus.next_shard_id TO 9000000; +CREATE SCHEMA full_join; +SET search_path TO full_join, public; +CREATE TABLE test_table_1(id int, val1 int); +CREATE TABLE test_table_2(id bigint, val1 int); +CREATE TABLE test_table_3(id int, val1 bigint); +SELECT create_distributed_table('test_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('test_table_2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('test_table_3', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3); +INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4); +INSERT INTO test_table_3 VALUES(1,1),(3,3),(4,5); +-- Simple full outer join +SELECT id FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1; + id +---- + 1 + 2 + 3 + 4 +(4 rows) + +-- Get all columns as the result of the full join +SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1; + id | val1 | val1 +----+------+------ + 1 | 1 | 1 + 2 | 2 | + 3 | 3 | 3 + 4 | | 5 +(4 rows) + +-- Join subqueries using single column +SELECT * FROM + (SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1 + FULL JOIN + (SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2 + USING(id) + ORDER BY 1; + id +---- + 1 + 2 + 3 + + +(5 rows) + +-- Join subqueries using multiple columns +SELECT * FROM + (SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1 + FULL JOIN + (SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2 + USING(id, val1) + ORDER BY 1; + id | val1 +----+------ + 1 | 1 + 2 | 2 + 3 | 3 + | + | +(5 rows) + +-- Full join using multiple columns +SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1; + id | val1 +----+------ + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 5 +(4 rows) + +-- Full join with complicated target lists +SELECT count(DISTINCT id), (avg(test_table_1.val1) + id * id)::integer as avg_value, id::numeric IS NOT NULL as not_null +FROM test_table_1 FULL JOIN test_table_3 using(id) +WHERE id::bigint < 55 +GROUP BY id +ORDER BY 2 +ASC LIMIT 3; + count | avg_value | not_null +-------+-----------+---------- + 1 | 2 | t + 1 | 6 | t + 1 | 12 | t +(3 rows) + +SELECT max(val1) +FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) +GROUP BY test_table_1.id +ORDER BY 1; + max +----- + 1 + 2 + 3 + 5 +(4 rows) + +-- Test the left join as well +SELECT max(val1) +FROM test_table_1 LEFT JOIN test_table_3 USING(id, val1) +GROUP BY test_table_1.id +ORDER BY 1; + max +----- + 1 + 2 + 3 +(3 rows) + +-- Full outer join with different distribution column types, should error out +SELECT * FROM test_table_1 full join test_table_2 using(id); +ERROR: cannot push down this subquery +DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning +-- Test when the non-distributed column has the value of NULL +INSERT INTO test_table_1 VALUES(7, NULL); +INSERT INTO test_table_2 VALUES(7, NULL); +INSERT INTO test_table_3 VALUES(7, NULL); +-- Get all columns as the result of the full join +SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1; + id | val1 | val1 +----+------+------ + 1 | 1 | 1 + 2 | 2 | + 3 | 3 | 3 + 4 | | 5 + 7 | | +(5 rows) + +-- Get the same result (with multiple id) +SELECT * FROM test_table_1 FULL JOIN test_table_3 ON (test_table_1.id = test_table_3.id) ORDER BY 1; + id | val1 | id | val1 +----+------+----+------ + 1 | 1 | 1 | 1 + 2 | 2 | | + 3 | 3 | 3 | 3 + 7 | | 7 | + | | 4 | 5 +(5 rows) + +-- Full join using multiple columns +SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1; + id | val1 +----+------ + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 5 + 7 | + 7 | +(6 rows) + +-- In order to make the same test with different data types use text-varchar pair +-- instead of using int-bigint pair. +DROP TABLE test_table_1; +DROP TABLE test_table_2; +DROP TABLE test_table_3; +CREATE TABLE test_table_1(id int, val1 text); +CREATE TABLE test_table_2(id int, val1 varchar(30)); +SELECT create_distributed_table('test_table_1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('test_table_2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_table_1 VALUES(1,'val_1'),(2,'val_2'),(3,'val_3'), (4, NULL); +INSERT INTO test_table_2 VALUES(2,'val_2'),(3,'val_3'),(4,'val_4'), (5, NULL); +-- Simple full outer join +SELECT id FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1; + id +---- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +-- Get all columns as the result of the full join +SELECT * FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1; + id | val1 | val1 +----+-------+------- + 1 | val_1 | + 2 | val_2 | val_2 + 3 | val_3 | val_3 + 4 | | val_4 + 5 | | +(5 rows) + +-- Join subqueries using multiple columns +SELECT * FROM + (SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j1 + FULL JOIN + (SELECT test_table_2.id, test_table_2.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j2 + USING(id, val1) + ORDER BY 1,2; + id | val1 +----+------- + 1 | val_1 + 2 | val_2 + 3 | val_3 + 4 | val_4 + 4 | + 5 | + | + | +(8 rows) + +-- Full join using multiple columns +SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2; + id | val1 +----+------- + 1 | val_1 + 2 | val_2 + 3 | val_3 + 4 | val_4 + 4 | + 5 | +(6 rows) + +DROP SCHEMA full_join CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test_table_1 +drop cascades to table test_table_2 diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4425b9e96..ddf2ccc92 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -65,7 +65,7 @@ test: multi_explain test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql test: sql_procedure multi_function_in_join -test: multi_subquery_in_where_reference_clause +test: multi_subquery_in_where_reference_clause full_join test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql test: multi_reference_table multi_select_for_update relation_access_tracking diff --git a/src/test/regress/sql/full_join.sql b/src/test/regress/sql/full_join.sql new file mode 100644 index 000000000..19d06f19d --- /dev/null +++ b/src/test/regress/sql/full_join.sql @@ -0,0 +1,115 @@ +-- +-- Full join with subquery pushdown support +-- + +SET citus.next_shard_id TO 9000000; + +CREATE SCHEMA full_join; +SET search_path TO full_join, public; + +CREATE TABLE test_table_1(id int, val1 int); +CREATE TABLE test_table_2(id bigint, val1 int); +CREATE TABLE test_table_3(id int, val1 bigint); + +SELECT create_distributed_table('test_table_1', 'id'); +SELECT create_distributed_table('test_table_2', 'id'); +SELECT create_distributed_table('test_table_3', 'id'); + +INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3); +INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4); +INSERT INTO test_table_3 VALUES(1,1),(3,3),(4,5); + +-- Simple full outer join +SELECT id FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1; + +-- Get all columns as the result of the full join +SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1; + +-- Join subqueries using single column +SELECT * FROM + (SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1 + FULL JOIN + (SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2 + USING(id) + ORDER BY 1; + +-- Join subqueries using multiple columns +SELECT * FROM + (SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1 + FULL JOIN + (SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2 + USING(id, val1) + ORDER BY 1; + +-- Full join using multiple columns +SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1; + +-- Full join with complicated target lists +SELECT count(DISTINCT id), (avg(test_table_1.val1) + id * id)::integer as avg_value, id::numeric IS NOT NULL as not_null +FROM test_table_1 FULL JOIN test_table_3 using(id) +WHERE id::bigint < 55 +GROUP BY id +ORDER BY 2 +ASC LIMIT 3; + +SELECT max(val1) +FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) +GROUP BY test_table_1.id +ORDER BY 1; + +-- Test the left join as well +SELECT max(val1) +FROM test_table_1 LEFT JOIN test_table_3 USING(id, val1) +GROUP BY test_table_1.id +ORDER BY 1; + +-- Full outer join with different distribution column types, should error out +SELECT * FROM test_table_1 full join test_table_2 using(id); + +-- Test when the non-distributed column has the value of NULL +INSERT INTO test_table_1 VALUES(7, NULL); +INSERT INTO test_table_2 VALUES(7, NULL); +INSERT INTO test_table_3 VALUES(7, NULL); + +-- Get all columns as the result of the full join +SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1; + +-- Get the same result (with multiple id) +SELECT * FROM test_table_1 FULL JOIN test_table_3 ON (test_table_1.id = test_table_3.id) ORDER BY 1; + +-- Full join using multiple columns +SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1; + +-- In order to make the same test with different data types use text-varchar pair +-- instead of using int-bigint pair. +DROP TABLE test_table_1; +DROP TABLE test_table_2; +DROP TABLE test_table_3; + +CREATE TABLE test_table_1(id int, val1 text); +CREATE TABLE test_table_2(id int, val1 varchar(30)); + +SELECT create_distributed_table('test_table_1', 'id'); +SELECT create_distributed_table('test_table_2', 'id'); + +INSERT INTO test_table_1 VALUES(1,'val_1'),(2,'val_2'),(3,'val_3'), (4, NULL); +INSERT INTO test_table_2 VALUES(2,'val_2'),(3,'val_3'),(4,'val_4'), (5, NULL); + +-- Simple full outer join +SELECT id FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1; + +-- Get all columns as the result of the full join +SELECT * FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1; + +-- Join subqueries using multiple columns +SELECT * FROM + (SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j1 + FULL JOIN + (SELECT test_table_2.id, test_table_2.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j2 + USING(id, val1) + ORDER BY 1,2; + +-- Full join using multiple columns +SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2; + +DROP SCHEMA full_join CASCADE;