diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 1f143778d..761d11fcc 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -31,6 +31,9 @@ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ +static bool IsRoutableQuery(MultiPlan *multiPlan); + + /* * JobExecutorType selects the executor type for the given multiPlan using the task * executor type config value. The function then checks if the given multiPlan needs @@ -63,6 +66,22 @@ JobExecutorType(MultiPlan *multiPlan) } } + if (executorType == MULTI_EXECUTOR_DYNAMIC) + { + if (dependedJobCount > 0) + { + executorType = MULTI_EXECUTOR_TASK_TRACKER; + } + else if (IsRoutableQuery(multiPlan)) + { + executorType = MULTI_EXECUTOR_ROUTER; + } + else + { + executorType = MULTI_EXECUTOR_REAL_TIME; + } + } + if (executorType == MULTI_EXECUTOR_REAL_TIME) { double reasonableConnectionCount = 0; @@ -167,6 +186,60 @@ JobExecutorType(MultiPlan *multiPlan) } +/* + * IsRoutableQuery returns whether a multi-plan can be executed using the + * router executor. + */ +static bool +IsRoutableQuery(MultiPlan *multiPlan) +{ + Job *job = multiPlan->workerJob; + Query *masterQuery = multiPlan->masterQuery; + List *workerTaskList = job->taskList; + int taskCount = list_length(workerTaskList); + int dependedJobCount = list_length(job->dependedJobList); + + Task *workerTask = NULL; + List *workerDependentTaskList = NIL; + bool masterQueryHasAggregates = false; + + /* router executor cannot execute repartition jobs */ + if (dependedJobCount > 0) + { + return false; + } + + /* router executor cannot execute queries that hit more than one shard */ + if (taskCount != 1) + { + return false; + } + + /* router executor cannot execute queries with dependent data fetch tasks */ + workerTask = list_nth(workerTaskList, 0); + workerDependentTaskList = workerTask->dependedTaskList; + if (list_length(workerDependentTaskList) > 0) + { + return false; + } + + /* router executor cannot execute queries with order by */ + if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0) + { + return false; + } + + /* router executor cannot execute queries with aggregates */ + masterQueryHasAggregates = job->jobQuery->hasAggs; + if (masterQueryHasAggregates) + { + return false; + } + + return true; +} + + /* * MaxMasterConnectionCount returns the number of connections a master can open. * A master cannot create more than a certain number of file descriptors (FDs). diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 344665eb0..724bdba9d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -58,6 +58,7 @@ static const struct config_enum_entry task_executor_type_options[] = { {"real-time", MULTI_EXECUTOR_REAL_TIME, false}, {"task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false}, {"router", MULTI_EXECUTOR_ROUTER, false}, + {"dynamic", MULTI_EXECUTOR_DYNAMIC, false}, {NULL, 0, false} }; @@ -463,7 +464,8 @@ RegisterCitusConfigVariables(void) "aggregations and/or co-located joins on multiple shards. The " "task-tracker executor is optimal for long-running, complex " "queries that touch thousands of shards and/or that involve " - "table repartitioning."), + "table repartitioning. The dynamic executor automatically " + "chooses between them."), &TaskExecutorType, MULTI_EXECUTOR_REAL_TIME, task_executor_type_options, diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 83105cc54..c070ded68 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -96,7 +96,8 @@ typedef enum MULTI_EXECUTOR_INVALID_FIRST = 0, MULTI_EXECUTOR_REAL_TIME = 1, MULTI_EXECUTOR_TASK_TRACKER = 2, - MULTI_EXECUTOR_ROUTER = 3 + MULTI_EXECUTOR_ROUTER = 3, + MULTI_EXECUTOR_DYNAMIC = 4 } MultiExecutorType; diff --git a/src/test/regress/expected/multi_dynamic_executor.out b/src/test/regress/expected/multi_dynamic_executor.out new file mode 100644 index 000000000..9b9f804b7 --- /dev/null +++ b/src/test/regress/expected/multi_dynamic_executor.out @@ -0,0 +1,45 @@ +-- +-- MULTI_DYNAMIC_EXECUTOR +-- +-- Run different types of queries using the dynamic executor. +SET citusdb.large_table_shard_count TO 2; +SET citusdb.task_executor_type TO 'dynamic'; +-- Should be able to run regular queries with dynamic executor +SELECT + count(*) +FROM + orders; + count +------- + 2984 +(1 row) + +-- Should be able to run re-partition queries with dynamic executor +SELECT + o_orderkey, count(*) +FROM + orders, customer +WHERE + c_custkey = o_custkey +GROUP BY + o_orderkey +ORDER BY + o_orderkey +LIMIT 1; + o_orderkey | count +------------+------- + 1 | 1 +(1 row) + +-- Should be able to run router queries with dynamic executor +SELECT + o_orderkey +FROM + orders +WHERE + o_orderkey = 1; + o_orderkey +------------ + 1 +(1 row) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 39ea82a34..e44fe80fb 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -118,6 +118,7 @@ test: multi_simple_queries test: multi_utilities test: multi_create_insert_proxy test: multi_data_types +test: multi_dynamic_executor # ---------- # multi_large_shardid stages more shards into lineitem diff --git a/src/test/regress/sql/multi_dynamic_executor.sql b/src/test/regress/sql/multi_dynamic_executor.sql new file mode 100644 index 000000000..403e4c670 --- /dev/null +++ b/src/test/regress/sql/multi_dynamic_executor.sql @@ -0,0 +1,34 @@ +-- +-- MULTI_DYNAMIC_EXECUTOR +-- +-- Run different types of queries using the dynamic executor. + +SET citusdb.large_table_shard_count TO 2; +SET citusdb.task_executor_type TO 'dynamic'; + +-- Should be able to run regular queries with dynamic executor +SELECT + count(*) +FROM + orders; + +-- Should be able to run re-partition queries with dynamic executor +SELECT + o_orderkey, count(*) +FROM + orders, customer +WHERE + c_custkey = o_custkey +GROUP BY + o_orderkey +ORDER BY + o_orderkey +LIMIT 1; + +-- Should be able to run router queries with dynamic executor +SELECT + o_orderkey +FROM + orders +WHERE + o_orderkey = 1;