diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index d216bb8b4..599810b5c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -114,6 +114,7 @@ typedef struct WalkerState } WalkerState; bool EnableRouterExecution = true; +bool EnableNonColocatedRouterQueryPushdown = false; /* planner functions forward declarations */ @@ -3615,6 +3616,8 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) bool hasDistributedTable = false; bool hasReferenceTable = false; + List *distributedRelationList = NIL; + ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList); foreach(rangeTableRelationCell, rangeTableRelationList) { @@ -3652,6 +3655,8 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) if (IsCitusTableType(distributedTableId, DISTRIBUTED_TABLE)) { hasDistributedTable = true; + distributedRelationList = lappend_oid(distributedRelationList, + distributedTableId); } /* @@ -3706,6 +3711,15 @@ DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *query) NULL, NULL); } + if (!EnableNonColocatedRouterQueryPushdown && + !AllDistributedRelationsInListColocated(distributedRelationList)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "router planner does not support queries that " + "reference non-colocated distributed tables", + NULL, NULL); + } + #if PG_VERSION_NUM >= PG_VERSION_14 DeferredErrorMessage *CTEWithSearchClauseError = ErrorIfQueryHasCTEWithSearchClause(query); diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 3fa3068dc..d39c4affb 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -154,7 +154,6 @@ static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex, static bool AllDistributedRelationsInRestrictionContextColocated( RelationRestrictionContext * restrictionContext); -static bool AllDistributedRelationsInListColocated(List *relationList); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static JoinRestrictionContext * FilterJoinRestrictionContext( JoinRestrictionContext *joinRestrictionContext, Relids @@ -1964,7 +1963,7 @@ AllDistributedRelationsInRTEListColocated(List *rangeTableEntryList) * AllDistributedRelationsInListColocated determines whether all of the * distributed relations in the given list are co-located. */ -static bool +bool AllDistributedRelationsInListColocated(List *relationList) { int initialColocationId = INVALID_COLOCATION_ID; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3c67d9b78..9bed016a5 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1268,6 +1268,26 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_non_colocated_router_query_pushdown", + gettext_noop("Enables router planner for the queries that reference " + "non-colocated distributed tables."), + gettext_noop("Normally, router planner planner is only enabled for " + "the queries that reference colocated distributed tables " + "because it is not guaranteed to have the target shards " + "always on the same node, e.g., after rebalancing the " + "shards. For this reason, while enabling this flag allows " + "some degree of optimization for the queries that reference " + "non-colocated distributed tables, it is not guaranteed " + "that the same query will work after rebalancing the shards " + "or altering the shard count of one of those distributed " + "tables."), + &EnableNonColocatedRouterQueryPushdown, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_repartition_joins", gettext_noop("Allows Citus to repartition data between nodes."), diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 89134415b..200c498ef 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -28,6 +28,8 @@ extern bool EnableRouterExecution; extern bool EnableFastPathRouterPlanner; +extern bool EnableNonColocatedRouterQueryPushdown; + extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, PlannerRestrictionContext * plannerRestrictionContext); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 07b6348d9..42b2b801f 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -55,4 +55,5 @@ extern RelationRestrictionContext * FilterRelationRestrictionContext( Relids queryRteIdentities); extern bool AllDistributedRelationsInRTEListColocated(List *rangeTableEntryList); +extern bool AllDistributedRelationsInListColocated(List *relationList); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 0294e1060..99cdc9ce4 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -129,12 +129,25 @@ BEGIN; INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs VALUES (5, 'Los Alamos'); COMMIT; +SET citus.enable_non_colocated_router_query_pushdown TO ON; SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; id | lab_id | name | id | name --------------------------------------------------------------------- 8 | 5 | Douglas Engelbart | 5 | Los Alamos (1 row) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- fails because researchers and labs are not colocated +SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- works thanks to "OFFSET 0" trick +SELECT * FROM (SELECT * FROM researchers OFFSET 0) researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; + id | lab_id | name | id | name +--------------------------------------------------------------------- + 8 | 5 | Douglas Engelbart | 5 | Los Alamos +(1 row) + +RESET citus.enable_non_colocated_router_query_pushdown; -- and the other way around is also allowed BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index dfbdc7603..a9013889f 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -129,12 +129,25 @@ BEGIN; INSERT INTO researchers_mx VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs_mx VALUES (5, 'Los Alamos'); COMMIT; -SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5;; +SET citus.enable_non_colocated_router_query_pushdown TO ON; +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; id | lab_id | name | id | name --------------------------------------------------------------------- 8 | 5 | Douglas Engelbart | 5 | Los Alamos (1 row) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- fails because researchers and labs are not colocated +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5; +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- works thanks to "OFFSET 0" trick +SELECT * FROM (SELECT * FROM researchers_mx OFFSET 0) researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; + id | lab_id | name | id | name +--------------------------------------------------------------------- + 8 | 5 | Douglas Engelbart | 5 | Los Alamos +(1 row) + +RESET citus.enable_non_colocated_router_query_pushdown; -- and the other way around is also allowed BEGIN; SET LOCAL citus.enable_local_execution TO off; @@ -148,7 +161,8 @@ BEGIN; INSERT INTO researchers_mx VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs_mx VALUES (5, 'Los Alamos'); COMMIT; -SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5; +SET citus.enable_non_colocated_router_query_pushdown TO ON; +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; id | lab_id | name | id | name --------------------------------------------------------------------- 8 | 5 | Douglas Engelbart | 5 | Los Alamos @@ -157,6 +171,21 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id a 8 | 5 | Douglas Engelbart | 5 | Los Alamos (4 rows) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- fails because researchers and labs are not colocated +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5; +ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +-- works thanks to "OFFSET 0" trick +SELECT * FROM (SELECT * FROM researchers_mx OFFSET 0) researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; + id | lab_id | name | id | name +--------------------------------------------------------------------- + 8 | 5 | Douglas Engelbart | 5 | Los Alamos + 8 | 5 | Douglas Engelbart | 5 | Los Alamos + 8 | 5 | Douglas Engelbart | 5 | Los Alamos + 8 | 5 | Douglas Engelbart | 5 | Los Alamos +(4 rows) + +RESET citus.enable_non_colocated_router_query_pushdown; -- and the other way around is also allowed BEGIN; SET LOCAL citus.enable_local_execution TO off; diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index bf007be9d..cba5b8181 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -586,11 +586,13 @@ DEBUG: query has a single distribution column value: 10 (3 rows) -- following join is router plannable since the same worker --- has both shards +-- has both shards when citus.enable_non_colocated_router_query_pushdown +-- is enabled +SET citus.enable_non_colocated_router_query_pushdown TO ON; SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash_mx a, articles_single_shard_hash_mx b WHERE a.author_id = 10 and a.author_id = b.author_id - LIMIT 3; + ORDER by 1,2 LIMIT 3; DEBUG: Creating router plan DEBUG: query has a single distribution column value: 10 first_author | second_word_count @@ -600,6 +602,45 @@ DEBUG: query has a single distribution column value: 10 10 | 19519 (3 rows) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- but this is not the case otherwise +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash_mx a, articles_single_shard_hash_mx b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER by 1,2 LIMIT 3; +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: push down of limit count: 3 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 8 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +RESET citus.enable_non_colocated_router_query_pushdown; -- following join is not router plannable since there are no -- workers containing both shards, but will work through recursive -- planning diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index e0e5bc541..1553309d2 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -767,11 +767,13 @@ DEBUG: query has a single distribution column value: 10 (3 rows) -- following join is router plannable since the same worker --- has both shards +-- has both shards when citus.enable_non_colocated_router_query_pushdown +-- is enabled +SET citus.enable_non_colocated_router_query_pushdown TO ON; SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id - LIMIT 3; + ORDER BY 1,2 LIMIT 3; DEBUG: Creating router plan DEBUG: query has a single distribution column value: 10 first_author | second_word_count @@ -781,6 +783,45 @@ DEBUG: query has a single distribution column value: 10 10 | 19519 (3 rows) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- but this is not the case otherwise +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_single_shard_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER BY 1,2 LIMIT 3; +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: push down of limit count: 3 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 8 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +RESET citus.enable_non_colocated_router_query_pushdown; -- following join is not router plannable since there are no -- workers containing both shards, but will work through recursive -- planning @@ -1420,6 +1461,11 @@ DEBUG: Creating router plan --------------------------------------------------------------------- (0 rows) +-- Even if the where clause contains "false", the query is not router +-- plannable when citus.enable_non_colocated_router_query_pushdown +-- is disabled. This is because, the tables are not colocated. +SET citus.enable_non_colocated_router_query_pushdown TO ON; +-- the same query, router plannable SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and false; @@ -1428,6 +1474,17 @@ DEBUG: Creating router plan --------------------------------------------------------------------- (0 rows) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- the same query, _not_ router plannable +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_single_shard_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id and false; +DEBUG: router planner does not support queries that reference non-colocated distributed tables + first_author | second_word_count +--------------------------------------------------------------------- +(0 rows) + +RESET citus.enable_non_colocated_router_query_pushdown; SELECT * FROM articles_hash WHERE null; @@ -1900,15 +1957,54 @@ DEBUG: Creating router plan -- join between hash and range partition tables are router plannable -- only if both tables pruned down to single shard and co-located on the same -- node. --- router plannable +SET citus.enable_non_colocated_router_query_pushdown TO ON; +-- router plannable when citus.enable_non_colocated_router_query_pushdown is on SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) - WHERE ar.author_id = 2; + WHERE ar.author_id = 2 ORDER BY 1,2,3,4,5,6; DEBUG: Creating router plan DEBUG: query has a single distribution column value: 2 id | author_id | title | word_count | name | id --------------------------------------------------------------------- (0 rows) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +-- not router plannable otherwise +SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) + WHERE ar.author_id = 2 ORDER BY 1,2,3,4,5,6; +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 15 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 20 + id | author_id | title | word_count | name | id +--------------------------------------------------------------------- +(0 rows) + +RESET citus.enable_non_colocated_router_query_pushdown; -- not router plannable SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) WHERE ar.author_id = 3; @@ -2476,5 +2572,55 @@ DROP USER router_user; \c - - - :master_port DROP OWNED BY router_user; DROP USER router_user; +SET search_path TO multi_router_planner; +SET citus.next_shard_id TO 850000; +SET citus.shard_replication_factor TO 1; +CREATE TABLE single_shard_dist(a int, b int); +SELECT create_distributed_table('single_shard_dist', 'a', shard_count=>1); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.shard_replication_factor TO 2; +CREATE TABLE table_with_four_shards(a int, b int); +SELECT create_distributed_table('table_with_four_shards', 'a', shard_count=>4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG2; +-- Make sure that router rejects planning this query because +-- the target shards are not placed on the same node when +-- citus.enable_non_colocated_router_query_pushdown is disabled. +-- Otherwise, it throws a somewhat meaningless error but we assume +-- that the user is aware of the setting. +SET citus.enable_non_colocated_router_query_pushdown TO ON; +WITH cte AS ( + DELETE FROM table_with_four_shards WHERE a = 1 RETURNING * +) +SELECT * FROM single_shard_dist WHERE b IN (SELECT b FROM cte); +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +ERROR: relation "multi_router_planner.single_shard_dist_850000" does not exist +CONTEXT: while executing command on localhost:xxxxx +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +WITH cte AS ( + DELETE FROM table_with_four_shards WHERE a = 1 RETURNING * +) +SELECT * FROM single_shard_dist WHERE b IN (SELECT b FROM cte); +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: generating subplan XXX_1 for CTE cte: DELETE FROM multi_router_planner.table_with_four_shards WHERE (a OPERATOR(pg_catalog.=) 1) RETURNING a, b +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM multi_router_planner.single_shard_dist WHERE (b OPERATOR(pg_catalog.=) ANY (SELECT cte.b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte)) +DEBUG: Creating router plan + a | b +--------------------------------------------------------------------- +(0 rows) + +RESET citus.enable_non_colocated_router_query_pushdown; SET client_min_messages TO WARNING; DROP SCHEMA multi_router_planner CASCADE; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 4578d69a8..646c42599 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -481,12 +481,13 @@ DEBUG: query has a single distribution column value: 10 10 | 6363 (3 rows) --- now show that JOINs with multiple tables are not router executable --- they are executed by real-time executor +-- Not router plannable when citus.enable_non_colocated_router_query_pushdown +-- is disabled. +SET citus.enable_non_colocated_router_query_pushdown TO ON; SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles a, articles_single_shard b WHERE a.author_id = 10 and a.author_id = b.author_id - LIMIT 3; + ORDER BY 1,2 LIMIT 3; DEBUG: Creating router plan DEBUG: query has a single distribution column value: 10 first_author | second_word_count @@ -496,6 +497,88 @@ DEBUG: query has a single distribution column value: 10 10 | 19519 (3 rows) +SET citus.enable_non_colocated_router_query_pushdown TO OFF; +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles a, articles_single_shard b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER BY 1,2 LIMIT 3; +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: push down of limit count: 3 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 8 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- but they can be executed via repartition join planner +SET citus.enable_repartition_joins TO ON; +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles a, articles_single_shard b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER BY 1,2 LIMIT 3; +DEBUG: router planner does not support queries that reference non-colocated distributed tables +DEBUG: push down of limit count: 3 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 8 + first_author | second_word_count +--------------------------------------------------------------------- + 10 | 19519 + 10 | 19519 + 10 | 19519 +(3 rows) + +RESET citus.enable_repartition_joins; +RESET citus.enable_non_colocated_router_query_pushdown; -- do not create the master query for LIMIT on a single shard SELECT SELECT * FROM articles diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 506480093..d38f0cc99 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -117,8 +117,20 @@ INSERT INTO researchers VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs VALUES (5, 'Los Alamos'); COMMIT; +SET citus.enable_non_colocated_router_query_pushdown TO ON; + SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- fails because researchers and labs are not colocated +SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; + +-- works thanks to "OFFSET 0" trick +SELECT * FROM (SELECT * FROM researchers OFFSET 0) researchers, labs WHERE labs.id = researchers.lab_id AND researchers.lab_id = 5; + +RESET citus.enable_non_colocated_router_query_pushdown; + -- and the other way around is also allowed BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); diff --git a/src/test/regress/sql/multi_mx_modifying_xacts.sql b/src/test/regress/sql/multi_mx_modifying_xacts.sql index 15335f579..924267c8d 100644 --- a/src/test/regress/sql/multi_mx_modifying_xacts.sql +++ b/src/test/regress/sql/multi_mx_modifying_xacts.sql @@ -116,7 +116,19 @@ INSERT INTO researchers_mx VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs_mx VALUES (5, 'Los Alamos'); COMMIT; -SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5;; +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- fails because researchers and labs are not colocated +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5; + +-- works thanks to "OFFSET 0" trick +SELECT * FROM (SELECT * FROM researchers_mx OFFSET 0) researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; + +RESET citus.enable_non_colocated_router_query_pushdown; -- and the other way around is also allowed BEGIN; @@ -133,8 +145,20 @@ INSERT INTO researchers_mx VALUES (8, 5, 'Douglas Engelbart'); INSERT INTO labs_mx VALUES (5, 'Los Alamos'); COMMIT; +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- fails because researchers and labs are not colocated SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5; +-- works thanks to "OFFSET 0" trick +SELECT * FROM (SELECT * FROM researchers_mx OFFSET 0) researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id and researchers_mx.lab_id = 5 ORDER BY 1,2,3,4,5; + +RESET citus.enable_non_colocated_router_query_pushdown; + -- and the other way around is also allowed BEGIN; SET LOCAL citus.enable_local_execution TO off; diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index 6a1271720..3593c2ac8 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -275,11 +275,25 @@ SELECT a.author_id as first_author, b.word_count as second_word_count LIMIT 3; -- following join is router plannable since the same worker --- has both shards +-- has both shards when citus.enable_non_colocated_router_query_pushdown +-- is enabled + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash_mx a, articles_single_shard_hash_mx b WHERE a.author_id = 10 and a.author_id = b.author_id - LIMIT 3; + ORDER by 1,2 LIMIT 3; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- but this is not the case otherwise +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash_mx a, articles_single_shard_hash_mx b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER by 1,2 LIMIT 3; + +RESET citus.enable_non_colocated_router_query_pushdown; -- following join is not router plannable since there are no -- workers containing both shards, but will work through recursive diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 142568d5d..2ccd43ea3 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -384,11 +384,26 @@ SELECT a.author_id as first_author, b.word_count as second_word_count LIMIT 3; -- following join is router plannable since the same worker --- has both shards +-- has both shards when citus.enable_non_colocated_router_query_pushdown +-- is enabled + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id - LIMIT 3; + ORDER BY 1,2 LIMIT 3; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- but this is not the case otherwise + +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_single_shard_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER BY 1,2 LIMIT 3; + +RESET citus.enable_non_colocated_router_query_pushdown; -- following join is not router plannable since there are no -- workers containing both shards, but will work through recursive @@ -649,10 +664,26 @@ SELECT * FROM articles_hash WHERE author_id = 1 and 1=0; +-- Even if the where clause contains "false", the query is not router +-- plannable when citus.enable_non_colocated_router_query_pushdown +-- is disabled. This is because, the tables are not colocated. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +-- the same query, router plannable SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id and false; +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- the same query, _not_ router plannable +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_single_shard_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id and false; + +RESET citus.enable_non_colocated_router_query_pushdown; + SELECT * FROM articles_hash WHERE null; @@ -903,9 +934,20 @@ SELECT * FROM articles_range ar join authors_range au on (ar.id = au.id) -- join between hash and range partition tables are router plannable -- only if both tables pruned down to single shard and co-located on the same -- node. --- router plannable + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +-- router plannable when citus.enable_non_colocated_router_query_pushdown is on SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) - WHERE ar.author_id = 2; + WHERE ar.author_id = 2 ORDER BY 1,2,3,4,5,6; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +-- not router plannable otherwise +SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) + WHERE ar.author_id = 2 ORDER BY 1,2,3,4,5,6; + +RESET citus.enable_non_colocated_router_query_pushdown; -- not router plannable SELECT * FROM articles_hash ar join authors_range au on (ar.author_id = au.id) @@ -1213,5 +1255,40 @@ DROP USER router_user; DROP OWNED BY router_user; DROP USER router_user; +SET search_path TO multi_router_planner; +SET citus.next_shard_id TO 850000; + +SET citus.shard_replication_factor TO 1; +CREATE TABLE single_shard_dist(a int, b int); +SELECT create_distributed_table('single_shard_dist', 'a', shard_count=>1); + +SET citus.shard_replication_factor TO 2; +CREATE TABLE table_with_four_shards(a int, b int); +SELECT create_distributed_table('table_with_four_shards', 'a', shard_count=>4); + +SET client_min_messages TO DEBUG2; + +-- Make sure that router rejects planning this query because +-- the target shards are not placed on the same node when +-- citus.enable_non_colocated_router_query_pushdown is disabled. +-- Otherwise, it throws a somewhat meaningless error but we assume +-- that the user is aware of the setting. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + +WITH cte AS ( + DELETE FROM table_with_four_shards WHERE a = 1 RETURNING * +) +SELECT * FROM single_shard_dist WHERE b IN (SELECT b FROM cte); + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +WITH cte AS ( + DELETE FROM table_with_four_shards WHERE a = 1 RETURNING * +) +SELECT * FROM single_shard_dist WHERE b IN (SELECT b FROM cte); + +RESET citus.enable_non_colocated_router_query_pushdown; + SET client_min_messages TO WARNING; DROP SCHEMA multi_router_planner CASCADE; diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 7fcf45b1c..bb1a1f85b 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -248,12 +248,34 @@ SELECT a.author_id as first_author, b.word_count as second_word_count WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; --- now show that JOINs with multiple tables are not router executable --- they are executed by real-time executor +-- Not router plannable when citus.enable_non_colocated_router_query_pushdown +-- is disabled. + +SET citus.enable_non_colocated_router_query_pushdown TO ON; + SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles a, articles_single_shard b WHERE a.author_id = 10 and a.author_id = b.author_id - LIMIT 3; + ORDER BY 1,2 LIMIT 3; + +SET citus.enable_non_colocated_router_query_pushdown TO OFF; + +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles a, articles_single_shard b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER BY 1,2 LIMIT 3; + +-- but they can be executed via repartition join planner +SET citus.enable_repartition_joins TO ON; + +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles a, articles_single_shard b + WHERE a.author_id = 10 and a.author_id = b.author_id + ORDER BY 1,2 LIMIT 3; + +RESET citus.enable_repartition_joins; + +RESET citus.enable_non_colocated_router_query_pushdown; -- do not create the master query for LIMIT on a single shard SELECT SELECT *