diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index d4aea5f90..196195b7d 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -447,6 +447,52 @@ IsCitusTableType(Oid relationId, CitusTableType tableType) } +/* + * GetCitusTableType is a helper function that returns the CitusTableType + * for the given relationId. + * Note that a single table can be qualified as multiple CitusTableType, such + * as hash distributed tables are both HASH_DISTRIBUTED and DISTRIBUTED_TABLE. + * This function returns the base type for a given table. + * + * If the table is not a Citus table, ANY_CITUS_TABLE_TYPE is returned. + */ +CitusTableType +GetCitusTableType(CitusTableCacheEntry *tableEntry) +{ + /* we do not expect local tables here */ + Assert(tableEntry != NULL); + + if (IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED)) + { + return HASH_DISTRIBUTED; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, SINGLE_SHARD_DISTRIBUTED)) + { + return SINGLE_SHARD_DISTRIBUTED; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) + { + return REFERENCE_TABLE; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE)) + { + return CITUS_LOCAL_TABLE; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, APPEND_DISTRIBUTED)) + { + return APPEND_DISTRIBUTED; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED)) + { + return RANGE_DISTRIBUTED; + } + else + { + return ANY_CITUS_TABLE_TYPE; + } +} + + /* * IsCitusTableTypeCacheEntry returns true if the given table cache entry * belongs to a citus table that matches the given table type. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 6ad51e0ae..27027e064 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -155,6 +155,7 @@ static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *query static DeferredErrorMessage * ErrorIfQueryHasCTEWithSearchClause(Query *queryTree); static bool ContainsSearchClauseWalker(Node *node, void *context); static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); +static bool AllShardsColocated(List *relationShardList); static ShardPlacement * CreateDummyPlacement(bool hasLocalRelation); static ShardPlacement * CreateLocalDummyPlacement(); static int CompareInsertValuesByShardId(const void *leftElement, @@ -2392,6 +2393,15 @@ PlanRouterQuery(Query *originalQuery, RelationShardListForShardIntervalList(*prunedShardIntervalListList, &shardsPresent); + if (!EnableNonColocatedRouterQueryPushdown && + !AllShardsColocated(*relationShardList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "router planner does not support queries that " + "reference non-colocated distributed tables", + NULL, NULL); + } + if (!shardsPresent && !replacePrunedQueryWithDummy) { /* @@ -2460,6 +2470,92 @@ PlanRouterQuery(Query *originalQuery, } +/* + * AllShardsColocated returns true if all the shards in the given relationShardList + * have colocated tables and are on the same shard index. + */ +static bool +AllShardsColocated(List *relationShardList) +{ + RelationShard *relationShard = NULL; + int shardIndex = -1; + int colocationId = -1; + CitusTableType tableType = ANY_CITUS_TABLE_TYPE; + + foreach_ptr(relationShard, relationShardList) + { + Oid relationId = relationShard->relationId; + uint64 shardId = relationShard->shardId; + if (shardId == INVALID_SHARD_ID) + { + /* intermediate results are always colocated, so ignore */ + continue; + } + + CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); + if (tableEntry == NULL) + { + /* local tables never colocated */ + return false; + } + + CitusTableType currentTableType = GetCitusTableType(tableEntry); + if (currentTableType == REFERENCE_TABLE) + { + /* + * Reference tables are always colocated so it is + * safe to skip them. + */ + continue; + } + else if (IsCitusTableTypeCacheEntry(tableEntry, DISTRIBUTED_TABLE)) + { + if (tableType == ANY_CITUS_TABLE_TYPE) + { + tableType = currentTableType; + } + else if (tableType != currentTableType) + { + /* + * We cannot qualify different types of distributed tables + * as colocated. + */ + return false; + } + + if (currentTableType == RANGE_DISTRIBUTED || + currentTableType == APPEND_DISTRIBUTED) + { + /* we do not have further strict colocation chceks */ + continue; + } + } + + int currentColocationId = TableColocationId(relationId); + if (colocationId == -1) + { + colocationId = currentColocationId; + } + else if (colocationId != currentColocationId) + { + return false; + } + + int currentIndex = ShardIndex(LoadShardInterval(shardId)); + if (shardIndex == -1) + { + shardIndex = currentIndex; + } + else if (shardIndex != currentIndex) + { + return false; + } + } + + return true; +} + + /* * ContainsOnlyLocalTables returns true if there is only * local tables and not any distributed or reference table. @@ -3745,15 +3841,6 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) NULL, NULL); } - if (!EnableNonColocatedRouterQueryPushdown && - !AllDistributedRelationsInListColocated(distributedRelationList)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "router planner does not support queries that " - "reference non-colocated distributed tables", - NULL, NULL); - } - DeferredErrorMessage *CTEWithSearchClauseError = ErrorIfQueryHasCTEWithSearchClause(query); if (CTEWithSearchClauseError != NULL) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f92a2a061..4e918ecf7 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -139,6 +139,7 @@ typedef enum extern List * AllCitusTableIds(void); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); +extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry); extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType tableType); bool HasDistributionKey(Oid relationId); diff --git a/src/test/regress/expected/insert_select_single_shard_table.out b/src/test/regress/expected/insert_select_single_shard_table.out index 8dbb1cf9a..3d6e8f155 100644 --- a/src/test/regress/expected/insert_select_single_shard_table.out +++ b/src/test/regress/expected/insert_select_single_shard_table.out @@ -645,7 +645,7 @@ JOIN ( ) t2 ON t1.b = t2.b WHERE t2.rn > 2; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT b, row_number() OVER (ORDER BY b DESC) AS rn FROM insert_select_single_shard_table.distributed_table_c2_t1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT t1.a, t2.b FROM (insert_select_single_shard_table.nullkey_c1_t1 t1 JOIN (SELECT q.rn, q.b FROM (SELECT intermediate_result.b, intermediate_result.rn FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(b integer, rn bigint)) q) t2 ON ((t1.b OPERATOR(pg_catalog.=) t2.b))) WHERE (t2.rn OPERATOR(pg_catalog.>) 2) diff --git a/src/test/regress/expected/local_shard_execution_replicated.out b/src/test/regress/expected/local_shard_execution_replicated.out index 07da961c2..bd8690bad 100644 --- a/src/test/regress/expected/local_shard_execution_replicated.out +++ b/src/test/regress/expected/local_shard_execution_replicated.out @@ -1016,7 +1016,8 @@ WHERE distributed_table.value = all_data.value AND distributed_table.key = 1 ORDER BY 1 DESC; -NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT second_distributed_table.key, second_distributed_table.value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (second_distributed_table.key OPERATOR(pg_catalog.=) 2)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC +NOTICE: executing the command locally: SELECT key, value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (key OPERATOR(pg_catalog.=) 2) +NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC key --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/local_shard_execution_replicated_0.out b/src/test/regress/expected/local_shard_execution_replicated_0.out index c913bf628..67016d96b 100644 --- a/src/test/regress/expected/local_shard_execution_replicated_0.out +++ b/src/test/regress/expected/local_shard_execution_replicated_0.out @@ -1016,7 +1016,8 @@ WHERE distributed_table.value = all_data.value AND distributed_table.key = 1 ORDER BY 1 DESC; -NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT second_distributed_table.key, second_distributed_table.value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (second_distributed_table.key OPERATOR(pg_catalog.=) 2)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC +NOTICE: executing the command locally: SELECT key, value FROM local_shard_execution_replicated.second_distributed_table_1500008 second_distributed_table WHERE (key OPERATOR(pg_catalog.=) 2) +NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution_replicated.distributed_table_1500001 distributed_table, (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC key --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index ac339a620..a5340a89d 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -741,6 +741,18 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 15 +DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM multi_insert_select.raw_events_first WHERE (user_id OPERATOR(pg_catalog.=) 15) +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 17 +DEBUG: generating subplan XXX_2 for subquery SELECT user_id FROM multi_insert_select.raw_events_second WHERE (user_id OPERATOR(pg_catalog.=) 17) +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) EXCEPT SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- some supported LEFT joins diff --git a/src/test/regress/expected/multi_insert_select_0.out b/src/test/regress/expected/multi_insert_select_0.out index a4988bceb..a0b5a9ada 100644 --- a/src/test/regress/expected/multi_insert_select_0.out +++ b/src/test/regress/expected/multi_insert_select_0.out @@ -741,6 +741,18 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; DEBUG: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 15 +DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM multi_insert_select.raw_events_first WHERE (user_id OPERATOR(pg_catalog.=) 15) +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 17 +DEBUG: generating subplan XXX_2 for subquery SELECT user_id FROM multi_insert_select.raw_events_second WHERE (user_id OPERATOR(pg_catalog.=) 17) +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) EXCEPT SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo DEBUG: Creating router plan DEBUG: Collecting INSERT ... SELECT results on coordinator -- some supported LEFT joins diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index cba5b8181..e7855a898 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -275,7 +275,7 @@ id_title AS (SELECT id, title from articles_hash_mx WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; DEBUG: CTE id_author is going to be inlined via distributed planning DEBUG: CTE id_title is going to be inlined via distributed planning -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Creating router plan DEBUG: query has a single distribution column value: 2 DEBUG: generating subplan XXX_1 for subquery SELECT id, title FROM public.articles_hash_mx WHERE (author_id OPERATOR(pg_catalog.=) 2) @@ -385,7 +385,7 @@ WITH RECURSIVE hierarchy as ( h.company_id = ce.company_id AND ce.company_id = 2)) SELECT * FROM hierarchy WHERE LEVEL <= 2; -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: recursive CTEs are not supported in distributed queries -- grouping sets are supported on single shard SELECT diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index edfc728db..c6d46ccc9 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -357,7 +357,7 @@ DEBUG: Creating router plan WITH id_author AS MATERIALIZED ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), id_title AS MATERIALIZED (SELECT id, title from articles_hash WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: generating subplan XXX_1 for CTE id_author: SELECT id, author_id FROM multi_router_planner.articles_hash WHERE (author_id OPERATOR(pg_catalog.=) 1) DEBUG: Creating router plan DEBUG: query has a single distribution column value: 1 @@ -450,7 +450,7 @@ WITH RECURSIVE hierarchy as MATERIALIZED ( h.company_id = ce.company_id AND ce.company_id = 2)) SELECT * FROM hierarchy WHERE LEVEL <= 2; -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: recursive CTEs are not supported in distributed queries -- Test router modifying CTEs WITH new_article AS MATERIALIZED( @@ -1505,7 +1505,7 @@ SET citus.enable_non_colocated_router_query_pushdown TO OFF; SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and false; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Creating router plan first_author | second_word_count --------------------------------------------------------------------- (0 rows) @@ -1599,7 +1599,7 @@ HINT: Set citus.enable_repartition_joins to on to enable repartitioning SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and int4eq(1, 2); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Creating router plan first_author | second_word_count --------------------------------------------------------------------- (0 rows) @@ -1637,7 +1637,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and date_ne_timestamp('1954-04-11', '1954-04-11'::timestamp); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Creating router plan first_author | second_word_count --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index 474d4a107..3436971af 100644 --- a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -220,7 +220,7 @@ id_title AS (SELECT id, title from articles_hash WHERE author_id = 2) SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; DEBUG: CTE id_author is going to be inlined via distributed planning DEBUG: CTE id_title is going to be inlined via distributed planning -DEBUG: cannot run command which targets multiple shards +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: query has a single distribution column value: 2 diff --git a/src/test/regress/expected/query_single_shard_table.out b/src/test/regress/expected/query_single_shard_table.out index 5716c570d..13083d992 100644 --- a/src/test/regress/expected/query_single_shard_table.out +++ b/src/test/regress/expected/query_single_shard_table.out @@ -293,7 +293,7 @@ DEBUG: Creating router plan -- cartesian product with different table types -- with other table types SELECT COUNT(*) FROM distributed_table d1, nullkey_c1_t1; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries ERROR: cannot perform distributed planning on this query DETAIL: Cartesian products are currently unsupported SELECT COUNT(*) FROM reference_table d1, nullkey_c1_t1; @@ -552,14 +552,14 @@ SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b IN ( SELECT b+1 FROM nullkey_c2_t2 t2 WHERE t2.b = t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery DETAIL: nullkey_c2_t2 and nullkey_c1_t1 are not colocated SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b NOT IN ( SELECT a FROM nullkey_c2_t2 t2 WHERE t2.b > t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down this subquery DETAIL: nullkey_c2_t2 and nullkey_c1_t1 are not colocated -- join with a reference table @@ -1101,21 +1101,21 @@ SELECT COUNT(*) FROM nullkey_c1_t1 t1 LEFT JOIN LATERAL ( SELECT * FROM distributed_table t2 WHERE t2.b > t1.a ) q USING(a); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE EXISTS ( SELECT * FROM distributed_table t2 WHERE t2.b > t1.a ); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE NOT EXISTS ( SELECT * FROM distributed_table t2 WHERE t2.b > t1.a ); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM nullkey_c1_t1 t1 @@ -1136,14 +1136,14 @@ SELECT COUNT(*) FROM distributed_table t1 LEFT JOIN LATERAL ( SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a ) q USING(a); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM distributed_table t1 WHERE EXISTS ( SELECT * FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a ); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns SELECT COUNT(*) FROM distributed_table t1 @@ -1186,14 +1186,14 @@ SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b IN ( SELECT b+1 FROM citus_local_table t2 WHERE t2.b = t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM nullkey_c1_t1 t1 WHERE t1.b NOT IN ( SELECT a FROM citus_local_table t2 WHERE t2.b > t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM nullkey_c1_t1 t1 @@ -1261,14 +1261,14 @@ SELECT COUNT(*) FROM citus_local_table t1 WHERE t1.b IN ( SELECT b+1 FROM nullkey_c1_t1 t2 WHERE t2.b = t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM citus_local_table t1 WHERE t1.b NOT IN ( SELECT a FROM nullkey_c1_t1 t2 WHERE t2.b > t1.a ); -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: skipping recursive planning for the subquery since it contains references to outer queries ERROR: direct joins between distributed and local tables are not supported SELECT COUNT(*) FROM citus_local_table t1 @@ -1515,14 +1515,14 @@ WITH level_0 AS ( SELECT COUNT(*) FROM level_0; DEBUG: CTE level_0 is going to be inlined via distributed planning DEBUG: CTE level_1 is going to be inlined via distributed planning -DEBUG: router planner does not support queries that reference non-colocated distributed tables -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE level_0: WITH level_1 AS (WITH RECURSIVE level_2_recursive(x) AS (VALUES (1) UNION ALL SELECT (nullkey_c1_t1.a OPERATOR(pg_catalog.+) 1) FROM (query_single_shard_table.nullkey_c1_t1 JOIN level_2_recursive level_2_recursive_1 ON ((nullkey_c1_t1.a OPERATOR(pg_catalog.=) level_2_recursive_1.x))) WHERE (nullkey_c1_t1.a OPERATOR(pg_catalog.<) 100)) SELECT level_2_recursive.x, distributed_table.a, distributed_table.b FROM (level_2_recursive JOIN query_single_shard_table.distributed_table ON ((level_2_recursive.x OPERATOR(pg_catalog.=) distributed_table.a)))) SELECT x, a, b FROM level_1 DEBUG: CTE level_1 is going to be inlined via distributed planning -DEBUG: router planner does not support queries that reference non-colocated distributed tables -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE level_1: WITH RECURSIVE level_2_recursive(x) AS (VALUES (1) UNION ALL SELECT (nullkey_c1_t1.a OPERATOR(pg_catalog.+) 1) FROM (query_single_shard_table.nullkey_c1_t1 JOIN level_2_recursive level_2_recursive_1 ON ((nullkey_c1_t1.a OPERATOR(pg_catalog.=) level_2_recursive_1.x))) WHERE (nullkey_c1_t1.a OPERATOR(pg_catalog.<) 100)) SELECT level_2_recursive.x, distributed_table.a, distributed_table.b FROM (level_2_recursive JOIN query_single_shard_table.distributed_table ON ((level_2_recursive.x OPERATOR(pg_catalog.=) distributed_table.a))) -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries ERROR: recursive CTEs are not supported in distributed queries -- grouping set SELECT @@ -1583,7 +1583,7 @@ DEBUG: Creating router plan SELECT COUNT(*), b FROM nullkey_c1_t1 GROUP BY 2 HAVING (SELECT COUNT(*) FROM nullkey_c2_t1) > 0 ORDER BY 1,2; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM query_single_shard_table.nullkey_c2_t1 @@ -2125,8 +2125,8 @@ ERROR: functions used in the WHERE/ON/WHEN clause of modification queries on di UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; DEBUG: Creating router plan UPDATE nullkey_c1_t1 SET b = 5 FROM nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; -DEBUG: found no worker with all shard placements -ERROR: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables +ERROR: router planner does not support queries that reference non-colocated distributed tables UPDATE nullkey_c1_t1 SET b = 5 FROM reference_table WHERE nullkey_c1_t1.b = reference_table.b; DEBUG: Creating router plan UPDATE nullkey_c1_t1 SET b = 5 FROM distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; @@ -2177,8 +2177,8 @@ DEBUG: Creating router plan DELETE FROM nullkey_c1_t1 USING nullkey_c1_t2 WHERE nullkey_c1_t1.b = nullkey_c1_t2.b; DEBUG: Creating router plan DELETE FROM nullkey_c1_t1 USING nullkey_c2_t1 WHERE nullkey_c1_t1.b = nullkey_c2_t1.b; -DEBUG: found no worker with all shard placements -ERROR: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables +ERROR: router planner does not support queries that reference non-colocated distributed tables DELETE FROM nullkey_c1_t1 USING reference_table WHERE nullkey_c1_t1.b = reference_table.b; DEBUG: Creating router plan DELETE FROM nullkey_c1_t1 USING distributed_table WHERE nullkey_c1_t1.b = distributed_table.b; @@ -2506,7 +2506,7 @@ WITH cte AS ( DELETE FROM nullkey_c1_t1 WHERE a = 1 RETURNING * ) SELECT * FROM distributed_table WHERE a IN (SELECT a FROM cte); -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE cte: DELETE FROM query_single_shard_table.nullkey_c1_t1 WHERE (a OPERATOR(pg_catalog.=) 1) RETURNING a, b DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan @@ -2770,7 +2770,7 @@ WITH cte1 AS ( ) UPDATE non_colocated_users_table dt SET value = cte1.value_1 FROM cte1 WHERE cte1.user_id = dt.id AND dt.id = 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: generating subplan XXX_1 for CTE cte1: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM query_single_shard_table.users_table WHERE (user_id OPERATOR(pg_catalog.=) 1) DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan @@ -2784,7 +2784,7 @@ WITH cte1 AS MATERIALIZED ( ) UPDATE non_colocated_users_table dt SET value = cte1.value_1 + cte2.event_type FROM cte1, cte2 WHERE cte1.user_id = dt.id AND dt.id = 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: generating subplan XXX_1 for CTE cte1: SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM query_single_shard_table.users_table WHERE (user_id OPERATOR(pg_catalog.=) 1) DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan @@ -3034,21 +3034,21 @@ DEBUG: Creating router plan SELECT event_type, (SELECT time FROM users_table WHERE user_id = e.user_id ORDER BY time LIMIT 1) FROM non_colocated_events_table e ORDER BY 1,2 LIMIT 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 1 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated SELECT event_type, (SELECT max(time) FROM users_table WHERE user_id = e.value_2) FROM non_colocated_events_table e ORDER BY 1,2 LIMIT 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 1 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated SELECT event_type, (SELECT max(time) FROM users_table) FROM non_colocated_events_table e ORDER BY 1,2 LIMIT 1; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT max("time") AS max FROM query_single_shard_table.users_table @@ -3125,7 +3125,7 @@ SELECT sum(e.user_id) + (SELECT max(value_3) FROM users_table WHERE user_id = e. FROM non_colocated_events_table e GROUP BY e.user_id ORDER BY 1 LIMIT 3; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables ERROR: cannot push down subquery on the target list DETAIL: Subqueries in the SELECT part of the query can only be pushed down if they happen before aggregates and window functions SELECT e.user_id, sum((SELECT any_value(value_3) FROM users_reference_table WHERE user_id = e.user_id GROUP BY user_id)) OVER (PARTITION BY e.user_id) @@ -3143,7 +3143,7 @@ SELECT (SELECT (SELECT e.user_id + user_id) FROM users_table WHERE user_id = e.u FROM non_colocated_events_table e GROUP BY 1 ORDER BY 1 LIMIT 3; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 3 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated @@ -3194,7 +3194,7 @@ SELECT (SELECT value_2 FROM view_1 WHERE user_id = e.user_id GROUP BY value_2) FROM non_colocated_events_table e GROUP BY 1 ORDER BY 1 LIMIT 3; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: push down of limit count: 3 ERROR: cannot push down this subquery DETAIL: users_table and non_colocated_events_table are not colocated @@ -3208,7 +3208,7 @@ GROUP BY user_id (SELECT sum(user_id) FROM users_table WHERE user_id = u1.user_id GROUP BY user_id) FROM users_table u1 GROUP BY user_id) as foo) ORDER BY 1 DESC; -DEBUG: found no worker with all shard placements +DEBUG: router planner does not support queries that reference non-colocated distributed tables DEBUG: Creating router plan DEBUG: generating subplan XXX_1 for subquery SELECT count(*) AS count FROM (SELECT (SELECT sum(users_table.user_id) AS sum FROM query_single_shard_table.users_table WHERE (users_table.user_id OPERATOR(pg_catalog.=) u1.user_id) GROUP BY users_table.user_id) AS sum FROM query_single_shard_table.users_table u1 GROUP BY u1.user_id) foo DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, count(*) AS count FROM query_single_shard_table.non_colocated_events_table e1 GROUP BY user_id HAVING (count(*) OPERATOR(pg_catalog.>) (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint))) ORDER BY user_id DESC diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index a0dad36a8..f2e0616e7 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -1119,7 +1119,7 @@ DEBUG: Creating router plan -- queries on non-colocated tables that would push down if they were not colocated are recursivelu planned SELECT * FROM (SELECT * FROM test UNION SELECT * FROM test_not_colocated) u ORDER BY 1,2; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test DEBUG: Router planner cannot handle multi-shard select queries @@ -1135,7 +1135,7 @@ DEBUG: Creating router plan (2 rows) SELECT * FROM (SELECT * FROM test UNION ALL SELECT * FROM test_not_colocated) u ORDER BY 1,2; -DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test DEBUG: Router planner cannot handle multi-shard select queries