Enable partitioned distributed tables in local-dist table joins

pull/4358/head
Sait Talha Nisanci 2020-11-25 20:52:44 +03:00
parent 44953579cf
commit 2ff65f3630
5 changed files with 84 additions and 17 deletions

View File

@ -340,8 +340,7 @@ CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context,
rteToSubqueryConverterContext->hasSubqueryRTE = true;
}
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
if (!SubqueryConvertableRelationForJoin(rangeTableEntry))
{
continue;
}

View File

@ -1407,9 +1407,7 @@ AllDataLocallyAccessible(List *rangeTableList)
// TODO:: check if it has distributed table
return false;
}
/* we're only interested in tables */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
if (!SubqueryConvertableRelationForJoin(rangeTableEntry))
{
continue;
}
@ -1444,6 +1442,18 @@ AllDataLocallyAccessible(List *rangeTableList)
return true;
}
/*
* SubqueryConvertableRelationForJoin returns true if the given range table entry
* is a relation type that can be converted to a subquery.
*/
bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry) {
if (rangeTableEntry->rtekind != RTE_RELATION) {
return false;
}
return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE ||
rangeTableEntry->relkind == RELKIND_RELATION;
}
/*
* ContainsLocalTableDistributedTableJoin returns true if the input range table list
* contains a direct join between local and distributed tables.
@ -1459,10 +1469,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
/* we're only interested in tables */
/* TODO:: What about partitioned tables? */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
if (!SubqueryConvertableRelationForJoin(rangeTableEntry))
{
continue;
}
@ -1500,10 +1507,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId)
if (rangeTableEntry->rtekind == RTE_SUBQUERY) {
containsSubquery = true;
}
/* we're only interested in tables */
/* TODO:: What about partitioned tables? */
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
if (!SubqueryConvertableRelationForJoin(rangeTableEntry))
{
continue;
}

View File

@ -66,5 +66,6 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
extern bool
ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId);
extern bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId);
extern bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry);
#endif /* RECURSIVE_PLANNING_H */

View File

@ -22,7 +22,7 @@ SELECT create_distributed_table('distributed_table_pkey', 'key');
(1 row)
CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb);
CREATE TABLE distributed_table_windex (key int primary key, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_windex', 'key');
create_distributed_table
---------------------------------------------------------------------
@ -30,6 +30,15 @@ SELECT create_distributed_table('distributed_table_windex', 'key');
(1 row)
CREATE UNIQUE INDEX key_index ON distributed_table_windex (key);
CREATE TABLE distributed_partitioned_table(key int, value text) PARTITION BY RANGE (key);
CREATE TABLE distributed_partitioned_table_1 PARTITION OF distributed_partitioned_table FOR VALUES FROM (0) TO (10);
CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20);
SELECT create_distributed_table('distributed_partitioned_table', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO DEBUG1;
-- the user doesn't allow local / distributed table joinn
SET citus.local_table_join_policy TO 'never';
@ -112,6 +121,34 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
-- partitioned tables should work as well
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10;
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) JOIN local_table_join.reference_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0
@ -695,6 +732,18 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key)
JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10;
DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10))
count
---------------------------------------------------------------------
0
(1 row)
-- update
UPDATE
distributed_table_windex
@ -732,4 +781,4 @@ SELECT master_remove_node('localhost', :master_port);
\set VERBOSITY terse
DROP SCHEMA local_table_join CASCADE;
NOTICE: drop cascades to 6 other objects
NOTICE: drop cascades to 7 other objects

View File

@ -10,10 +10,16 @@ SELECT create_distributed_table('distributed_table', 'key');
CREATE TABLE distributed_table_pkey (key int primary key, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_pkey', 'key');
CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb);
CREATE TABLE distributed_table_windex (key int primary key, value text, value_2 jsonb);
SELECT create_distributed_table('distributed_table_windex', 'key');
CREATE UNIQUE INDEX key_index ON distributed_table_windex (key);
CREATE TABLE distributed_partitioned_table(key int, value text) PARTITION BY RANGE (key);
CREATE TABLE distributed_partitioned_table_1 PARTITION OF distributed_partitioned_table FOR VALUES FROM (0) TO (10);
CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20);
SELECT create_distributed_table('distributed_partitioned_table', 'key');
SET client_min_messages TO DEBUG1;
@ -33,7 +39,6 @@ SET citus.local_table_join_policy TO 'prefer-distributed';
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key);
SELECT count(*) FROM postgres_table JOIN reference_table USING(key);
-- update/delete
-- auto tests
@ -46,6 +51,11 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key);
SELECT count(*) FROM reference_table JOIN postgres_table USING(key);
SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
-- partitioned tables should work as well
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key);
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10;
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key);
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value);
@ -297,6 +307,9 @@ SELECT count(*) FROM citus_local JOIN distributed_table ON distributed_table.key
SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key);
SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key)
JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10;
-- update
UPDATE
distributed_table_windex