Fix incorrect join related fields (#4242)

* Fix incorrect join related fields

Ruleutils expects to be given the original indexes of the join columns,
hence we should take dropped columns into account while setting the
fields in SetJoinRelatedColumnsCompat.

* add some more tests for joins

* Move tests to join.sql and create a utility function
pull/4262/head
SaitTalhaNisanci 2020-10-19 18:28:39 +03:00 committed by GitHub
parent c49077d594
commit 0f209377c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 768 additions and 280 deletions

View File

@ -131,13 +131,17 @@ static List * QueryFromList(List *rangeTableList);
static Node * QueryJoinTree(MultiNode *multiNode, List *dependentJobList,
List **rangeTableList);
static void SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry,
List *l_colnames, List *r_colnames,
List *leftColVars, List *rightColVars);
Oid leftRelId,
Oid rightRelId,
List *leftColumnVars,
List *rightColumnVars);
static RangeTblEntry * JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList,
List *rangeTableList);
static int ExtractRangeTableId(Node *node);
static void ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId,
List *dependentJobList, List **columnNames, List **columnVars);
static void ExtractColumns(RangeTblEntry *callingRTE, int rangeTableId,
List **columnNames, List **columnVars);
static RangeTblEntry * ConstructCallingRTE(RangeTblEntry *rangeTableEntry,
List *dependentJobList);
static Query * BuildSubqueryJobQuery(MultiNode *multiNode);
static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList,
List *dependentJobList);
@ -229,6 +233,10 @@ static StringInfo ColumnTypeArrayString(List *targetEntryList);
static bool CoPlacedShardIntervals(ShardInterval *firstInterval,
ShardInterval *secondInterval);
#if PG_VERSION_NUM >= PG_VERSION_13
static List * GetColumnOriginalIndexes(Oid relationId);
#endif
/*
* CreatePhysicalDistributedPlan is the entry point for physical plan generation. The
@ -1259,10 +1267,14 @@ JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTable
rangeTableEntry->subquery = NULL;
rangeTableEntry->eref = makeAlias("unnamed_join", NIL);
ExtractColumns(leftRTE, leftRangeTableId, dependentJobList,
RangeTblEntry *leftCallingRTE = ConstructCallingRTE(leftRTE, dependentJobList);
RangeTblEntry *rightCallingRte = ConstructCallingRTE(rightRTE, dependentJobList);
ExtractColumns(leftCallingRTE, leftRangeTableId,
&leftColumnNames, &leftColumnVars);
ExtractColumns(rightRTE, rightRangeTableId, dependentJobList,
ExtractColumns(rightCallingRte, rightRangeTableId,
&rightColumnNames, &rightColumnVars);
Oid leftRelId = leftCallingRTE->relid;
Oid rightRelId = rightCallingRte->relid;
joinedColumnNames = list_concat(joinedColumnNames, leftColumnNames);
joinedColumnNames = list_concat(joinedColumnNames, rightColumnNames);
joinedColumnVars = list_concat(joinedColumnVars, leftColumnVars);
@ -1271,38 +1283,78 @@ JoinRangeTableEntry(JoinExpr *joinExpr, List *dependentJobList, List *rangeTable
rangeTableEntry->eref->colnames = joinedColumnNames;
rangeTableEntry->joinaliasvars = joinedColumnVars;
SetJoinRelatedColumnsCompat(rangeTableEntry,
leftColumnNames, rightColumnNames, leftColumnVars,
SetJoinRelatedColumnsCompat(rangeTableEntry, leftRelId, rightRelId, leftColumnVars,
rightColumnVars);
return rangeTableEntry;
}
/*
* SetJoinRelatedColumnsCompat sets join related fields on the given range table entry.
* Currently it sets joinleftcols/joinrightcols which are introduced with postgres 13.
* For more info see postgres commit: 9ce77d75c5ab094637cc4a446296dc3be6e3c221
*/
static void
SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry,
List *leftColumnNames, List *rightColumnNames,
SetJoinRelatedColumnsCompat(RangeTblEntry *rangeTableEntry, Oid leftRelId, Oid rightRelId,
List *leftColumnVars, List *rightColumnVars)
{
#if PG_VERSION_NUM >= PG_VERSION_13
/* We don't have any merged columns so set it to 0 */
rangeTableEntry->joinmergedcols = 0;
int numvars = list_length(leftColumnVars);
for (int varId = 1; varId <= numvars; varId++)
if (OidIsValid(leftRelId))
{
rangeTableEntry->joinleftcols = lappend_int(rangeTableEntry->joinleftcols, varId);
rangeTableEntry->joinleftcols = GetColumnOriginalIndexes(leftRelId);
}
numvars = list_length(rightColumnVars);
for (int varId = 1; varId <= numvars; varId++)
else
{
rangeTableEntry->joinrightcols = lappend_int(rangeTableEntry->joinrightcols,
varId);
int leftColsSize = list_length(leftColumnVars);
rangeTableEntry->joinleftcols = GeneratePositiveIntSequenceList(leftColsSize);
}
if (OidIsValid(rightRelId))
{
rangeTableEntry->joinrightcols = GetColumnOriginalIndexes(rightRelId);
}
else
{
int rightColsSize = list_length(rightColumnVars);
rangeTableEntry->joinrightcols = GeneratePositiveIntSequenceList(rightColsSize);
}
#endif
}
#if PG_VERSION_NUM >= PG_VERSION_13
/*
 * GetColumnOriginalIndexes returns an integer list with the 1-based
 * attribute position of every column of the given relation that is not
 * marked as dropped. A dropped column is left out of the list but does not
 * shift the positions of the columns after it, so the result may have gaps.
 */
static List *
GetColumnOriginalIndexes(Oid relationId)
{
	Relation rel = table_open(relationId, AccessShareLock);
	TupleDesc desc = RelationGetDescr(rel);
	List *liveColumnPositions = NIL;

	/* walk attribute numbers (1-based); the descriptor itself is 0-based */
	for (int attributeNumber = 1; attributeNumber <= desc->natts; attributeNumber++)
	{
		Form_pg_attribute attribute = TupleDescAttr(desc, attributeNumber - 1);

		if (!attribute->attisdropped)
		{
			liveColumnPositions = lappend_int(liveColumnPositions, attributeNumber);
		}
	}

	/* closed with NoLock, i.e. the lock taken at open time is not released here */
	table_close(rel, NoLock);

	return liveColumnPositions;
}
#endif
/*
* ExtractRangeTableId gets the range table id from a node that could
* either be a JoinExpr or RangeTblRef.
@ -1331,13 +1383,28 @@ ExtractRangeTableId(Node *node)
/*
* ExtractColumns gets a list of column names and vars for a given range
* table entry using expandRTE. Since the range table entries in a job
* query are mocked RTE_FUNCTION entries, it first translates the RTE's
* to a form that expandRTE can handle.
* table entry using expandRTE.
*/
static void
ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependentJobList,
ExtractColumns(RangeTblEntry *callingRTE, int rangeTableId,
List **columnNames, List **columnVars)
{
int subLevelsUp = 0;
int location = -1;
bool includeDroppedColumns = false;
expandRTE(callingRTE, rangeTableId, subLevelsUp, location, includeDroppedColumns,
columnNames, columnVars);
}
/*
* ConstructCallingRTE constructs a calling RTE from the given range table entry and
* dependentJobList in case of repartition joins. Since the range table entries in a job
* query are mocked RTE_FUNCTION entries, this construction is needed to form an RTE
* that expandRTE can handle.
*/
static RangeTblEntry *
ConstructCallingRTE(RangeTblEntry *rangeTableEntry, List *dependentJobList)
{
RangeTblEntry *callingRTE = NULL;
@ -1379,8 +1446,7 @@ ExtractColumns(RangeTblEntry *rangeTableEntry, int rangeTableId, List *dependent
{
ereport(ERROR, (errmsg("unsupported Citus RTE kind: %d", rangeTableKind)));
}
expandRTE(callingRTE, rangeTableId, 0, -1, false, columnNames, columnVars);
return callingRTE;
}

View File

@ -154,6 +154,23 @@ ListToHashSet(List *itemList, Size keySize, bool isStringList)
}
/*
 * GeneratePositiveIntSequenceList builds an integer list that holds the
 * consecutive values 1, 2, ..., upTo in ascending order. When upTo is
 * smaller than 1 nothing is appended and NIL is returned.
 */
List *
GeneratePositiveIntSequenceList(int upTo)
{
	List *sequence = NIL;
	int nextValue = 1;

	while (nextValue <= upTo)
	{
		sequence = lappend_int(sequence, nextValue);
		nextValue++;
	}

	return sequence;
}
/*
* StringJoin gets a list of char * and then simply
* returns a newly allocated char * joined with the

View File

@ -91,6 +91,7 @@ extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList);
extern char * StringJoin(List *stringList, char delimiter);
extern List * ListTake(List *pointerList, int size);
extern void * safe_list_nth(const List *list, int index);
extern List * GeneratePositiveIntSequenceList(int upTo);
extern List * GenerateListFromElement(void *listElement, int listLength);
extern bool LeftListIsSubset(const List *lhs, const List *rhs);

View File

@ -1,250 +0,0 @@
--
-- Full join with subquery pushdown support
--
SET citus.next_shard_id TO 9000000;
CREATE SCHEMA full_join;
SET search_path TO full_join, public;
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id bigint, val1 int);
CREATE TABLE test_table_3(id int, val1 bigint);
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_3', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3);
INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4);
INSERT INTO test_table_3 VALUES(1,1),(3,3),(4,5);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id
---------------------------------------------------------------------
1
2
3
4
(4 rows)
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id | val1 | val1
---------------------------------------------------------------------
1 | 1 | 1
2 | 2 |
3 | 3 | 3
4 | | 5
(4 rows)
-- Join subqueries using single column
SELECT * FROM
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id)
ORDER BY 1;
id
---------------------------------------------------------------------
1
2
3
(5 rows)
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id, val1)
ORDER BY 1;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
|
|
(5 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 5
(4 rows)
-- Full join with complicated target lists
SELECT count(DISTINCT id), (avg(test_table_1.val1) + id * id)::integer as avg_value, id::numeric IS NOT NULL as not_null
FROM test_table_1 FULL JOIN test_table_3 using(id)
WHERE id::bigint < 55
GROUP BY id
ORDER BY 2
ASC LIMIT 3;
count | avg_value | not_null
---------------------------------------------------------------------
1 | 2 | t
1 | 6 | t
1 | 12 | t
(3 rows)
SELECT max(val1)
FROM test_table_1 FULL JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
max
---------------------------------------------------------------------
1
2
3
5
(4 rows)
-- Test the left join as well
SELECT max(val1)
FROM test_table_1 LEFT JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
max
---------------------------------------------------------------------
1
2
3
(3 rows)
-- Full outer join with different distribution column types, should error out
SELECT * FROM test_table_1 full join test_table_2 using(id);
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Test when the non-distributed column has the value of NULL
INSERT INTO test_table_1 VALUES(7, NULL);
INSERT INTO test_table_2 VALUES(7, NULL);
INSERT INTO test_table_3 VALUES(7, NULL);
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id | val1 | val1
---------------------------------------------------------------------
1 | 1 | 1
2 | 2 |
3 | 3 | 3
4 | | 5
7 | |
(5 rows)
-- Get the same result (with multiple id)
SELECT * FROM test_table_1 FULL JOIN test_table_3 ON (test_table_1.id = test_table_3.id) ORDER BY 1;
id | val1 | id | val1
---------------------------------------------------------------------
1 | 1 | 1 | 1
2 | 2 | |
3 | 3 | 3 | 3
7 | | 7 |
| | 4 | 5
(5 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 5
7 |
7 |
(6 rows)
-- In order to make the same test with different data types use text-varchar pair
-- instead of using int-bigint pair.
DROP TABLE test_table_1;
DROP TABLE test_table_2;
DROP TABLE test_table_3;
CREATE TABLE test_table_1(id int, val1 text);
CREATE TABLE test_table_2(id int, val1 varchar(30));
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,'val_1'),(2,'val_2'),(3,'val_3'), (4, NULL);
INSERT INTO test_table_2 VALUES(2,'val_2'),(3,'val_3'),(4,'val_4'), (5, NULL);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
id
---------------------------------------------------------------------
1
2
3
4
5
(5 rows)
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
id | val1 | val1
---------------------------------------------------------------------
1 | val_1 |
2 | val_2 | val_2
3 | val_3 | val_3
4 | | val_4
5 | |
(5 rows)
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j1
FULL JOIN
(SELECT test_table_2.id, test_table_2.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j2
USING(id, val1)
ORDER BY 1,2;
id | val1
---------------------------------------------------------------------
1 | val_1
2 | val_2
3 | val_3
4 | val_4
4 |
5 |
|
|
(8 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2;
id | val1
---------------------------------------------------------------------
1 | val_1
2 | val_2
3 | val_3
4 | val_4
4 |
5 |
(6 rows)
DROP SCHEMA full_join CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table test_table_1
drop cascades to table test_table_2

View File

@ -0,0 +1,472 @@
--
-- join with subquery pushdown support
--
SET citus.next_shard_id TO 9000000;
CREATE SCHEMA join_schema;
SET search_path TO join_schema, public;
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id bigint, val1 int);
CREATE TABLE test_table_3(id int, val1 bigint);
CREATE TABLE abcd(a int, b int, c int, d int);
CREATE TABLE distributed_table(a int, b int);
CREATE TABLE reference_table(a int, c int, b int);
SELECT create_distributed_table('distributed_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_3', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('abcd', 'b');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3);
INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4);
INSERT INTO test_table_3 VALUES(1,1),(3,3),(4,5);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id
---------------------------------------------------------------------
1
2
3
4
(4 rows)
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id | val1 | val1
---------------------------------------------------------------------
1 | 1 | 1
2 | 2 |
3 | 3 | 3
4 | | 5
(4 rows)
-- Join subqueries using single column
SELECT * FROM
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id)
ORDER BY 1;
id
---------------------------------------------------------------------
1
2
3
(5 rows)
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j1
FULL JOIN
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_3 using(id)) as j2
USING(id, val1)
ORDER BY 1;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
|
|
(5 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 5
(4 rows)
-- Full join with complicated target lists
SELECT count(DISTINCT id), (avg(test_table_1.val1) + id * id)::integer as avg_value, id::numeric IS NOT NULL as not_null
FROM test_table_1 FULL JOIN test_table_3 using(id)
WHERE id::bigint < 55
GROUP BY id
ORDER BY 2
ASC LIMIT 3;
count | avg_value | not_null
---------------------------------------------------------------------
1 | 2 | t
1 | 6 | t
1 | 12 | t
(3 rows)
SELECT max(val1)
FROM test_table_1 FULL JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
max
---------------------------------------------------------------------
1
2
3
5
(4 rows)
-- Test the left join as well
SELECT max(val1)
FROM test_table_1 LEFT JOIN test_table_3 USING(id, val1)
GROUP BY test_table_1.id
ORDER BY 1;
max
---------------------------------------------------------------------
1
2
3
(3 rows)
-- Full outer join with different distribution column types, should error out
SELECT * FROM test_table_1 full join test_table_2 using(id);
ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Test when the non-distributed column has the value of NULL
INSERT INTO test_table_1 VALUES(7, NULL);
INSERT INTO test_table_2 VALUES(7, NULL);
INSERT INTO test_table_3 VALUES(7, NULL);
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_3 using(id) ORDER BY 1;
id | val1 | val1
---------------------------------------------------------------------
1 | 1 | 1
2 | 2 |
3 | 3 | 3
4 | | 5
7 | |
(5 rows)
-- Get the same result (with multiple id)
SELECT * FROM test_table_1 FULL JOIN test_table_3 ON (test_table_1.id = test_table_3.id) ORDER BY 1;
id | val1 | id | val1
---------------------------------------------------------------------
1 | 1 | 1 | 1
2 | 2 | |
3 | 3 | 3 | 3
7 | | 7 |
| | 4 | 5
(5 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_3 USING(id, val1) ORDER BY 1;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 5
7 |
7 |
(6 rows)
-- In order to make the same test with different data types use text-varchar pair
-- instead of using int-bigint pair.
DROP TABLE test_table_1;
DROP TABLE test_table_2;
DROP TABLE test_table_3;
CREATE TABLE test_table_1(id int, val1 text);
CREATE TABLE test_table_2(id int, val1 varchar(30));
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,'val_1'),(2,'val_2'),(3,'val_3'), (4, NULL);
INSERT INTO test_table_2 VALUES(2,'val_2'),(3,'val_3'),(4,'val_4'), (5, NULL);
-- Simple full outer join
SELECT id FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
id
---------------------------------------------------------------------
1
2
3
4
5
(5 rows)
-- Get all columns as the result of the full join
SELECT * FROM test_table_1 FULL JOIN test_table_2 using(id) ORDER BY 1;
id | val1 | val1
---------------------------------------------------------------------
1 | val_1 |
2 | val_2 | val_2
3 | val_3 | val_3
4 | | val_4
5 | |
(5 rows)
-- Join subqueries using multiple columns
SELECT * FROM
(SELECT test_table_1.id, test_table_1.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j1
FULL JOIN
(SELECT test_table_2.id, test_table_2.val1 FROM test_table_1 FULL JOIN test_table_2 using(id)) as j2
USING(id, val1)
ORDER BY 1,2;
id | val1
---------------------------------------------------------------------
1 | val_1
2 | val_2
3 | val_3
4 | val_4
4 |
5 |
|
|
(8 rows)
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2;
id | val1
---------------------------------------------------------------------
1 | val_1
2 | val_2
3 | val_3
4 | val_4
4 |
5 |
(6 rows)
SET citus.enable_repartition_joins to ON;
SELECT distributed_table.* from distributed_table JOIN reference_table ON (true);
a | b
---------------------------------------------------------------------
(0 rows)
ALTER TABLE reference_table DROP COLUMN c;
-- #4129: make sure a join after drop column works
SELECT distributed_table.* from distributed_table JOIN reference_table ON (true);
a | b
---------------------------------------------------------------------
(0 rows)
BEGIN;
SELECT distributed_table.* from distributed_table JOIN reference_table ON (true);
a | b
---------------------------------------------------------------------
(0 rows)
END;
INSERT INTO abcd VALUES (1,2,3,4);
INSERT INTO abcd VALUES (2,3,4,5);
INSERT INTO abcd VALUES (3,4,5,6);
SELECT * FROM abcd first join abcd second on first.a = second.a ORDER BY 1,2,3,4;
a | b | c | d | a | b | c | d
---------------------------------------------------------------------
1 | 2 | 3 | 4 | 1 | 2 | 3 | 4
2 | 3 | 4 | 5 | 2 | 3 | 4 | 5
3 | 4 | 5 | 6 | 3 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
a | b | c | d | a | b | c | d
---------------------------------------------------------------------
1 | 2 | 3 | 4 | 1 | 2 | 3 | 4
2 | 3 | 4 | 5 | 2 | 3 | 4 | 5
3 | 4 | 5 | 6 | 3 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
a | b | c | d | a | b | c | d
---------------------------------------------------------------------
1 | 2 | 3 | 4 | 1 | 2 | 3 | 4
2 | 3 | 4 | 5 | 2 | 3 | 4 | 5
3 | 4 | 5 | 6 | 3 | 4 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd first join abcd second on first.a = second.a ORDER BY 1,2,3,4;
a | b | c | d | a | b | c | d
---------------------------------------------------------------------
1 | 2 | 3 | 4 | 1 | 2 | 3 | 4
2 | 3 | 4 | 5 | 2 | 3 | 4 | 5
3 | 4 | 5 | 6 | 3 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
a | b | c | d | a | b | c | d
---------------------------------------------------------------------
1 | 2 | 3 | 4 | 1 | 2 | 3 | 4
2 | 3 | 4 | 5 | 2 | 3 | 4 | 5
3 | 4 | 5 | 6 | 3 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
a | b | c | d | a | b | c | d
---------------------------------------------------------------------
1 | 2 | 3 | 4 | 1 | 2 | 3 | 4
2 | 3 | 4 | 5 | 2 | 3 | 4 | 5
3 | 4 | 5 | 6 | 3 | 4 | 5 | 6
(3 rows)
END;
ALTER TABLE abcd DROP COLUMN a;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
CREATE VIEW abcd_view AS SELECT * FROM abcd;
SELECT * FROM abcd_view first join abcd_view second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd_view first join abcd_view second on first.c = second.c ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd_view first join abcd_view second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
SELECT * FROM abcd_view first join abcd_view second on first.c = second.c ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
SELECT * FROM abcd first full join abcd second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd first full join abcd second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
SELECT * FROM abcd_view first join abcd second USING(b) ORDER BY 1,2,3,4;
b | c | d | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 3 | 4
3 | 4 | 5 | 4 | 5
4 | 5 | 6 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) ORDER BY 1,2,3,4;
b | c | d | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 3 | 4
3 | 4 | 5 | 4 | 5
4 | 5 | 6 | 5 | 6
(3 rows)
END;
SELECT * FROM abcd first join abcd second USING(b) join abcd third on first.b=third.b ORDER BY 1,2,3,4;
b | c | d | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 5 | 6 | 4 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) join abcd third on first.b=third.b ORDER BY 1,2,3,4;
b | c | d | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
DROP SCHEMA join_schema CASCADE;
NOTICE: drop cascades to 6 other objects
DETAIL: drop cascades to table abcd
drop cascades to table distributed_table
drop cascades to table reference_table
drop cascades to table test_table_1
drop cascades to table test_table_2
drop cascades to view abcd_view

View File

@ -78,6 +78,17 @@ SELECT create_distributed_table('stats', 'account_id', colocate_with => 'account
INSERT INTO accounts (id) VALUES ('foo');
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
CREATE TABLE abcd(a int, b int, c int, d int);
SELECT create_distributed_table('abcd', 'b');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO abcd VALUES (1,2,3,4);
INSERT INTO abcd VALUES (2,3,4,5);
INSERT INTO abcd VALUES (3,4,5,6);
ALTER TABLE abcd DROP COLUMN a;
-- connection worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_execution;
@ -380,6 +391,75 @@ INSERT INTO distributed_table VALUES (1, '22', 20);
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 (key, value, age) VALUES (1, '22'::text, 20)
INSERT INTO second_distributed_table VALUES (1, '1');
NOTICE: executing the command locally: INSERT INTO local_shard_execution.second_distributed_table_1470005 (key, value) VALUES (1, '1'::text)
CREATE VIEW abcd_view AS SELECT * FROM abcd;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
BEGIN;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
NOTICE: executing the command locally: SELECT first.b, first.c, first.d, second.b, second.c, second.d FROM (local_shard_execution.abcd_1470025 first JOIN local_shard_execution.abcd_1470025 second ON ((first.b OPERATOR(pg_catalog.=) second.b))) WHERE true
NOTICE: executing the command locally: SELECT first.b, first.c, first.d, second.b, second.c, second.d FROM (local_shard_execution.abcd_1470027 first JOIN local_shard_execution.abcd_1470027 second ON ((first.b OPERATOR(pg_catalog.=) second.b))) WHERE true
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
BEGIN;
SELECT * FROM abcd_view first join abcd_view second on first.b = second.b ORDER BY 1,2,3,4;
NOTICE: executing the command locally: SELECT abcd.b, abcd.c, abcd.d, abcd_1.b, abcd_1.c, abcd_1.d FROM (local_shard_execution.abcd_1470025 abcd JOIN local_shard_execution.abcd_1470025 abcd_1 ON ((abcd.b OPERATOR(pg_catalog.=) abcd_1.b))) WHERE true
NOTICE: executing the command locally: SELECT abcd.b, abcd.c, abcd.d, abcd_1.b, abcd_1.c, abcd_1.d FROM (local_shard_execution.abcd_1470027 abcd JOIN local_shard_execution.abcd_1470027 abcd_1 ON ((abcd.b OPERATOR(pg_catalog.=) abcd_1.b))) WHERE true
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
BEGIN;
SELECT * FROM abcd first full join abcd second on first.b = second.b ORDER BY 1,2,3,4;
NOTICE: executing the command locally: SELECT worker_column_1 AS b, worker_column_2 AS c, worker_column_3 AS d, worker_column_4 AS b, worker_column_5 AS c, worker_column_6 AS d FROM (SELECT first.b AS worker_column_1, first.c AS worker_column_2, first.d AS worker_column_3, second.b AS worker_column_4, second.c AS worker_column_5, second.d AS worker_column_6 FROM (local_shard_execution.abcd_1470025 first FULL JOIN local_shard_execution.abcd_1470025 second ON ((first.b OPERATOR(pg_catalog.=) second.b)))) worker_subquery
NOTICE: executing the command locally: SELECT worker_column_1 AS b, worker_column_2 AS c, worker_column_3 AS d, worker_column_4 AS b, worker_column_5 AS c, worker_column_6 AS d FROM (SELECT first.b AS worker_column_1, first.c AS worker_column_2, first.d AS worker_column_3, second.b AS worker_column_4, second.c AS worker_column_5, second.d AS worker_column_6 FROM (local_shard_execution.abcd_1470027 first FULL JOIN local_shard_execution.abcd_1470027 second ON ((first.b OPERATOR(pg_catalog.=) second.b)))) worker_subquery
b | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) ORDER BY 1,2,3,4;
NOTICE: executing the command locally: SELECT first.b, first.c, first.d, second.c, second.d FROM (local_shard_execution.abcd_1470025 first JOIN local_shard_execution.abcd_1470025 second ON ((first.b OPERATOR(pg_catalog.=) second.b))) WHERE true
NOTICE: executing the command locally: SELECT first.b, first.c, first.d, second.c, second.d FROM (local_shard_execution.abcd_1470027 first JOIN local_shard_execution.abcd_1470027 second ON ((first.b OPERATOR(pg_catalog.=) second.b))) WHERE true
b | c | d | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 3 | 4
3 | 4 | 5 | 4 | 5
4 | 5 | 6 | 5 | 6
(3 rows)
END;
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) join abcd third on first.b=third.b ORDER BY 1,2,3,4;
NOTICE: executing the command locally: SELECT first.b, first.c, first.d, second.c, second.d, third.b, third.c, third.d FROM ((local_shard_execution.abcd_1470025 first JOIN local_shard_execution.abcd_1470025 second ON ((first.b OPERATOR(pg_catalog.=) second.b))) JOIN local_shard_execution.abcd_1470025 third ON ((first.b OPERATOR(pg_catalog.=) third.b))) WHERE true
NOTICE: executing the command locally: SELECT first.b, first.c, first.d, second.c, second.d, third.b, third.c, third.d FROM ((local_shard_execution.abcd_1470027 first JOIN local_shard_execution.abcd_1470027 second ON ((first.b OPERATOR(pg_catalog.=) second.b))) JOIN local_shard_execution.abcd_1470027 third ON ((first.b OPERATOR(pg_catalog.=) third.b))) WHERE true
b | c | d | c | d | b | c | d
---------------------------------------------------------------------
2 | 3 | 4 | 3 | 4 | 2 | 3 | 4
3 | 4 | 5 | 4 | 5 | 3 | 4 | 5
4 | 5 | 6 | 5 | 6 | 4 | 5 | 6
(3 rows)
END;
-- copy always happens via distributed execution irrespective of the
-- shards that are accessed
COPY reference_table FROM STDIN;

View File

@ -92,7 +92,7 @@ test: multi_explain hyperscale_tutorial partitioned_intermediate_results distrib
test: multi_basic_queries cross_join multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
test: sql_procedure multi_function_in_join row_types materialized_view undistribute_table
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
test: multi_subquery_in_where_reference_clause join adaptive_executor propagate_set_commands
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction
test: multi_reference_table multi_select_for_update relation_access_tracking pg13_with_ties

View File

@ -63,7 +63,7 @@ test: hyperscale_tutorial
test: multi_basic_queries multi_complex_expressions multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_sql_function
test: multi_function_in_join row_types
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
test: multi_subquery_in_where_reference_clause join adaptive_executor propagate_set_commands
test: rollback_to_savepoint insert_select_into_local_table undistribute_table
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_limit_clause_approximate multi_single_relation_subquery set_role_in_transaction

View File

@ -69,7 +69,7 @@ test: hyperscale_tutorial
test: multi_basic_queries multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_sql_function
test: multi_function_in_join row_types materialized_view
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
test: multi_subquery_in_where_reference_clause join adaptive_executor propagate_set_commands
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_agg_distinct multi_limit_clause_approximate multi_outer_join_reference
test: multi_select_for_update relation_access_tracking pg13_with_ties

View File

@ -1,19 +1,26 @@
--
-- Full join with subquery pushdown support
-- join with subquery pushdown support
--
SET citus.next_shard_id TO 9000000;
CREATE SCHEMA full_join;
SET search_path TO full_join, public;
CREATE SCHEMA join_schema;
SET search_path TO join_schema, public;
-- tables for the join tests; id and val1 deliberately use different integer
-- widths (int vs bigint) across test_table_1/2/3
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id bigint, val1 int);
CREATE TABLE test_table_3(id int, val1 bigint);
-- abcd, distributed_table and reference_table are used by the dropped-column
-- join tests later in this file
CREATE TABLE abcd(a int, b int, c int, d int);
CREATE TABLE distributed_table(a int, b int);
CREATE TABLE reference_table(a int, c int, b int);
SELECT create_distributed_table('distributed_table', 'a');
SELECT create_reference_table('reference_table');
SELECT create_distributed_table('test_table_1', 'id');
SELECT create_distributed_table('test_table_2', 'id');
SELECT create_distributed_table('test_table_3', 'id');
-- abcd is distributed on b, which is not its first column
SELECT create_distributed_table('abcd', 'b');
-- test_table_1 and test_table_2 overlap only on the rows with id 2 and 3
INSERT INTO test_table_1 VALUES(1,1),(2,2),(3,3);
INSERT INTO test_table_2 VALUES(2,2),(3,3),(4,4);
@ -112,4 +119,67 @@ SELECT * FROM
-- Full join using multiple columns
SELECT * FROM test_table_1 FULL JOIN test_table_2 USING(id, val1) ORDER BY 1,2;
DROP SCHEMA full_join CASCADE;
-- enable repartition joins for the queries below
SET citus.enable_repartition_joins to ON;
-- baseline: join against the reference table while it still has column c
SELECT distributed_table.* from distributed_table JOIN reference_table ON (true);
-- drop the middle column of the reference table
ALTER TABLE reference_table DROP COLUMN c;
-- #4129: make sure a join after drop column works
SELECT distributed_table.* from distributed_table JOIN reference_table ON (true);
-- same query inside a transaction block
BEGIN;
SELECT distributed_table.* from distributed_table JOIN reference_table ON (true);
END;
-- populate abcd (distributed on b)
INSERT INTO abcd VALUES (1,2,3,4);
INSERT INTO abcd VALUES (2,3,4,5);
INSERT INTO abcd VALUES (3,4,5,6);
-- self joins on a, b and c in turn, while all four columns still exist
SELECT * FROM abcd first join abcd second on first.a = second.a ORDER BY 1,2,3,4;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
-- the same three self joins inside a transaction block
BEGIN;
SELECT * FROM abcd first join abcd second on first.a = second.a ORDER BY 1,2,3,4;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
END;
-- drop the first column; every join below runs against a table with a dropped column
ALTER TABLE abcd DROP COLUMN a;
-- self joins on the remaining b and c columns
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
BEGIN;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
SELECT * FROM abcd first join abcd second on first.c = second.c ORDER BY 1,2,3,4;
END;
-- the same joins through a view that is created after the column drop
CREATE VIEW abcd_view AS SELECT * FROM abcd;
SELECT * FROM abcd_view first join abcd_view second on first.b = second.b ORDER BY 1,2,3,4;
SELECT * FROM abcd_view first join abcd_view second on first.c = second.c ORDER BY 1,2,3,4;
BEGIN;
SELECT * FROM abcd_view first join abcd_view second on first.b = second.b ORDER BY 1,2,3,4;
SELECT * FROM abcd_view first join abcd_view second on first.c = second.c ORDER BY 1,2,3,4;
END;
-- full outer self join on b
SELECT * FROM abcd first full join abcd second on first.b = second.b ORDER BY 1,2,3,4;
BEGIN;
SELECT * FROM abcd first full join abcd second on first.b = second.b ORDER BY 1,2,3,4;
END;
-- join with USING(b)
-- NOTE(review): the query outside the transaction reads abcd_view on the left side,
-- while the one inside the transaction reads abcd -- confirm the difference is intended
SELECT * FROM abcd_view first join abcd second USING(b) ORDER BY 1,2,3,4;
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) ORDER BY 1,2,3,4;
END;
-- three-way join: a USING(b) join followed by an ON join
SELECT * FROM abcd first join abcd second USING(b) join abcd third on first.b=third.b ORDER BY 1,2,3,4;
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) join abcd third on first.b=third.b ORDER BY 1,2,3,4;
END;
-- drop the test schema together with the objects created in it
DROP SCHEMA join_schema CASCADE;

View File

@ -56,6 +56,15 @@ SELECT create_distributed_table('stats', 'account_id', colocate_with => 'account
INSERT INTO accounts (id) VALUES ('foo');
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
-- abcd is distributed on b; column a is dropped after the rows are inserted,
-- so the join tests further down run against a table with a dropped column
CREATE TABLE abcd(a int, b int, c int, d int);
SELECT create_distributed_table('abcd', 'b');
INSERT INTO abcd VALUES (1,2,3,4);
INSERT INTO abcd VALUES (2,3,4,5);
INSERT INTO abcd VALUES (3,4,5,6);
ALTER TABLE abcd DROP COLUMN a;
-- connect to the worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_execution;
@ -217,6 +226,29 @@ SELECT * FROM second_distributed_table WHERE key = 1 ORDER BY 1,2;
INSERT INTO distributed_table VALUES (1, '22', 20);
INSERT INTO second_distributed_table VALUES (1, '1');
-- view over abcd, used by the view join below
CREATE VIEW abcd_view AS SELECT * FROM abcd;
-- self join on b, first outside and then inside a transaction block
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
BEGIN;
SELECT * FROM abcd first join abcd second on first.b = second.b ORDER BY 1,2,3,4;
END;
-- the same join through the view
BEGIN;
SELECT * FROM abcd_view first join abcd_view second on first.b = second.b ORDER BY 1,2,3,4;
END;
-- full outer self join on b
BEGIN;
SELECT * FROM abcd first full join abcd second on first.b = second.b ORDER BY 1,2,3,4;
END;
-- join with USING(b)
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) ORDER BY 1,2,3,4;
END;
-- three-way join: a USING(b) join followed by an ON join
BEGIN;
SELECT * FROM abcd first join abcd second USING(b) join abcd third on first.b=third.b ORDER BY 1,2,3,4;
END;
-- copy always happens via distributed execution irrespective of the
-- shards that are accessed