Merge pull request #2597 from citusdata/full_outer_pushdown

Fix full outer join with subquery pushdown
pull/2609/head^2
Burak Velioglu 2019-03-05 17:08:08 +03:00 committed by GitHub
commit 900ffa76f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 482 additions and 43 deletions

View File

@ -79,9 +79,12 @@ static bool IsRecurringRTE(RangeTblEntry *rangeTableEntry,
static bool IsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType);
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree);
static void FlattenJoinVars(List *columnList, Query *queryTree);
static List * FlattenJoinVars(List *columnList, Query *queryTree);
static void UpdateVarMappingsForExtendedOpNode(List *columnList,
List *flattenedColumnList,
List *subqueryTargetEntryList);
static void UpdateColumnToMatchingTargetEntry(Var *column, Node *flattenedExpr,
List *targetEntryList);
static MultiTable * MultiSubqueryPushdownTable(Query *subquery);
static List * CreateSubqueryTargetEntryList(List *columnList);
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
@ -1413,6 +1416,7 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
{
List *targetEntryList = queryTree->targetList;
List *columnList = NIL;
List *flattenedExprList = NIL;
List *targetColumnList = NIL;
MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect);
MultiTable *subqueryNode = NULL;
@ -1472,25 +1476,26 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
*/
/*
* uniqueColumnList contains all columns returned by subquery. Subquery target
* columnList contains all columns returned by subquery. Subquery target
* entry list, subquery range table entry's column name list are derived from
* uniqueColumnList. Columns mentioned in multiProject node and multiExtendedOp
* node are indexed with their respective position in uniqueColumnList.
* columnList. Columns mentioned in multiProject node and multiExtendedOp
* node are indexed with their respective position in columnList.
*/
targetColumnList = pull_var_clause_default((Node *) targetEntryList);
havingClauseColumnList = pull_var_clause_default(queryTree->havingQual);
columnList = list_concat(targetColumnList, havingClauseColumnList);
FlattenJoinVars(columnList, queryTree);
flattenedExprList = FlattenJoinVars(columnList, queryTree);
/* create a target entry for each unique column */
subqueryTargetEntryList = CreateSubqueryTargetEntryList(columnList);
subqueryTargetEntryList = CreateSubqueryTargetEntryList(flattenedExprList);
/*
* Update varno/varattno fields of columns in columnList to
* point to corresponding target entry in subquery target entry list.
*/
UpdateVarMappingsForExtendedOpNode(columnList, subqueryTargetEntryList);
UpdateVarMappingsForExtendedOpNode(columnList, flattenedExprList,
subqueryTargetEntryList);
/* new query only has target entries, join tree, and rtable*/
pushedDownQuery = makeNode(Query);
@ -1553,7 +1558,8 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
/*
* FlattenJoinVars iterates over provided columnList to identify
* Var's that are referenced from join RTE, and reverts back to their
* original RTEs.
* original RTEs. It then returns a new list containing the flattened
* expressions. Note that the length of the original list and the returned
* list must be equal.
*
* This is required because Postgres allows columns to be referenced using
* a join alias. Therefore the same column from a table could be referenced
@ -1568,12 +1574,16 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
* Only exception is that, if a join is given an alias name, we do not want to
* flatten those var's. If we do, deparsing fails since it expects to see a join
* alias, and cannot access the RTE in the join tree by their names.
*
* Also note that in case of full outer joins, a column could be flattened to a
* coalesce expression if the column appears in the USING clause.
*/
static void
static List *
FlattenJoinVars(List *columnList, Query *queryTree)
{
ListCell *columnCell = NULL;
List *rteList = queryTree->rtable;
List *flattenedExprList = NIL;
foreach(columnCell, columnList)
{
@ -1595,7 +1605,7 @@ FlattenJoinVars(List *columnList, Query *queryTree)
columnRte = rt_fetch(column->varno, rteList);
if (columnRte->rtekind == RTE_JOIN && columnRte->alias == NULL)
{
Var *normalizedVar = NULL;
Node *normalizedNode = NULL;
if (root == NULL)
{
@ -1605,15 +1615,18 @@ FlattenJoinVars(List *columnList, Query *queryTree)
root->hasJoinRTEs = true;
}
normalizedVar = (Var *) flatten_join_alias_vars(root, (Node *) column);
normalizedNode = strip_implicit_coercions(flatten_join_alias_vars(root,
(Node *)
column));
flattenedExprList = lappend(flattenedExprList, copyObject(normalizedNode));
}
else
{
flattenedExprList = lappend(flattenedExprList, copyObject(column));
}
}
/*
* We need to copy values over existing one to make sure it is updated on
* respective places.
*/
memcpy(column, normalizedVar, sizeof(Var));
}
}
return flattenedExprList;
}
@ -1622,28 +1635,28 @@ FlattenJoinVars(List *columnList, Query *queryTree)
* in the column list and returns the target entry list.
*/
static List *
CreateSubqueryTargetEntryList(List *columnList)
CreateSubqueryTargetEntryList(List *exprList)
{
AttrNumber resNo = 1;
ListCell *columnCell = NULL;
List *uniqueColumnList = NIL;
ListCell *exprCell = NULL;
List *uniqueExprList = NIL;
List *subqueryTargetEntryList = NIL;
foreach(columnCell, columnList)
foreach(exprCell, exprList)
{
Var *column = (Var *) lfirst(columnCell);
uniqueColumnList = list_append_unique(uniqueColumnList, copyObject(column));
Node *expr = (Node *) lfirst(exprCell);
uniqueExprList = list_append_unique(uniqueExprList, expr);
}
foreach(columnCell, uniqueColumnList)
foreach(exprCell, uniqueExprList)
{
Var *column = (Var *) lfirst(columnCell);
Node *expr = (Node *) lfirst(exprCell);
TargetEntry *newTargetEntry = makeNode(TargetEntry);
StringInfo columnNameString = makeStringInfo();
StringInfo exprNameString = makeStringInfo();
newTargetEntry->expr = (Expr *) copyObject(column);
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT, resNo);
newTargetEntry->resname = columnNameString->data;
newTargetEntry->expr = (Expr *) copyObject(expr);
appendStringInfo(exprNameString, WORKER_COLUMN_FORMAT, resNo);
newTargetEntry->resname = exprNameString->data;
newTargetEntry->resjunk = false;
newTargetEntry->resno = resNo;
@ -1661,28 +1674,89 @@ CreateSubqueryTargetEntryList(List *columnList)
* list.
*/
static void
UpdateVarMappingsForExtendedOpNode(List *columnList, List *subqueryTargetEntryList)
UpdateVarMappingsForExtendedOpNode(List *columnList, List *flattenedExprList,
List *subqueryTargetEntryList)
{
ListCell *columnCell = NULL;
foreach(columnCell, columnList)
ListCell *flattenedExprCell = NULL;
Assert(list_length(columnList) == list_length(flattenedExprList));
forboth(columnCell, columnList, flattenedExprCell, flattenedExprList)
{
Var *columnOnTheExtendedNode = (Var *) lfirst(columnCell);
Node *flattenedExpr = (Node *) lfirst(flattenedExprCell);
/*
* As an optimization, subqueryTargetEntryList only consists of
* distinct elements. In other words, any duplicate entries in the
* target list are consolidated into a single element to prevent pulling
* unnecessary data from the worker nodes (e.g. SELECT a,a,a,b,b,b FROM x;
* is turned into SELECT a,b FROM x_102008).
*
* Thus, at this point we should iterate on the subqueryTargetEntryList
* and ensure that the column on the extended op node points to the
* correct target entry.
*/
UpdateColumnToMatchingTargetEntry(columnOnTheExtendedNode, flattenedExpr,
subqueryTargetEntryList);
}
}
/*
* UpdateColumnToMatchingTargetEntry updates the varno and varattno of the given
* column so that it points to the matching entry of targetEntryList. Since the
* data type of the column can differ from the types of the elements of
* targetEntryList, flattenedExpr is used to find the matching entry.
*/
static void
UpdateColumnToMatchingTargetEntry(Var *column, Node *flattenedExpr, List *targetEntryList)
{
ListCell *targetEntryCell = NULL;
foreach(targetEntryCell, subqueryTargetEntryList)
foreach(targetEntryCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Var *targetColumn = NULL;
Assert(IsA(targetEntry->expr, Var));
targetColumn = (Var *) targetEntry->expr;
if (columnOnTheExtendedNode->varno == targetColumn->varno &&
columnOnTheExtendedNode->varattno == targetColumn->varattno)
if (IsA(targetEntry->expr, Var))
{
columnOnTheExtendedNode->varno = 1;
columnOnTheExtendedNode->varattno = targetEntry->resno;
Var *targetEntryVar = (Var *) targetEntry->expr;
if (IsA(flattenedExpr, Var) && equal(flattenedExpr, targetEntryVar))
{
column->varno = 1;
column->varattno = targetEntry->resno;
break;
}
}
else if (IsA(targetEntry->expr, CoalesceExpr))
{
/*
* flatten_join_alias_vars() flattens full outer joins' columns that are
* in the USING part into COALESCE(left_col, right_col)
*/
CoalesceExpr *targetCoalesceExpr = (CoalesceExpr *) targetEntry->expr;
if (IsA(flattenedExpr, CoalesceExpr) && equal(flattenedExpr,
targetCoalesceExpr))
{
Oid expressionType = exprType(flattenedExpr);
int32 expressionTypmod = exprTypmod(flattenedExpr);
Oid expressionCollation = exprCollation(flattenedExpr);
column->varno = 1;
column->varattno = targetEntry->resno;
column->vartype = expressionType;
column->vartypmod = expressionTypmod;
column->varcollid = expressionCollation;
break;
}
}
else
{
elog(ERROR, "unrecognized node type on the target list: %d",
nodeTag(targetEntry->expr));
}
}
}

View File

@ -0,0 +1,250 @@
--
-- Full join with subquery pushdown support
--
SET citus.next_shard_id TO 9000000;
CREATE SCHEMA full_join;
SET search_path TO full_join, public;
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id bigint, val1 int);
CREATE TABLE test_table_3(id int, val1 bigint);
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('test_table_3', 'id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3);
INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4);
INSERT INTO test_table_3 VALUES(1,1),(3,3),(4,5);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id
----
1
2
3
4
(4 rows)
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id | val1 | val1
----+------+------
1 | 1 | 1
2 | 2 |
3 | 3 | 3
4 | | 5
(4 rows)
-- Join subqueries using single column
SELECT * FROM
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id)
ORDER BY 1;
id
----
1
2
3
(5 rows)
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id, val1)
ORDER BY 1;
id | val1
----+------
1 | 1
2 | 2
3 | 3
|
|
(5 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
id | val1
----+------
1 | 1
2 | 2
3 | 3
4 | 5
(4 rows)
-- Full join with complicated target lists
SELECT count(DISTINCT id), (avg(test_table_1.val1) + id * id)::integer as avg_value, id::numeric IS NOT NULL as not_null
FROM test_table_1 FULL JOIN test_table_3 using(id)
WHERE id::bigint < 55
GROUP BY id
ORDER BY 2
ASC LIMIT 3;
count | avg_value | not_null
-------+-----------+----------
1 | 2 | t
1 | 6 | t
1 | 12 | t
(3 rows)
SELECT max(val1)
FROM test_table_1 FULL JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
max
-----
1
2
3
5
(4 rows)
-- Test the left join as well
SELECT max(val1)
FROM test_table_1 LEFT JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
max
-----
1
2
3
(3 rows)
-- Full outer join with different distribution column types, should error out
SELECT * FROM test_table_1 full join test_table_2 using(id);
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Test when the non-distributed column has the value of NULL
INSERT INTO test_table_1 VALUES(7, NULL);
INSERT INTO test_table_2 VALUES(7, NULL);
INSERT INTO test_table_3 VALUES(7, NULL);
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id | val1 | val1
----+------+------
1 | 1 | 1
2 | 2 |
3 | 3 | 3
4 | | 5
7 | |
(5 rows)
-- Get the same result (with multiple id)
SELECT * FROM test_table_1 FULL JOIN test_table_3 ON (test_table_1.id = test_table_3.id) ORDER BY 1;
id | val1 | id | val1
----+------+----+------
1 | 1 | 1 | 1
2 | 2 | |
3 | 3 | 3 | 3
7 | | 7 |
| | 4 | 5
(5 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
id | val1
----+------
1 | 1
2 | 2
3 | 3
4 | 5
7 |
7 |
(6 rows)
-- In order to make the same test with different data types use text-varchar pair
-- instead of using int-bigint pair.
DROP TABLE test_table_1;
DROP TABLE test_table_2;
DROP TABLE test_table_3;
CREATE TABLE test_table_1(id int, val1 text);
CREATE TABLE test_table_2(id int, val1 varchar(30));
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,'val_1'),(2,'val_2'),(3,'val_3'), (4, NULL);
INSERT INTO test_table_2 VALUES(2,'val_2'),(3,'val_3'),(4,'val_4'), (5, NULL);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
id
----
1
2
3
4
5
(5 rows)
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
id | val1 | val1
----+-------+-------
1 | val_1 |
2 | val_2 | val_2
3 | val_3 | val_3
4 | | val_4
5 | |
(5 rows)
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j1
FULL JOIN
(SELECT test_table_2.id, test_table_2.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j2
USING(id, val1)
ORDER BY 1,2;
id | val1
----+-------
1 | val_1
2 | val_2
3 | val_3
4 | val_4
4 |
5 |
|
|
(8 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2;
id | val1
----+-------
1 | val_1
2 | val_2
3 | val_3
4 | val_4
4 |
5 |
(6 rows)
DROP SCHEMA full_join CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table test_table_1
drop cascades to table test_table_2

View File

@ -65,7 +65,7 @@ test: multi_explain
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
test: sql_procedure multi_function_in_join
test: multi_subquery_in_where_reference_clause
test: multi_subquery_in_where_reference_clause full_join
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
test: multi_reference_table multi_select_for_update relation_access_tracking

View File

@ -0,0 +1,115 @@
--
-- Full join with subquery pushdown support
--
SET citus.next_shard_id TO 9000000;
CREATE SCHEMA full_join;
SET search_path TO full_join, public;
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id bigint, val1 int);
CREATE TABLE test_table_3(id int, val1 bigint);
SELECT create_distributed_table('test_table_1', 'id');
SELECT create_distributed_table('test_table_2', 'id');
SELECT create_distributed_table('test_table_3', 'id');
INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3);
INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4);
INSERT INTO test_table_3 VALUES(1,1),(3,3),(4,5);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
-- Join subqueries using single column
SELECT * FROM
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id)
ORDER BY 1;
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id, val1)
ORDER BY 1;
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
-- Full join with complicated target lists
SELECT count(DISTINCT id), (avg(test_table_1.val1) + id * id)::integer as avg_value, id::numeric IS NOT NULL as not_null
FROM test_table_1 FULL JOIN test_table_3 using(id)
WHERE id::bigint < 55
GROUP BY id
ORDER BY 2
ASC LIMIT 3;
SELECT max(val1)
FROM test_table_1 FULL JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
-- Test the left join as well
SELECT max(val1)
FROM test_table_1 LEFT JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
-- Full outer join with different distribution column types, should error out
SELECT * FROM test_table_1 full join test_table_2 using(id);
-- Test when the non-distributed column has the value of NULL
INSERT INTO test_table_1 VALUES(7, NULL);
INSERT INTO test_table_2 VALUES(7, NULL);
INSERT INTO test_table_3 VALUES(7, NULL);
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
-- Get the same result (with multiple id)
SELECT * FROM test_table_1 FULL JOIN test_table_3 ON (test_table_1.id = test_table_3.id) ORDER BY 1;
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
-- In order to make the same test with different data types use text-varchar pair
-- instead of using int-bigint pair.
DROP TABLE test_table_1;
DROP TABLE test_table_2;
DROP TABLE test_table_3;
CREATE TABLE test_table_1(id int, val1 text);
CREATE TABLE test_table_2(id int, val1 varchar(30));
SELECT create_distributed_table('test_table_1', 'id');
SELECT create_distributed_table('test_table_2', 'id');
INSERT INTO test_table_1 VALUES(1,'val_1'),(2,'val_2'),(3,'val_3'), (4, NULL);
INSERT INTO test_table_2 VALUES(2,'val_2'),(3,'val_3'),(4,'val_4'), (5, NULL);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j1
FULL JOIN
(SELECT test_table_2.id, test_table_2.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j2
USING(id, val1)
ORDER BY 1,2;
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2;
DROP SCHEMA full_join CASCADE;