diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index 2ff2257ec..72499a93c 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -340,8 +340,7 @@ CreateRTEToSubqueryConverterContext(RecursivePlanningContext *context, rteToSubqueryConverterContext->hasSubqueryRTE = true; } /* we're only interested in tables */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) + if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) { continue; } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 2213f92d5..390209fc0 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1407,9 +1407,7 @@ AllDataLocallyAccessible(List *rangeTableList) // TODO:: check if it has distributed table return false; } - /* we're only interested in tables */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) + if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) { continue; } @@ -1444,6 +1442,18 @@ AllDataLocallyAccessible(List *rangeTableList) return true; } +/* + * SubqueryConvertableRelationForJoin returns true if the given range table entry + * is a relation type that can be converted to a subquery. + */ +bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry) { + if (rangeTableEntry->rtekind != RTE_RELATION) { + return false; + } + return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE || + rangeTableEntry->relkind == RELKIND_RELATION; +} + /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list * contains a direct join between local and distributed tables. @@ -1459,10 +1469,7 @@ ContainsLocalTableDistributedTableJoin(List *rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); - /* we're only interested in tables */ - /* TODO:: What about partitioned tables? */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) + if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) { continue; } @@ -1500,10 +1507,8 @@ ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId) if (rangeTableEntry->rtekind == RTE_SUBQUERY) { containsSubquery = true; } - /* we're only interested in tables */ - /* TODO:: What about partitioned tables? */ - if (!(rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->relkind == RELKIND_RELATION)) + + if (!SubqueryConvertableRelationForJoin(rangeTableEntry)) { continue; } diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index d48a20f13..e4b46745f 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -66,5 +66,6 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, extern bool ContainsLocalTableSubqueryJoin(List *rangeTableList, Oid resultRelationId); extern bool ContainsTableToBeConvertedToSubquery(List* rangeTableList, Oid resultRelationId); +extern bool SubqueryConvertableRelationForJoin(RangeTblEntry* rangeTableEntry); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/local_table_join.out b/src/test/regress/expected/local_table_join.out index eac5a5087..95af74534 100644 --- a/src/test/regress/expected/local_table_join.out +++ b/src/test/regress/expected/local_table_join.out @@ -22,7 +22,7 @@ SELECT create_distributed_table('distributed_table_pkey', 'key'); (1 row) -CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb); +CREATE TABLE distributed_table_windex (key int primary key, value text, value_2 jsonb); SELECT create_distributed_table('distributed_table_windex', 'key'); create_distributed_table --------------------------------------------------------------------- @@ -30,6 +30,15 @@ SELECT create_distributed_table('distributed_table_windex', 'key'); (1 row) CREATE UNIQUE INDEX key_index ON distributed_table_windex (key); +CREATE TABLE distributed_partitioned_table(key int, value text) PARTITION BY RANGE (key); +CREATE TABLE distributed_partitioned_table_1 PARTITION OF distributed_partitioned_table FOR VALUES FROM (0) TO (10); +CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20); +SELECT create_distributed_table('distributed_partitioned_table', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + SET client_min_messages TO DEBUG1; -- the user doesn't allow local / distributed table joinn SET citus.local_table_join_policy TO 'never'; @@ -112,6 +121,34 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +-- partitioned tables should work as well +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10; +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10) + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key); +DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((local_table_join.distributed_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) JOIN local_table_join.reference_table USING (key)) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); DEBUG: Wrapping local relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true OFFSET 0 @@ -695,6 +732,18 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 0 (1 row) +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key) + JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10; +DEBUG: Wrapping local relation "distributed_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: Wrapping local relation "reference_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value FROM local_table_join.distributed_partitioned_table WHERE ((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 10)) OFFSET 0 +DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.reference_table WHERE (key OPERATOR(pg_catalog.=) 10) OFFSET 0 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) distributed_partitioned_table JOIN local_table_join.postgres_table USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) reference_table USING (key)) JOIN local_table_join.citus_local USING (key)) WHERE ((distributed_partitioned_table.key OPERATOR(pg_catalog.>) 10) AND (distributed_partitioned_table.key OPERATOR(pg_catalog.=) 10)) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- update UPDATE distributed_table_windex @@ -732,4 +781,4 @@ SELECT master_remove_node('localhost', :master_port); \set VERBOSITY terse DROP SCHEMA local_table_join CASCADE; -NOTICE: drop cascades to 6 other objects +NOTICE: drop cascades to 7 other objects diff --git a/src/test/regress/sql/local_table_join.sql b/src/test/regress/sql/local_table_join.sql index 7805e9dda..517a529cf 100644 --- a/src/test/regress/sql/local_table_join.sql +++ b/src/test/regress/sql/local_table_join.sql @@ -10,10 +10,16 @@ SELECT create_distributed_table('distributed_table', 'key'); CREATE TABLE distributed_table_pkey (key int primary key, value text, value_2 jsonb); SELECT create_distributed_table('distributed_table_pkey', 'key'); -CREATE TABLE distributed_table_windex (key int, value text, value_2 jsonb); +CREATE TABLE distributed_table_windex (key int primary key, value text, value_2 jsonb); SELECT create_distributed_table('distributed_table_windex', 'key'); CREATE UNIQUE INDEX key_index ON distributed_table_windex (key); +CREATE TABLE distributed_partitioned_table(key int, value text) PARTITION BY RANGE (key); +CREATE TABLE distributed_partitioned_table_1 PARTITION OF distributed_partitioned_table FOR VALUES FROM (0) TO (10); +CREATE TABLE distributed_partitioned_table_2 PARTITION OF distributed_partitioned_table FOR VALUES FROM (10) TO (20); +SELECT create_distributed_table('distributed_partitioned_table', 'key'); + + SET client_min_messages TO DEBUG1; @@ -33,7 +39,6 @@ SET citus.local_table_join_policy TO 'prefer-distributed'; SELECT count(*) FROM postgres_table JOIN distributed_table USING(key); SELECT count(*) FROM postgres_table JOIN reference_table USING(key); - -- update/delete -- auto tests @@ -46,6 +51,11 @@ SELECT count(*) FROM distributed_table JOIN postgres_table USING(key); SELECT count(*) FROM reference_table JOIN postgres_table USING(key); SELECT count(*) FROM distributed_table JOIN postgres_table USING(key) JOIN reference_table USING (key); +-- partitioned tables should work as well +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key); +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) WHERE distributed_partitioned_table.key = 10; +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key); + -- a unique index on key so dist table should be recursively planned SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key); SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(value); @@ -297,6 +307,9 @@ SELECT count(*) FROM citus_local JOIN distributed_table ON distributed_table.key SELECT count(*) FROM citus_local JOIN distributed_table USING(key) JOIN postgres_table USING (key) JOIN reference_table USING(key); +SELECT count(*) FROM distributed_partitioned_table JOIN postgres_table USING(key) JOIN reference_table USING (key) + JOIN citus_local USING(key) WHERE distributed_partitioned_table.key > 10 and distributed_partitioned_table.key = 10; + -- update UPDATE distributed_table_windex