diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c
index dbfd7b54c..a9865a132 100644
--- a/src/backend/distributed/planner/multi_logical_planner.c
+++ b/src/backend/distributed/planner/multi_logical_planner.c
@@ -995,8 +995,8 @@ HasUnsupportedJoinWalker(Node *node, void *context)
 static bool
 ErrorHintRequired(const char *errorHint, Query *queryTree)
 {
-	List *rangeTableList = NIL;
-	ListCell *rangeTableCell = NULL;
+	List *distributedRelationIdList = DistributedRelationIdList(queryTree);
+	ListCell *relationIdCell = NULL;
 	List *colocationIdList = NIL;
 
 	if (errorHint == NULL)
@@ -1004,11 +1004,9 @@ ErrorHintRequired(const char *errorHint, Query *queryTree)
 		return false;
 	}
 
-	ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList);
-	foreach(rangeTableCell, rangeTableList)
+	foreach(relationIdCell, distributedRelationIdList)
 	{
-		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableCell);
-		Oid relationId = rte->relid;
+		Oid relationId = lfirst_oid(relationIdCell);
 		char partitionMethod = PartitionMethod(relationId);
 		if (partitionMethod == DISTRIBUTE_BY_NONE)
 		{
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 6e1f126ba..aa50b3e01 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -2216,7 +2216,7 @@ static void
 ErrorIfUnsupportedShardDistribution(Query *query)
 {
 	Oid firstTableRelationId = InvalidOid;
-	List *relationIdList = RelationIdList(query);
+	List *relationIdList = DistributedRelationIdList(query);
 	List *nonReferenceRelations = NIL;
 	ListCell *relationIdCell = NULL;
 	uint32 relationIndex = 0;
diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c
index 30fc7c9ed..a4adb248d 100644
--- a/src/backend/distributed/planner/relation_restriction_equivalence.c
+++ b/src/backend/distributed/planner/relation_restriction_equivalence.c
@@ -1686,7 +1686,7 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
- * RelationIdList returns list of unique relation ids in query tree.
+ * DistributedRelationIdList returns the list of unique distributed relation ids in the query tree.
  */
 List *
-RelationIdList(Query *query)
+DistributedRelationIdList(Query *query)
 {
 	List *rangeTableList = NIL;
 	List *tableEntryList = NIL;
@@ -1701,6 +1701,11 @@ RelationIdList(Query *query)
 		TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
 		Oid relationId = tableEntry->relationId;
 
+		if (!IsDistributedTable(relationId))
+		{
+			continue;
+		}
+
 		relationIdList = list_append_unique_oid(relationIdList, relationId);
 	}
 
diff --git a/src/backend/distributed/utils/query_utils.c b/src/backend/distributed/utils/query_utils.c
index 7db7c6edc..f4741e36c 100644
--- a/src/backend/distributed/utils/query_utils.c
+++ b/src/backend/distributed/utils/query_utils.c
@@ -17,6 +17,10 @@
 #include "distributed/version_compat.h"
 #include "nodes/nodeFuncs.h"
 
+
+static bool CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry);
+
+
 /*
  * ExtractRangeTableList walks over a tree to gather entries.
 * Execution is parameterized by passing walkerMode flag via ExtractRangeTableWalkerContext
@@ -40,12 +44,9 @@ ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *context)
 	{
 		RangeTblEntry *rangeTable = (RangeTblEntry *) node;
 
-		/* make sure that we are extracting only relation entries if walkerMode is set to EXTRACT_RELATION_ENTRIES*/
-		if (walkerMode == EXTRACT_ALL_ENTRIES || (walkerMode ==
-												  EXTRACT_RELATION_ENTRIES &&
-												  rangeTable->rtekind == RTE_RELATION &&
-												  rangeTable->relkind ==
-												  RELKIND_RELATION))
+		if (walkerMode == EXTRACT_ALL_ENTRIES ||
+			(walkerMode == EXTRACT_RELATION_ENTRIES &&
+			 CitusQueryableRangeTableRelation(rangeTable)))
 		{
 			(*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTable);
 		}
@@ -81,9 +82,43 @@ ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *context)
 }
 
 
+/*
+ * CitusQueryableRangeTableRelation returns true if the input range table
+ * entry is a relation that can be used in a distributed query, including
+ * local tables and materialized views.
+ */
+static bool
+CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry)
+{
+	char relationKind = '\0';
+
+	if (rangeTableEntry->rtekind != RTE_RELATION)
+	{
+		/* we're only interested in relations */
+		return false;
+	}
+
+	relationKind = rangeTableEntry->relkind;
+	if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE ||
+		relationKind == RELKIND_FOREIGN_TABLE || relationKind == RELKIND_MATVIEW)
+	{
+		/*
+		 * RELKIND_VIEW entries are automatically replaced with a subquery in
+		 * the query tree, so we ignore them here.
+		 *
+		 * RELKIND_MATVIEW is the equivalent of a local table in postgres.
+		 */
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * ExtractRangeTableRelationWalker gathers all range table relation entries
- * in a query.
+ * in a query. The caller is responsible for checking whether the returned
+ * entries are distributed or not.
  */
 bool
 ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
diff --git a/src/include/distributed/query_utils.h b/src/include/distributed/query_utils.h
index 4dce5adfa..5e4a55bf7 100644
--- a/src/include/distributed/query_utils.h
+++ b/src/include/distributed/query_utils.h
@@ -17,7 +17,7 @@
 /* Enum to define execution flow of ExtractRangeTableList */
 typedef enum ExtractRangeTableMode
 {
-	EXTRACT_RELATION_ENTRIES,
+	EXTRACT_RELATION_ENTRIES, /* including local, foreign and partitioned tables */
 	EXTRACT_ALL_ENTRIES
 } ExtractRangeTableMode;
 
diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h
index 3c4b0fb0a..742b1dbd3 100644
--- a/src/include/distributed/relation_restriction_equivalence.h
+++ b/src/include/distributed/relation_restriction_equivalence.h
@@ -31,7 +31,7 @@ extern List * GenerateAllAttributeEquivalences(PlannerRestrictionContext *
 											   plannerRestrictionContext);
 extern uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
-extern List * RelationIdList(Query *query);
+extern List * DistributedRelationIdList(Query *query);
 extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
 	PlannerRestrictionContext *plannerRestrictionContext,
 	Query *query);
 
diff --git a/src/test/regress/expected/materialized_view.out b/src/test/regress/expected/materialized_view.out
index d7a506408..4be1c304e 100644
--- a/src/test/regress/expected/materialized_view.out
+++ b/src/test/regress/expected/materialized_view.out
@@ -319,6 +319,19 @@ INSERT INTO small VALUES(14, 14);
 -- delete statement with CTE
 WITH all_small_view_ids AS (SELECT id FROM small_view)
 DELETE FROM large_partitioned WHERE id in (SELECT * FROM all_small_view_ids);
+-- make sure that a materialized view in a CTE/subquery can be joined with a distributed table
+WITH cte AS (SELECT *, random() FROM small_view) SELECT count(*) FROM cte JOIN small USING(id);
+ count 
+-------
+     4
+(1 row)
+
+SELECT count(*) FROM (SELECT *, random() FROM small_view) as subquery JOIN small USING(id);
+ count 
+-------
+     4
+(1 row)
+
 DROP TABLE large_partitioned;
 DROP TABLE small CASCADE;
 NOTICE:  drop cascades to materialized view small_view
diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out
index bc69ca309..69a5ca51a 100644
--- a/src/test/regress/expected/non_colocated_subquery_joins.out
+++ b/src/test/regress/expected/non_colocated_subquery_joins.out
@@ -1054,8 +1054,44 @@ DEBUG:  Plan is router executable
 DEBUG:  generating subplan 109_6 for subquery SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.event_type, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('109_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, event_type integer, value_2 integer, value_3 double precision, value_4 bigint) INTERSECT SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.event_type, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('109_5'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, event_type integer, value_2 integer, value_3 double precision, value_4 bigint)
 DEBUG:  Router planner cannot handle multi-shard select queries
 ERROR:  cannot pushdown the subquery
+-- make sure that non-colocated subquery joins work fine in
+-- modifications
+CREATE TABLE table1 (id int, tenant_id int);
+CREATE VIEW table1_view AS SELECT * from table1 where id < 100;
+CREATE TABLE table2 (id int, tenant_id int) partition by range(tenant_id);
+CREATE TABLE table2_p1 PARTITION OF table2 FOR VALUES FROM (1) TO (10);
+-- modifications on the partitions are only allowed with rep=1
+SET citus.shard_replication_factor TO 1;
+SELECT create_distributed_table('table2','tenant_id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SELECT create_distributed_table('table1','tenant_id');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+-- all of the following queries are non-colocated subquery joins
+-- because the views are replaced with subqueries
+UPDATE table2 SET id=20 FROM table1_view WHERE table1_view.id=table2.id;
+DEBUG:  complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  generating subplan 117_1 for subquery SELECT table1.id, table1.tenant_id FROM non_colocated_subquery.table1 WHERE (table1.id OPERATOR(pg_catalog.<) 100)
+DEBUG:  Plan 117 query after replacing subqueries and CTEs: UPDATE non_colocated_subquery.table2 SET id = 20 FROM (SELECT intermediate_result.id, intermediate_result.tenant_id FROM read_intermediate_result('117_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, tenant_id integer)) table1_view WHERE (table1_view.id OPERATOR(pg_catalog.=) table2.id)
+DEBUG:  Creating router plan
+DEBUG:  Plan is router executable
+UPDATE table2_p1 SET id=20 FROM table1_view WHERE table1_view.id=table2_p1.id;
+DEBUG:  complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
+DEBUG:  Router planner cannot handle multi-shard select queries
+DEBUG:  generating subplan 119_1 for subquery SELECT table1.id, table1.tenant_id FROM non_colocated_subquery.table1 WHERE (table1.id OPERATOR(pg_catalog.<) 100)
+DEBUG:  Plan 119 query after replacing subqueries and CTEs: UPDATE non_colocated_subquery.table2_p1 SET id = 20 FROM (SELECT intermediate_result.id, intermediate_result.tenant_id FROM read_intermediate_result('119_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, tenant_id integer)) table1_view WHERE (table1_view.id OPERATOR(pg_catalog.=) table2_p1.id)
+DEBUG:  Creating router plan
+DEBUG:  Plan is router executable
 RESET client_min_messages;
 DROP FUNCTION explain_json_2(text);
 SET search_path TO 'public';
 DROP SCHEMA non_colocated_subquery CASCADE;
-NOTICE:  drop cascades to 2 other objects
+NOTICE:  drop cascades to 5 other objects
diff --git a/src/test/regress/sql/materialized_view.sql b/src/test/regress/sql/materialized_view.sql
index 87d0288e3..4578f6086 100644
--- a/src/test/regress/sql/materialized_view.sql
+++ b/src/test/regress/sql/materialized_view.sql
@@ -246,5 +246,9 @@ INSERT INTO small VALUES(14, 14);
 WITH all_small_view_ids AS (SELECT id FROM small_view)
 DELETE FROM large_partitioned WHERE id in (SELECT * FROM all_small_view_ids);
 
+-- make sure that a materialized view in a CTE/subquery can be joined with a distributed table
+WITH cte AS (SELECT *, random() FROM small_view) SELECT count(*) FROM cte JOIN small USING(id);
+SELECT count(*) FROM (SELECT *, random() FROM small_view) as subquery JOIN small USING(id);
+
 DROP TABLE large_partitioned;
 DROP TABLE small CASCADE;
diff --git a/src/test/regress/sql/non_colocated_subquery_joins.sql b/src/test/regress/sql/non_colocated_subquery_joins.sql
index 03e233395..6ac5e228a 100644
--- a/src/test/regress/sql/non_colocated_subquery_joins.sql
+++ b/src/test/regress/sql/non_colocated_subquery_joins.sql
@@ -782,6 +782,24 @@ SELECT count(*) FROM events_table WHERE user_id NOT IN
 );
 
+-- make sure that non-colocated subquery joins work fine in
+-- modifications
+CREATE TABLE table1 (id int, tenant_id int);
+CREATE VIEW table1_view AS SELECT * from table1 where id < 100;
+CREATE TABLE table2 (id int, tenant_id int) partition by range(tenant_id);
+CREATE TABLE table2_p1 PARTITION OF table2 FOR VALUES FROM (1) TO (10);
+
+-- modifications on the partitions are only allowed with rep=1
+SET citus.shard_replication_factor TO 1;
+
+SELECT create_distributed_table('table2','tenant_id');
+SELECT create_distributed_table('table1','tenant_id');
+
+-- all of the following queries are non-colocated subquery joins
+-- because the views are replaced with subqueries
+UPDATE table2 SET id=20 FROM table1_view WHERE table1_view.id=table2.id;
+UPDATE table2_p1 SET id=20 FROM table1_view WHERE table1_view.id=table2_p1.id;
+
 RESET client_min_messages;
 DROP FUNCTION explain_json_2(text);
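
Reviewer note: for trying the behavior change by hand, here is a condensed sketch of what the new regression tests exercise. It assumes a running Citus cluster; the table and view names (dist_table, dist_matview) are illustrative and not taken from the patch.

    -- hypothetical setup: a distributed table and a materialized view over it
    CREATE TABLE dist_table (id int, value int);
    SELECT create_distributed_table('dist_table', 'id');
    INSERT INTO dist_table VALUES (1, 10), (2, 20);
    CREATE MATERIALIZED VIEW dist_matview AS SELECT id FROM dist_table;

    -- with this patch, the materialized view is treated like a local table and
    -- recursively planned, so it can be joined with the distributed table from
    -- a CTE or a subquery
    WITH cte AS (SELECT * FROM dist_matview)
    SELECT count(*) FROM cte JOIN dist_table USING (id);
    SELECT count(*) FROM (SELECT * FROM dist_matview) sub JOIN dist_table USING (id);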