Add a dynamic executor that chooses the executor automatically

pull/330/head
Marco Slot 2016-02-11 23:33:01 +01:00
parent 136306a1fe
commit 7d05498235
6 changed files with 158 additions and 2 deletions

View File

@ -31,6 +31,9 @@ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
static bool IsRoutableQuery(MultiPlan *multiPlan);
/*
* JobExecutorType selects the executor type for the given multiPlan using the task
* executor type config value. The function then checks if the given multiPlan needs
@ -63,6 +66,22 @@ JobExecutorType(MultiPlan *multiPlan)
}
}
if (executorType == MULTI_EXECUTOR_DYNAMIC)
{
if (dependedJobCount > 0)
{
executorType = MULTI_EXECUTOR_TASK_TRACKER;
}
else if (IsRoutableQuery(multiPlan))
{
executorType = MULTI_EXECUTOR_ROUTER;
}
else
{
executorType = MULTI_EXECUTOR_REAL_TIME;
}
}
if (executorType == MULTI_EXECUTOR_REAL_TIME)
{
double reasonableConnectionCount = 0;
@ -167,6 +186,60 @@ JobExecutorType(MultiPlan *multiPlan)
}
/*
* IsRoutableQuery returns whether a multi-plan can be executed using the
* router executor.
*/
static bool
IsRoutableQuery(MultiPlan *multiPlan)
{
Job *job = multiPlan->workerJob;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
Task *workerTask = NULL;
List *workerDependentTaskList = NIL;
bool masterQueryHasAggregates = false;
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/* router executor cannot execute queries that hit more than one shard */
if (taskCount != 1)
{
return false;
}
/* router executor cannot execute queries with dependent data fetch tasks */
workerTask = list_nth(workerTaskList, 0);
workerDependentTaskList = workerTask->dependedTaskList;
if (list_length(workerDependentTaskList) > 0)
{
return false;
}
/* router executor cannot execute queries with order by */
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
return false;
}
/* router executor cannot execute queries with aggregates */
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
return false;
}
return true;
}
/*
* MaxMasterConnectionCount returns the number of connections a master can open.
* A master cannot create more than a certain number of file descriptors (FDs).

View File

@ -58,6 +58,7 @@ static const struct config_enum_entry task_executor_type_options[] = {
{"real-time", MULTI_EXECUTOR_REAL_TIME, false},
{"task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false},
{"router", MULTI_EXECUTOR_ROUTER, false},
{"dynamic", MULTI_EXECUTOR_DYNAMIC, false},
{NULL, 0, false}
};
@ -463,7 +464,8 @@ RegisterCitusConfigVariables(void)
"aggregations and/or co-located joins on multiple shards. The "
"task-tracker executor is optimal for long-running, complex "
"queries that touch thousands of shards and/or that involve "
"table repartitioning."),
"table repartitioning. The dynamic executor automatically "
"chooses between them."),
&TaskExecutorType,
MULTI_EXECUTOR_REAL_TIME,
task_executor_type_options,

View File

@ -96,7 +96,8 @@ typedef enum
MULTI_EXECUTOR_INVALID_FIRST = 0,
MULTI_EXECUTOR_REAL_TIME = 1,
MULTI_EXECUTOR_TASK_TRACKER = 2,
MULTI_EXECUTOR_ROUTER = 3
MULTI_EXECUTOR_ROUTER = 3,
MULTI_EXECUTOR_DYNAMIC = 4
} MultiExecutorType;

View File

@ -0,0 +1,45 @@
--
-- MULTI_DYNAMIC_EXECUTOR
--
-- Run different types of queries using the dynamic executor.
SET citusdb.large_table_shard_count TO 2;
SET citusdb.task_executor_type TO 'dynamic';
-- Should be able to run regular queries with dynamic executor
SELECT
count(*)
FROM
orders;
count
-------
2984
(1 row)
-- Should be able to run re-partition queries with dynamic executor
SELECT
o_orderkey, count(*)
FROM
orders, customer
WHERE
c_custkey = o_custkey
GROUP BY
o_orderkey
ORDER BY
o_orderkey
LIMIT 1;
o_orderkey | count
------------+-------
1 | 1
(1 row)
-- Should be able to run router queries with dynamic executor
SELECT
o_orderkey
FROM
orders
WHERE
o_orderkey = 1;
o_orderkey
------------
1
(1 row)

View File

@ -118,6 +118,7 @@ test: multi_simple_queries
test: multi_utilities
test: multi_create_insert_proxy
test: multi_data_types
test: multi_dynamic_executor
# ----------
# multi_large_shardid stages more shards into lineitem

View File

@ -0,0 +1,34 @@
--
-- MULTI_DYNAMIC_EXECUTOR
--
-- Run different types of queries using the dynamic executor.
SET citusdb.large_table_shard_count TO 2;
SET citusdb.task_executor_type TO 'dynamic';
-- Should be able to run regular queries with dynamic executor
SELECT
count(*)
FROM
orders;
-- Should be able to run re-partition queries with dynamic executor
SELECT
o_orderkey, count(*)
FROM
orders, customer
WHERE
c_custkey = o_custkey
GROUP BY
o_orderkey
ORDER BY
o_orderkey
LIMIT 1;
-- Should be able to run router queries with dynamic executor
SELECT
o_orderkey
FROM
orders
WHERE
o_orderkey = 1;