diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 38b9f4fcd..57cb46f40 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -29,6 +29,7 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ +bool EnableRepartitionJoins = false; /* @@ -98,13 +99,25 @@ JobExecutorType(DistributedPlan *distributedPlan) "\"task-tracker\"."))); } - /* if we have repartition jobs with real time executor, error out */ + /* if we have repartition jobs with real time executor and repartition + * joins are not enabled, error out. Otherwise, switch to task-tracker + */ dependedJobCount = list_length(job->dependedJobList); if (dependedJobCount > 0) { - ereport(ERROR, (errmsg("cannot use real time executor with repartition jobs"), - errhint("Set citus.task_executor_type to " - "\"task-tracker\"."))); + if (!EnableRepartitionJoins) + { + ereport(ERROR, (errmsg( + "the query contains a join that requires repartitioning"), + errhint("Set citus.enable_repartition_joins to on " + "to enable repartitioning"))); + } + + ereport(DEBUG1, (errmsg( + "cannot use real time executor with repartition jobs"), + errhint("Since you enabled citus.enable_repartition_joins " + "Citus chose to use task-tracker."))); + return MULTI_EXECUTOR_TASK_TRACKER; } } else diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 588336e0f..ab6cb2633 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -746,6 +746,16 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_repartition_joins", + gettext_noop("Allows Citus to use task-tracker executor when necessary."), + NULL, + &EnableRepartitionJoins, + false, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.shard_placement_policy", gettext_noop("Sets the policy to use when choosing nodes for shard placement."), diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index c062bd84a..a22176d95 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -190,6 +190,7 @@ typedef struct WorkerNodeState extern int RemoteTaskCheckInterval; extern int MaxAssignTaskBatchSize; extern int TaskExecutorType; +extern bool EnableRepartitionJoins; extern bool BinaryMasterCopyFormat; extern int MultiTaskQueryLogLevel; diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index d9be03b44..be69db7ce 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -453,8 +453,8 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 14 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SELECT articles_hash_mx.id,test.word_count FROM articles_hash_mx, (SELECT id, word_count FROM articles_hash_mx) AS test WHERE test.id = articles_hash_mx.id and articles_hash_mx.author_id = 1 @@ -487,8 +487,8 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 9 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- subqueries are not supported in SELECT clause SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash_mx a2 WHERE a.id = a2.id LIMIT 1) AS special_price FROM articles_hash_mx a; @@ -793,8 +793,8 @@ LIMIT 5; SELECT * FROM articles_hash_mx a, articles_hash_mx b WHERE a.id = b.id AND a.author_id = 1; -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- queries which hit more than 1 shards are not router plannable or executable -- handled by real-time executor SELECT * diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index c683d9b9a..d9e583bcf 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -569,8 +569,8 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 14 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SELECT articles_hash.id,test.word_count FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id and articles_hash.author_id = 1 @@ -603,8 +603,8 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 9 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- subqueries are not supported in SELECT clause SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1) AS special_price FROM articles_hash a; @@ -910,8 +910,23 @@ LIMIT 5; SELECT * FROM articles_hash a, articles_hash b WHERE a.id = b.id AND a.author_id = 1; -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning +-- by setting enable_repartition_joins we can make this query run +SET citus.enable_repartition_joins TO ON; +SELECT * + FROM articles_hash a, articles_hash b + WHERE a.id = b.id AND a.author_id = 1; + id | author_id | title | word_count | id | author_id | title | word_count +----+-----------+--------------+------------+----+-----------+--------------+------------ + 41 | 1 | aznavour | 11814 | 41 | 1 | aznavour | 11814 + 11 | 1 | alamo | 1347 | 11 | 1 | alamo | 1347 + 31 | 1 | athwartships | 7271 | 31 | 1 | athwartships | 7271 + 1 | 1 | arsenous | 9572 | 1 | 1 | arsenous | 9572 + 21 | 1 | arcading | 5890 | 21 | 1 | arcading | 5890 +(5 rows) + +SET citus.enable_repartition_joins TO OFF; -- queries which hit more than 1 shards are not router plannable or executable -- handled by real-time executor SELECT * @@ -1674,8 +1689,8 @@ DEBUG: pruning merge fetch taskId 19 DETAIL: Creating dependency on merge taskId 9 DEBUG: pruning merge fetch taskId 22 DETAIL: Creating dependency on merge taskId 9 -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- join between a range partitioned table and reference table is router plannable SELECT * FROM articles_range ar join authors_reference au on (ar.author_id = au.id) WHERE ar.author_id = 1; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 04abdf5ee..17aaf100a 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -559,8 +559,8 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 9 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- system columns from shard tables can be queried and retrieved SELECT count(*) FROM ( SELECT tableoid, ctid, cmin, cmax, xmin, xmax diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index a1adb8cea..b7c7da2e9 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -40,8 +40,8 @@ FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning -- Check that we error out if join is not on partition columns. SELECT avg(unit_price) diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index ed0607a47..fab72b2cf 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -173,8 +173,8 @@ SELECT o_orderkey, l_linenumber FROM priority_orders left join air_shipped_linei -- repartition query on view join -- it passes planning, fails at execution stage SELECT * FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); -ERROR: cannot use real time executor with repartition jobs -HINT: Set citus.task_executor_type to "task-tracker". +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.task_executor_type to "task-tracker"; SELECT count(*) FROM priority_orders JOIN air_shipped_lineitems ON (o_custkey = l_suppkey); count diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 8b6c71e99..66620d1c5 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -445,6 +445,12 @@ SELECT * FROM articles_hash a, articles_hash b WHERE a.id = b.id AND a.author_id = 1; +-- by setting enable_repartition_joins we can make this query run +SET citus.enable_repartition_joins TO ON; +SELECT * + FROM articles_hash a, articles_hash b + WHERE a.id = b.id AND a.author_id = 1; +SET citus.enable_repartition_joins TO OFF; -- queries which hit more than 1 shards are not router plannable or executable -- handled by real-time executor SELECT *