From 862dae823e941276a4cb0b0a7bd2c597b1e17b13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Tue, 25 Jul 2023 16:20:13 +0300 Subject: [PATCH] Expand EnableNonColocatedRouterQueryPushdown to cover shard colocation (e.g., shard index) (#7076) Previously, we only checked whether the relations are colocated, but we ignore the shard indexes. That causes certain queries still to be accidentally router. We should enforce colocation checks for both shard index and table colocation id to make the check restrictive enough. For example, the following query should not be router, and after this patch, it won't: ```SQL SELECT user_id FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; ``` DESCRIPTION: Enforce shard level colocation with citus.enable_non_colocated_router_query_pushdown --- .../distributed/metadata/metadata_cache.c | 46 ++++++++ .../planner/multi_router_planner.c | 105 ++++++++++++++++-- src/include/distributed/metadata_cache.h | 1 + .../insert_select_single_shard_table.out | 2 +- .../local_shard_execution_replicated.out | 3 +- .../local_shard_execution_replicated_0.out | 3 +- .../regress/expected/multi_insert_select.out | 12 ++ .../expected/multi_insert_select_0.out | 12 ++ .../expected/multi_mx_router_planner.out | 4 +- .../regress/expected/multi_router_planner.out | 10 +- .../multi_router_planner_fast_path.out | 2 +- .../expected/query_single_shard_table.out | 64 +++++------ src/test/regress/expected/set_operations.out | 4 +- 13 files changed, 214 insertions(+), 54 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index d4aea5f90..196195b7d 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -447,6 +447,52 @@ IsCitusTableType(Oid relationId, CitusTableType tableType) } +/* + * GetCitusTableType is a helper function that returns the CitusTableType + * for the given relationId. + * Note that a single table can be qualified as multiple CitusTableType, such + * as hash distributed tables are both HASH_DISTRIBUTED and DISTRIBUTED_TABLE. + * This function returns the base type for a given table. + * + * If the table is not a Citus table, ANY_CITUS_TABLE_TYPE is returned. + */ +CitusTableType +GetCitusTableType(CitusTableCacheEntry *tableEntry) +{ + /* we do not expect local tables here */ + Assert(tableEntry != NULL); + + if (IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED)) + { + return HASH_DISTRIBUTED; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, SINGLE_SHARD_DISTRIBUTED)) + { + return SINGLE_SHARD_DISTRIBUTED; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) + { + return REFERENCE_TABLE; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE)) + { + return CITUS_LOCAL_TABLE; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, APPEND_DISTRIBUTED)) + { + return APPEND_DISTRIBUTED; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED)) + { + return RANGE_DISTRIBUTED; + } + else + { + return ANY_CITUS_TABLE_TYPE; + } +} + + /* * IsCitusTableTypeCacheEntry returns true if the given table cache entry * belongs to a citus table that matches the given table type. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6ad51e0ae..27027e064 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -155,6 +155,7 @@ static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *query static DeferredErrorMessage * ErrorIfQueryHasCTEWithSearchClause(Query *queryTree); static bool ContainsSearchClauseWalker(Node *node, void *context); static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); +static bool AllShardsColocated(List *relationShardList); static ShardPlacement * CreateDummyPlacement(bool hasLocalRelation); static ShardPlacement * CreateLocalDummyPlacement(); static int CompareInsertValuesByShardId(const void *leftElement, @@ -2392,6 +2393,15 @@ PlanRouterQuery(Query *originalQuery, RelationShardListForShardIntervalList(*prunedShardIntervalListList, &shardsPresent); + if (!EnableNonColocatedRouterQueryPushdown && + !AllShardsColocated(*relationShardList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "router planner does not support queries that " + "reference non-colocated distributed tables", + NULL, NULL); + } + if (!shardsPresent && !replacePrunedQueryWithDummy) { /* @@ -2460,6 +2470,92 @@ PlanRouterQuery(Query *originalQuery, } +/* + * AllShardsColocated returns true if all the shards in the given relationShardList + * have colocated tables and are on the same shard index. + */ +static bool +AllShardsColocated(List *relationShardList) +{ + RelationShard *relationShard = NULL; + int shardIndex = -1; + int colocationId = -1; + CitusTableType tableType = ANY_CITUS_TABLE_TYPE; + + foreach_ptr(relationShard, relationShardList) + { + Oid relationId = relationShard->relationId; + uint64 shardId = relationShard->shardId; + if (shardId == INVALID_SHARD_ID) + { + /* intermediate results are always colocated, so ignore */ + continue; + } + + CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); + if (tableEntry == NULL) + { + /* local tables never colocated */ + return false; + } + + CitusTableType currentTableType = GetCitusTableType(tableEntry); + if (currentTableType == REFERENCE_TABLE) + { + /* + * Reference tables are always colocated so it is + * safe to skip them. + */ + continue; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, DISTRIBUTED_TABLE)) + { + if (tableType == ANY_CITUS_TABLE_TYPE) + { + tableType = currentTableType; + } + else if (tableType != currentTableType) + { + /* + * We cannot qualify different types of distributed tables + * as colocated. + */ + return false; + } + + if (currentTableType == RANGE_DISTRIBUTED || + currentTableType == APPEND_DISTRIBUTED) + { + /* we do not have further strict colocation chceks */ + continue; + } + } + + int currentColocationId = TableColocationId(relationId); + if (colocationId == -1) + { + colocationId = currentColocationId; + } + else if (colocationId != currentColocationId) + { + return false; + } + + int currentIndex = ShardIndex(LoadShardInterval(shardId)); + if (shardIndex == -1) + { + shardIndex = currentIndex; + } + else if (shardIndex != currentIndex) + { + return false; + } + } + + return true; +} + + /* * ContainsOnlyLocalTables returns true if there is only * local tables and not any distributed or reference table. @@ -3745,15 +3841,6 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) NULL, NULL); } - if (!EnableNonColocatedRouterQueryPushdown && - !AllDistributedRelationsInListColocated(distributedRelationList)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "router planner does not support queries that " - "reference non-colocated distributed tables", - NULL, NULL); - } - DeferredErrorMessage *CTEWithSearchClauseError = ErrorIfQueryHasCTEWithSearchClause(query); if (CTEWithSearchClauseError != NULL) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f92a2a061..4e918ecf7 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -139,6 +139,7 @@ typedef enum extern List * AllCitusTableIds(void); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); +extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry); extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType tableType); bool HasDistributionKey(Oid relationId); diff --git a/src/test/regress/expected/insert_select_single_shard_table.out b/src/test/regress/expected/insert_select_single_shard_table.out index 8dbb1cf9a..3d6e8f155 100644 --- a/src/test/regress/expected/insert_select_single_shard_table.out +++ b/src/test/regress/expected/insert_select_single_shard_table.out @@ -645,7 +645,7 @@ JOIN ( ) t2 ON t1.b = t2.b WHERE t2.rn > 2; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT b, row_number() OVER (ORDER BY b DESC) AS rn FROM insert_select_single_shard_table.distributed_table_c2_t1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT t1.a, t2.b FROM (insert_select_single_shard_table.nullkey_c1_t1 t1 JOIN (SELECT q.rn, q.b FROM (SELECT intermediate_result.b, intermediate_result.rn FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer, rn bigint)) q) t2 ON ((t1.b OPERATOR(pg_catalog.=) t2.b))) WHERE (t2.rn OPERATOR(pg_catalog.>) 2) diff --git a/src/test/regress/expected/local_shard_execution_replicated.out b/src/test/regress/expected/local_shard_execution_replicated.out index 07da961c2..bd8690bad 100644 --- a/src/test/regress/expected/local_shard_execution_replicated.out +++ b/src/test/regress/expected/local_shard_execution_replicated.out @@ -1016,7 +1016,8 @@ WHERE distributed_table.value = all_data.value AND distributed_table.key = 1 ORDER BY 1 DESC; -NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT second_distributed_table.key, second_distributed_table.value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (second_distributed_table.key OPERATOR(pg_catalog.=) 2)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC +NOTICE: executing the command locally: SELECT key, value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (key OPERATOR(pg_catalog.=) 2) +NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC key --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/local_shard_execution_replicated_0.out b/src/test/regress/expected/local_shard_execution_replicated_0.out index c913bf628..67016d96b 100644 --- a/src/test/regress/expected/local_shard_execution_replicated_0.out +++ b/src/test/regress/expected/local_shard_execution_replicated_0.out @@ -1016,7 +1016,8 @@ WHERE distributed_table.value = all_data.value AND distributed_table.key = 1 ORDER BY 1 DESC; -NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT second_distributed_table.key, second_distributed_table.value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (second_distributed_table.key OPERATOR(pg_catalog.=) 2)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC +NOTICE: executing the command locally: SELECT key, value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (key OPERATOR(pg_catalog.=) 2) +NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC key --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index ac339a620..a5340a89d 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -741,6 +741,18 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 15 +DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM multi_insert_select.raw_events_first WHERE (user_id OPERATOR(pg_catalog.=) 15) +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 17 +DEBUG: generating subplan XXX_2 for subquery SELECT user_id FROM multi_insert_select.raw_events_second WHERE (user_id OPERATOR(pg_catalog.=) 17) +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) EXCEPT SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- some supported LEFT joins diff --git a/src/test/regress/expected/multi_insert_select_0.out b/src/test/regress/expected/multi_insert_select_0.out index a4988bceb..a0b5a9ada 100644 --- a/src/test/regress/expected/multi_insert_select_0.out +++ b/src/test/regress/expected/multi_insert_select_0.out @@ -741,6 +741,18 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 15 +DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM multi_insert_select.raw_events_first WHERE (user_id OPERATOR(pg_catalog.=) 15) +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 17 +DEBUG: generating subplan XXX_2 for subquery SELECT user_id FROM multi_insert_select.raw_events_second WHERE (user_id OPERATOR(pg_catalog.=) 17) +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) EXCEPT SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- some supported LEFT joins diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index cba5b8181..e7855a898 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -275,7 +275,7 @@ id_title AS (SELECT id, title from articles_hash_mx WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; DEBUG: CTE id_author is going to be inlined via distributed planning DEBUG: CTE id_title is going to be inlined via distributed planning -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Creating router plan DEBUG: query has a single distribution column value: 2 DEBUG: generating subplan XXX_1 for subquery SELECT id, title FROM public.articles_hash_mx WHERE (author_id OPERATOR(pg_catalog.=) 2) @@ -385,7 +385,7 @@ WITH RECURSIVE hierarchy as ( h.company_id = ce.company_id AND ce.company_id = 2)) SELECT * FROM hierarchy WHERE LEVEL <= 2; -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: recursive CTEs are not supported in distributed queries -- grouping sets are supported on single shard SELECT diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index edfc728db..c6d46ccc9 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -357,7 +357,7 @@ DEBUG: Creating router plan WITH id_author AS MATERIALIZED ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), id_title AS MATERIALIZED (SELECT id, title from articles_hash WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: generating subplan XXX_1 for CTE id_author: SELECT id, author_id FROM multi_router_planner.articles_hash WHERE (author_id OPERATOR(pg_catalog.=) 1) DEBUG: Creating router plan DEBUG: query has a single distribution column value: 1 @@ -450,7 +450,7 @@ WITH RECURSIVE hierarchy as MATERIALIZED ( h.company_id = ce.company_id AND ce.company_id = 2)) SELECT * FROM hierarchy WHERE LEVEL <= 2; -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: recursive CTEs are not supported in distributed queries -- Test router modifying CTEs WITH new_article AS MATERIALIZED( @@ -1505,7 +1505,7 @@ SET citus.enable_non_colocated_router_query_pushdown TO OFF; SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and false; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Creating router plan first_author | second_word_count --------------------------------------------------------------------- (0 rows) @@ -1599,7 +1599,7 @@ HINT: Set citus.enable_repartition_joins to on to enable repartitioning SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and int4eq(1, 2); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Creating router plan first_author | second_word_count --------------------------------------------------------------------- (0 rows) @@ -1637,7 +1637,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and date_ne_timestamp('1954-04-11', '1954-04-11'::timestamp); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Creating router plan first_author | second_word_count --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index 474d4a107..3436971af 100644 --- a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -220,7 +220,7 @@ id_title AS (SELECT id, title from articles_hash WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; DEBUG: CTE id_author is going to be inlined via distributed planning DEBUG: CTE id_title is going to be inlined via distributed planning -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: query has a single distribution column value: 2 diff --git a/src/test/regress/expected/query_single_shard_table.out b/src/test/regress/expected/query_single_shard_table.out index 5716c570d..13083d992 100644 --- a/src/test/regress/expected/query_single_shard_table.out +++ b/src/test/regress/expected/query_single_shard_table.out @@ -293,7 +293,7 @@ DEBUG: Creating router plan -- cartesian product with different table types -- with other table types SELECT COUNT(*) FROM distributed_table d1, nullkey_c1_t1; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries ERROR: cannot perform distributed planning on this query DETAIL: Cartesian products are currently unsupported SELECT COUNT(*) FROM reference_table d1, nullkey_c1_t1; @@ -552,14 +552,14 @@ SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b IN ( SELECT b+1 FROM nullkey_c2_t2 t2 WHERE t2.b = t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery DETAIL: nullkey_c2_t2 and nullkey_c1_t1 are not colocated SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b NOT IN ( SELECT a FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery DETAIL: nullkey_c2_t2 and nullkey_c1_t1 are not colocated -- join with a reference table @@ -1101,21 +1101,21 @@ SELECT COUNT(*) FROM nullkey_c1_t1 t1 LEFT JOIN LATERAL ( SELECT * FROM distributed_table t2 WHERE t2.b > t1.a ) q USING(a); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE EXISTS ( SELECT * FROM distributed_table t2 WHERE t2.b > t1.a ); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE NOT EXISTS ( SELECT * FROM distributed_table t2 WHERE t2.b > t1.a ); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM nullkey_c1_t1 t1 @@ -1136,14 +1136,14 @@ SELECT COUNT(*) FROM distributed_table t1 LEFT JOIN LATERAL ( SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a ) q USING(a); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM distributed_table t1 WHERE EXISTS ( SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a ); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM distributed_table t1 @@ -1186,14 +1186,14 @@ SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b IN ( SELECT b+1 FROM citus_local_table t2 WHERE t2.b = t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b NOT IN ( SELECT a FROM citus_local_table t2 WHERE t2.b > t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM nullkey_c1_t1 t1 @@ -1261,14 +1261,14 @@ SELECT COUNT(*) FROM citus_local_table t1 WHERE t1.b IN ( SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM citus_local_table t1 WHERE t1.b NOT IN ( SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM citus_local_table t1 @@ -1515,14 +1515,14 @@ WITH level_0 AS ( SELECT COUNT(*) FROM level_0; DEBUG: CTE level_0 is going to be inlined via distributed planning DEBUG: CTE level_1 is going to be inlined via distributed planning -DEBUG: router planner does not support queries that reference non-colocated distributed tables -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE level_0: WITH level_1 AS (WITH RECURSIVE level_2_recursive(x) AS (VALUES (1) UNION ALL SELECT (nullkey_c1_t1.a OPERATOR(pg_catalog.+) 1) FROM (query_single_shard_table.nullkey_c1_t1 JOIN level_2_recursive level_2_recursive_1 ON ((nullkey_c1_t1.a OPERATOR(pg_catalog.=) level_2_recursive_1.x))) WHERE (nullkey_c1_t1.a OPERATOR(pg_catalog.<) 100)) SELECT level_2_recursive.x, distributed_table.a, distributed_table.b FROM (level_2_recursive JOIN query_single_shard_table.distributed_table ON ((level_2_recursive.x OPERATOR(pg_catalog.=) distributed_table.a)))) SELECT x, a, b FROM level_1 DEBUG: CTE level_1 is going to be inlined via distributed planning -DEBUG: router planner does not support queries that reference non-colocated distributed tables -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE level_1: WITH RECURSIVE level_2_recursive(x) AS (VALUES (1) UNION ALL SELECT (nullkey_c1_t1.a OPERATOR(pg_catalog.+) 1) FROM (query_single_shard_table.nullkey_c1_t1 JOIN level_2_recursive level_2_recursive_1 ON ((nullkey_c1_t1.a OPERATOR(pg_catalog.=) level_2_recursive_1.x))) WHERE (nullkey_c1_t1.a OPERATOR(pg_catalog.<) 100)) SELECT level_2_recursive.x, distributed_table.a, distributed_table.b FROM (level_2_recursive JOIN query_single_shard_table.distributed_table ON ((level_2_recursive.x OPERATOR(pg_catalog.=) distributed_table.a))) -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries ERROR: recursive CTEs are not supported in distributed queries -- grouping set SELECT @@ -1583,7 +1583,7 @@ DEBUG: Creating router plan SELECT COUNT(*), b FROM nullkey_c1_t1 GROUP BY 2 HAVING (SELECT COUNT(*) FROM nullkey_c2_t1) > 0 ORDER BY 1,2; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM query_single_shard_table.nullkey_c2_t1 @@ -2125,8 +2125,8 @@ ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on di UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; DEBUG: Creating router plan UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; -DEBUG: found no worker with all shard placements -ERROR: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables +ERROR: router planner does not support queries that reference non-colocated distributed tables UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE nullkey_c1_t1.b = reference_table.b; DEBUG: Creating router plan UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; @@ -2177,8 +2177,8 @@ DEBUG: Creating router plan DELETE FROM nullkey_c1_t1 USING nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; DEBUG: Creating router plan DELETE FROM nullkey_c1_t1 USING nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; -DEBUG: found no worker with all shard placements -ERROR: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables +ERROR: router planner does not support queries that reference non-colocated distributed tables DELETE FROM nullkey_c1_t1 USING reference_table WHERE nullkey_c1_t1.b = reference_table.b; DEBUG: Creating router plan DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; @@ -2506,7 +2506,7 @@ WITH cte AS ( DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * ) SELECT * FROM distributed_table WHERE a IN (SELECT a FROM cte); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE cte: DELETE FROM query_single_shard_table.nullkey_c1_t1 WHERE (a OPERATOR(pg_catalog.=) 1) RETURNING a, b DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan @@ -2770,7 +2770,7 @@ WITH cte1 AS ( ) UPDATE non_colocated_users_table dt SET value = cte1.value_1 FROM cte1 WHERE cte1.user_id = dt.id AND dt.id = 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: generating subplan XXX_1 for CTE cte1: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM query_single_shard_table.users_table WHERE (user_id OPERATOR(pg_catalog.=) 1) DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan @@ -2784,7 +2784,7 @@ WITH cte1 AS MATERIALIZED ( ) UPDATE non_colocated_users_table dt SET value = cte1.value_1 + cte2.event_type FROM cte1, cte2 WHERE cte1.user_id = dt.id AND dt.id = 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: generating subplan XXX_1 for CTE cte1: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM query_single_shard_table.users_table WHERE (user_id OPERATOR(pg_catalog.=) 1) DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan @@ -3034,21 +3034,21 @@ DEBUG: Creating router plan SELECT event_type, (SELECT time FROM users_table WHERE user_id = e.user_id ORDER BY time LIMIT 1) FROM non_colocated_events_table e ORDER BY 1,2 LIMIT 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 1 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated SELECT event_type, (SELECT max(time) FROM users_table WHERE user_id = e.value_2) FROM non_colocated_events_table e ORDER BY 1,2 LIMIT 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 1 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated SELECT event_type, (SELECT max(time) FROM users_table) FROM non_colocated_events_table e ORDER BY 1,2 LIMIT 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT max("time") AS max FROM query_single_shard_table.users_table @@ -3125,7 +3125,7 @@ SELECT sum(e.user_id) + (SELECT max(value_3) FROM users_table WHERE user_id = e. FROM non_colocated_events_table e GROUP BY e.user_id ORDER BY 1 LIMIT 3; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down subquery on the target list DETAIL: Subqueries in the SELECT part of the query can only be pushed down if they happen before aggregates and window functions SELECT e.user_id, sum((SELECT any_value(value_3) FROM users_reference_table WHERE user_id = e.user_id GROUP BY user_id)) OVER (PARTITION BY e.user_id) @@ -3143,7 +3143,7 @@ SELECT (SELECT (SELECT e.user_id + user_id) FROM users_table WHERE user_id = e.u FROM non_colocated_events_table e GROUP BY 1 ORDER BY 1 LIMIT 3; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 3 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated @@ -3194,7 +3194,7 @@ SELECT (SELECT value_2 FROM view_1 WHERE user_id = e.user_id GROUP BY value_2) FROM non_colocated_events_table e GROUP BY 1 ORDER BY 1 LIMIT 3; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 3 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated @@ -3208,7 +3208,7 @@ GROUP BY user_id (SELECT sum(user_id) FROM users_table WHERE user_id = u1.user_id GROUP BY user_id) FROM users_table u1 GROUP BY user_id) as foo) ORDER BY 1 DESC; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM (SELECT (SELECT sum(users_table.user_id) AS sum FROM query_single_shard_table.users_table WHERE (users_table.user_id OPERATOR(pg_catalog.=) u1.user_id) GROUP BY users_table.user_id) AS sum FROM query_single_shard_table.users_table u1 GROUP BY u1.user_id) foo DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, count(*) AS count FROM query_single_shard_table.non_colocated_events_table e1 GROUP BY user_id HAVING (count(*) OPERATOR(pg_catalog.>) (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint))) ORDER BY user_id DESC diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index a0dad36a8..f2e0616e7 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -1119,7 +1119,7 @@ DEBUG: Creating router plan -- queries on non-colocated tables that would push down if they were not colocated are recursivelu planned SELECT * FROM (SELECT * FROM test UNION SELECT * FROM test_not_colocated) u ORDER BY 1,2; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test DEBUG: Router planner cannot handle multi-shard select queries @@ -1135,7 +1135,7 @@ DEBUG: Creating router plan (2 rows) SELECT * FROM (SELECT * FROM test UNION ALL SELECT * FROM test_not_colocated) u ORDER BY 1,2; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test DEBUG: Router planner cannot handle multi-shard select queries