mirror of https://github.com/citusdata/citus.git
Merge 7d05498235
into 136306a1fe
commit
b18b59707c
|
@ -31,6 +31,9 @@ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
|
|||
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
|
||||
|
||||
|
||||
static bool IsRoutableQuery(MultiPlan *multiPlan);
|
||||
|
||||
|
||||
/*
|
||||
* JobExecutorType selects the executor type for the given multiPlan using the task
|
||||
* executor type config value. The function then checks if the given multiPlan needs
|
||||
|
@ -63,6 +66,22 @@ JobExecutorType(MultiPlan *multiPlan)
|
|||
}
|
||||
}
|
||||
|
||||
if (executorType == MULTI_EXECUTOR_DYNAMIC)
|
||||
{
|
||||
if (dependedJobCount > 0)
|
||||
{
|
||||
executorType = MULTI_EXECUTOR_TASK_TRACKER;
|
||||
}
|
||||
else if (IsRoutableQuery(multiPlan))
|
||||
{
|
||||
executorType = MULTI_EXECUTOR_ROUTER;
|
||||
}
|
||||
else
|
||||
{
|
||||
executorType = MULTI_EXECUTOR_REAL_TIME;
|
||||
}
|
||||
}
|
||||
|
||||
if (executorType == MULTI_EXECUTOR_REAL_TIME)
|
||||
{
|
||||
double reasonableConnectionCount = 0;
|
||||
|
@ -167,6 +186,60 @@ JobExecutorType(MultiPlan *multiPlan)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsRoutableQuery returns whether a multi-plan can be executed using the
|
||||
* router executor.
|
||||
*/
|
||||
static bool
|
||||
IsRoutableQuery(MultiPlan *multiPlan)
|
||||
{
|
||||
Job *job = multiPlan->workerJob;
|
||||
Query *masterQuery = multiPlan->masterQuery;
|
||||
List *workerTaskList = job->taskList;
|
||||
int taskCount = list_length(workerTaskList);
|
||||
int dependedJobCount = list_length(job->dependedJobList);
|
||||
|
||||
Task *workerTask = NULL;
|
||||
List *workerDependentTaskList = NIL;
|
||||
bool masterQueryHasAggregates = false;
|
||||
|
||||
/* router executor cannot execute repartition jobs */
|
||||
if (dependedJobCount > 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* router executor cannot execute queries that hit more than one shard */
|
||||
if (taskCount != 1)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* router executor cannot execute queries with dependent data fetch tasks */
|
||||
workerTask = list_nth(workerTaskList, 0);
|
||||
workerDependentTaskList = workerTask->dependedTaskList;
|
||||
if (list_length(workerDependentTaskList) > 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* router executor cannot execute queries with order by */
|
||||
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* router executor cannot execute queries with aggregates */
|
||||
masterQueryHasAggregates = job->jobQuery->hasAggs;
|
||||
if (masterQueryHasAggregates)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MaxMasterConnectionCount returns the number of connections a master can open.
|
||||
* A master cannot create more than a certain number of file descriptors (FDs).
|
||||
|
|
|
@ -58,6 +58,7 @@ static const struct config_enum_entry task_executor_type_options[] = {
|
|||
{"real-time", MULTI_EXECUTOR_REAL_TIME, false},
|
||||
{"task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false},
|
||||
{"router", MULTI_EXECUTOR_ROUTER, false},
|
||||
{"dynamic", MULTI_EXECUTOR_DYNAMIC, false},
|
||||
{NULL, 0, false}
|
||||
};
|
||||
|
||||
|
@ -463,7 +464,8 @@ RegisterCitusConfigVariables(void)
|
|||
"aggregations and/or co-located joins on multiple shards. The "
|
||||
"task-tracker executor is optimal for long-running, complex "
|
||||
"queries that touch thousands of shards and/or that involve "
|
||||
"table repartitioning."),
|
||||
"table repartitioning. The dynamic executor automatically "
|
||||
"chooses between them."),
|
||||
&TaskExecutorType,
|
||||
MULTI_EXECUTOR_REAL_TIME,
|
||||
task_executor_type_options,
|
||||
|
|
|
@ -96,7 +96,8 @@ typedef enum
|
|||
MULTI_EXECUTOR_INVALID_FIRST = 0,
|
||||
MULTI_EXECUTOR_REAL_TIME = 1,
|
||||
MULTI_EXECUTOR_TASK_TRACKER = 2,
|
||||
MULTI_EXECUTOR_ROUTER = 3
|
||||
MULTI_EXECUTOR_ROUTER = 3,
|
||||
MULTI_EXECUTOR_DYNAMIC = 4
|
||||
|
||||
} MultiExecutorType;
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
--
|
||||
-- MULTI_DYNAMIC_EXECUTOR
|
||||
--
|
||||
-- Run different types of queries using the dynamic executor.
|
||||
SET citusdb.large_table_shard_count TO 2;
|
||||
SET citusdb.task_executor_type TO 'dynamic';
|
||||
-- Should be able to run regular queries with dynamic executor
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
orders;
|
||||
count
|
||||
-------
|
||||
2984
|
||||
(1 row)
|
||||
|
||||
-- Should be able to run re-partition queries with dynamic executor
|
||||
SELECT
|
||||
o_orderkey, count(*)
|
||||
FROM
|
||||
orders, customer
|
||||
WHERE
|
||||
c_custkey = o_custkey
|
||||
GROUP BY
|
||||
o_orderkey
|
||||
ORDER BY
|
||||
o_orderkey
|
||||
LIMIT 1;
|
||||
o_orderkey | count
|
||||
------------+-------
|
||||
1 | 1
|
||||
(1 row)
|
||||
|
||||
-- Should be able to run router queries with dynamic executor
|
||||
SELECT
|
||||
o_orderkey
|
||||
FROM
|
||||
orders
|
||||
WHERE
|
||||
o_orderkey = 1;
|
||||
o_orderkey
|
||||
------------
|
||||
1
|
||||
(1 row)
|
||||
|
|
@ -118,6 +118,7 @@ test: multi_simple_queries
|
|||
test: multi_utilities
|
||||
test: multi_create_insert_proxy
|
||||
test: multi_data_types
|
||||
test: multi_dynamic_executor
|
||||
|
||||
# ----------
|
||||
# multi_large_shardid stages more shards into lineitem
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
--
|
||||
-- MULTI_DYNAMIC_EXECUTOR
|
||||
--
|
||||
-- Run different types of queries using the dynamic executor.
|
||||
|
||||
SET citusdb.large_table_shard_count TO 2;
|
||||
SET citusdb.task_executor_type TO 'dynamic';
|
||||
|
||||
-- Should be able to run regular queries with dynamic executor
|
||||
SELECT
|
||||
count(*)
|
||||
FROM
|
||||
orders;
|
||||
|
||||
-- Should be able to run re-partition queries with dynamic executor
|
||||
SELECT
|
||||
o_orderkey, count(*)
|
||||
FROM
|
||||
orders, customer
|
||||
WHERE
|
||||
c_custkey = o_custkey
|
||||
GROUP BY
|
||||
o_orderkey
|
||||
ORDER BY
|
||||
o_orderkey
|
||||
LIMIT 1;
|
||||
|
||||
-- Should be able to run router queries with dynamic executor
|
||||
SELECT
|
||||
o_orderkey
|
||||
FROM
|
||||
orders
|
||||
WHERE
|
||||
o_orderkey = 1;
|
Loading…
Reference in New Issue