Merge pull request #427 from citusdata/feature/dynamic_executor

Add router planner and dynamic executor selection
pull/1938/head
Önder Kalacı 2016-04-21 15:33:34 +03:00
commit f8f1210c0a
22 changed files with 2309 additions and 262 deletions

View File

@ -42,7 +42,8 @@ bool AllModificationsCommutative = false;
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static int32 ExecuteDistributedModify(Task *task); static int32 ExecuteDistributedModify(Task *task);
static void ExecuteSingleShardSelect(Task *task, EState *executorState, static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount,
Task *task, EState *executorState,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
DestReceiver *destination); DestReceiver *destination);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query); static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
@ -57,7 +58,6 @@ static bool StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
void void
RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
{ {
bool topLevel = true;
LOCKMODE lockMode = NoLock; LOCKMODE lockMode = NoLock;
EState *executorState = NULL; EState *executorState = NULL;
CmdType commandType = queryDesc->operation; CmdType commandType = queryDesc->operation;
@ -65,9 +65,13 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
/* ensure that the task is not NULL */ /* ensure that the task is not NULL */
Assert(task != NULL); Assert(task != NULL);
/* disallow transactions and triggers during distributed commands */ /* disallow transactions and triggers during distributed modify commands */
if (commandType != CMD_SELECT)
{
bool topLevel = true;
PreventTransactionChain(topLevel, "distributed commands"); PreventTransactionChain(topLevel, "distributed commands");
eflags |= EXEC_FLAG_SKIP_TRIGGERS; eflags |= EXEC_FLAG_SKIP_TRIGGERS;
}
/* signal that it is a router execution */ /* signal that it is a router execution */
eflags |= EXEC_FLAG_CITUS_ROUTER_EXECUTOR; eflags |= EXEC_FLAG_CITUS_ROUTER_EXECUTOR;
@ -79,6 +83,13 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
queryDesc->estate = executorState; queryDesc->estate = executorState;
/*
* As it's similar to what we're doing, use a MaterialState node to store
* our state. This is used to store our tuplestore, so cursors etc. can
* work.
*/
queryDesc->planstate = (PlanState *) makeNode(MaterialState);
#if (PG_VERSION_NUM < 90500) #if (PG_VERSION_NUM < 90500)
/* make sure that upsertQuery is false for versions that UPSERT is not available */ /* make sure that upsertQuery is false for versions that UPSERT is not available */
@ -181,12 +192,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
errmsg("scan directions other than forward scans " errmsg("scan directions other than forward scans "
"are unsupported"))); "are unsupported")));
} }
if (count != 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("fetching rows from a query using a cursor "
"is unsupported")));
}
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
@ -206,7 +211,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
DestReceiver *destination = queryDesc->dest; DestReceiver *destination = queryDesc->dest;
TupleDesc resultTupleDescriptor = queryDesc->tupDesc; TupleDesc resultTupleDescriptor = queryDesc->tupDesc;
ExecuteSingleShardSelect(task, estate, resultTupleDescriptor, destination); ExecuteSingleShardSelect(queryDesc, count, task, estate,
resultTupleDescriptor, destination);
} }
else else
{ {
@ -310,25 +316,33 @@ ExecuteDistributedModify(Task *task)
/* /*
* ExecuteSingleShardSelect executes the remote select query and sends the * ExecuteSingleShardSelect executes, if not done already, the remote select query and
* resultant tuples to the given destination receiver. If the query fails on a * sends the resulting tuples to the given destination receiver. If the query fails on a
* given placement, the function attempts it on its replica. * given placement, the function attempts it on its replica.
*/ */
static void static void
ExecuteSingleShardSelect(Task *task, EState *executorState, ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task,
TupleDesc tupleDescriptor, DestReceiver *destination) EState *executorState, TupleDesc tupleDescriptor,
DestReceiver *destination)
{ {
Tuplestorestate *tupleStore = NULL;
bool resultsOK = false; bool resultsOK = false;
TupleTableSlot *tupleTableSlot = NULL; TupleTableSlot *tupleTableSlot = NULL;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
Tuplestorestate *tupleStore = routerState->tuplestorestate;
uint64 currentTupleCount = 0;
tupleStore = tuplestore_begin_heap(false, false, work_mem); /* initialize tuplestore for the first call */
if (routerState->tuplestorestate == NULL)
{
routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
tupleStore = routerState->tuplestorestate;
resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore); resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore);
if (!resultsOK) if (!resultsOK)
{ {
ereport(ERROR, (errmsg("could not receive query results"))); ereport(ERROR, (errmsg("could not receive query results")));
} }
}
tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor); tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor);
@ -348,14 +362,23 @@ ExecuteSingleShardSelect(Task *task, EState *executorState,
executorState->es_processed++; executorState->es_processed++;
ExecClearTuple(tupleTableSlot); ExecClearTuple(tupleTableSlot);
currentTupleCount++;
/*
* If numberTuples is zero fetch all tuples, otherwise stop after
* count tuples.
*/
if (tupleCount > 0 && tupleCount == currentTupleCount)
{
break;
}
} }
/* shutdown the tuple receiver */ /* shutdown the tuple receiver */
(*destination->rShutdown)(destination); (*destination->rShutdown)(destination);
ExecDropSingleTupleTableSlot(tupleTableSlot); ExecDropSingleTupleTableSlot(tupleTableSlot);
tuplestore_end(tupleStore);
} }
@ -555,6 +578,12 @@ void
RouterExecutorEnd(QueryDesc *queryDesc) RouterExecutorEnd(QueryDesc *queryDesc)
{ {
EState *estate = queryDesc->estate; EState *estate = queryDesc->estate;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
if (routerState->tuplestorestate)
{
tuplestore_end(routerState->tuplestorestate);
}
Assert(estate != NULL); Assert(estate != NULL);
Assert(estate->es_finished); Assert(estate->es_finished);

View File

@ -30,6 +30,8 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
static bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
/* /*
* JobExecutorType selects the executor type for the given multiPlan using the task * JobExecutorType selects the executor type for the given multiPlan using the task
@ -42,26 +44,21 @@ MultiExecutorType
JobExecutorType(MultiPlan *multiPlan) JobExecutorType(MultiPlan *multiPlan)
{ {
Job *job = multiPlan->workerJob; Job *job = multiPlan->workerJob;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList; List *workerTaskList = job->taskList;
List *workerNodeList = WorkerNodeList(); List *workerNodeList = WorkerNodeList();
int taskCount = list_length(workerTaskList); int taskCount = list_length(workerTaskList);
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
double tasksPerNode = taskCount / ((double) workerNodeCount); double tasksPerNode = taskCount / ((double) workerNodeCount);
int dependedJobCount = list_length(job->dependedJobList); int dependedJobCount = list_length(job->dependedJobList);
MultiExecutorType executorType = TaskExecutorType; MultiExecutorType executorType = TaskExecutorType;
bool routerExecutablePlan = RouterExecutablePlan(multiPlan, executorType);
/* check if the first task is a modify task, short-circuit if so */ /* check if can switch to router executor */
if (taskCount > 0) if (routerExecutablePlan)
{
Task *firstTask = (Task *) linitial(workerTaskList);
if (firstTask->taskType == MODIFY_TASK)
{ {
ereport(DEBUG2, (errmsg("Plan is router executable")));
return MULTI_EXECUTOR_ROUTER; return MULTI_EXECUTOR_ROUTER;
} }
}
if (executorType == MULTI_EXECUTOR_REAL_TIME) if (executorType == MULTI_EXECUTOR_REAL_TIME)
{ {
@ -100,7 +97,7 @@ JobExecutorType(MultiPlan *multiPlan)
"\"task-tracker\"."))); "\"task-tracker\".")));
} }
} }
else if (executorType == MULTI_EXECUTOR_TASK_TRACKER) else
{ {
/* if we have more tasks per node than what can be tracked, warn the user */ /* if we have more tasks per node than what can be tracked, warn the user */
if (tasksPerNode >= MaxTrackedTasksPerNode) if (tasksPerNode >= MaxTrackedTasksPerNode)
@ -109,61 +106,80 @@ JobExecutorType(MultiPlan *multiPlan)
"configured max_tracked_tasks_per_node limit"))); "configured max_tracked_tasks_per_node limit")));
} }
} }
else if (executorType == MULTI_EXECUTOR_ROUTER)
return executorType;
}
/*
* RouterExecutablePlan returns whether a multi-plan can be executed using the
* router executor. Modify queries are always router executable, select queries
* are router executable only if executorType is real time.
*/
static bool
RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
{ {
Job *job = multiPlan->workerJob;
TaskType taskType = TASK_TYPE_INVALID_FIRST;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
Task *workerTask = NULL; Task *workerTask = NULL;
List *workerDependentTaskList = NIL; List *workerDependentTaskList = NIL;
bool masterQueryHasAggregates = false; bool masterQueryHasAggregates = false;
/* if we have repartition jobs with router executor, error out */ /* router executor cannot execute queries that hit more than one shard */
if (dependedJobCount > 0)
{
ereport(ERROR, (errmsg("cannot use router executor with repartition jobs"),
errhint("Set citus.task_executor_type to "
"\"task-tracker\".")));
}
/* if the query hits more than one shards, error out*/
if (taskCount != 1) if (taskCount != 1)
{ {
ereport(ERROR, (errmsg("cannot use router executor with queries that " return false;
"hit multiple shards"),
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
} }
/* if the query has dependent data fetch tasks */ /* check if the first task is a modify or a router task, short-circuit if so */
workerTask = list_nth(workerTaskList, 0); workerTask = (Task *) linitial(workerTaskList);
taskType = workerTask->taskType;
if (taskType == MODIFY_TASK || taskType == ROUTER_TASK)
{
return true;
}
if (executorType == MULTI_EXECUTOR_TASK_TRACKER)
{
return false;
}
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/* router executor cannot execute queries with dependent data fetch tasks */
workerDependentTaskList = workerTask->dependedTaskList; workerDependentTaskList = workerTask->dependedTaskList;
if (list_length(workerDependentTaskList) > 0) if (list_length(workerDependentTaskList) > 0)
{ {
ereport(ERROR, (errmsg("cannot use router executor with JOINs"), return false;
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
} }
/* ORDER BY is always applied on the master table with the current planner */ /* router executor cannot execute queries with order by */
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0) if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{ {
ereport(ERROR, (errmsg("cannot use router executor with ORDER BY clauses"), return false;
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
} }
/* /*
* Note that worker query having an aggregate means that the master query should have either * Router executor cannot execute queries with aggregates.
* an aggregate or a function expression which has to be executed for the correct results. * Note that worker query having an aggregate means that the master query should
* have either an aggregate or a function expression which has to be executed for
* the correct results.
*/ */
masterQueryHasAggregates = job->jobQuery->hasAggs; masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates) if (masterQueryHasAggregates)
{ {
ereport(ERROR, (errmsg("cannot use router executor with aggregates"), return false;
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
}
} }
return executorType; return true;
} }

View File

@ -68,6 +68,26 @@ LoadShardIntervalList(Oid relationId)
} }
/*
* ShardIntervalCount returns number of shard intervals for a given distributed table.
* The function returns 0 if table is not distributed, or no shards can be found for
* the given relation id.
*/
int
ShardIntervalCount(Oid relationId)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
int shardIntervalCount = 0;
if (cacheEntry->isDistributedTable)
{
shardIntervalCount = cacheEntry->shardIntervalArrayLength;
}
return shardIntervalCount;
}
/* /*
* LoadShardList reads list of shards for given relationId from pg_dist_shard, * LoadShardList reads list of shards for given relationId from pg_dist_shard,
* and returns the list of found shardIds. * and returns the list of found shardIds.

View File

@ -130,7 +130,6 @@ static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId,
int16 strategyNumber); int16 strategyNumber);
static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber);
static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn); static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn);
static bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn);
static Var * MakeInt4Column(void); static Var * MakeInt4Column(void);
static Const * MakeInt4Constant(Datum constantValue); static Const * MakeInt4Constant(Datum constantValue);
static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression); static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression);
@ -161,7 +160,6 @@ static uint64 AnchorShardId(List *fragmentList, uint32 anchorRangeTableId);
static List * PruneSqlTaskDependencies(List *sqlTaskList); static List * PruneSqlTaskDependencies(List *sqlTaskList);
static List * AssignTaskList(List *sqlTaskList); static List * AssignTaskList(List *sqlTaskList);
static bool HasMergeTaskDependencies(List *sqlTaskList); static bool HasMergeTaskDependencies(List *sqlTaskList);
static List * AssignAnchorShardTaskList(List *taskList);
static List * GreedyAssignTaskList(List *taskList); static List * GreedyAssignTaskList(List *taskList);
static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList,
List *activeShardPlacementLists); List *activeShardPlacementLists);
@ -2945,7 +2943,7 @@ HashableClauseMutator(Node *originalNode, Var *partitionColumn)
* operator expression which means it is a binary operator expression with * operator expression which means it is a binary operator expression with
* operands of a var and a non-null constant. * operands of a var and a non-null constant.
*/ */
static bool bool
OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn) OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn)
{ {
Node *leftOperand = get_leftop((Expr *) operatorExpression); Node *leftOperand = get_leftop((Expr *) operatorExpression);
@ -4811,7 +4809,7 @@ TaskListUnion(const List *list1, const List *list2)
* configured task assignment policy. The distributed executor later sends these * configured task assignment policy. The distributed executor later sends these
* tasks to their assigned locations for remote execution. * tasks to their assigned locations for remote execution.
*/ */
static List * List *
AssignAnchorShardTaskList(List *taskList) AssignAnchorShardTaskList(List *taskList)
{ {
List *assignedTaskList = NIL; List *assignedTaskList = NIL;

View File

@ -20,7 +20,8 @@
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/modify_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -74,13 +75,12 @@ CreatePhysicalPlan(Query *parse)
{ {
Query *parseCopy = copyObject(parse); Query *parseCopy = copyObject(parse);
MultiPlan *physicalPlan = NULL; MultiPlan *physicalPlan = NULL;
CmdType commandType = parse->commandType; bool routerPlannable = MultiRouterPlannableQuery(parseCopy, TaskExecutorType);
if (routerPlannable)
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{ {
/* modifications go directly from a query to a physical plan */ ereport(DEBUG2, (errmsg("Creating router plan")));
physicalPlan = MultiModifyPlanCreate(parse); physicalPlan = MultiRouterPlanCreate(parseCopy);
CheckNodeIsDumpable((Node *) physicalPlan);
} }
else else
{ {

View File

@ -1,8 +1,9 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* modify_planner.c * multi_router_planner.c
* *
* This file contains functions to plan distributed table modifications. * This file contains functions to plan single shard queries
* including distributed table modifications.
* *
* Copyright (c) 2014-2016, Citus Data, Inc. * Copyright (c) 2014-2016, Citus Data, Inc.
* *
@ -19,14 +20,15 @@
#else #else
#include "access/skey.h" #include "access/skey.h"
#endif #endif
#include "access/xact.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/modify_planner.h" /* IWYU pragma: keep */
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
@ -52,39 +54,60 @@
/* planner functions forward declarations */ /* planner functions forward declarations */
static void ErrorIfQueryNotSupported(Query *queryTree); static void ErrorIfModifyQueryNotSupported(Query *queryTree);
static Task * DistributedModifyTask(Query *query); static Task * RouterModifyTask(Query *query);
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
static OnConflictExpr * RebuildOnConflict(Oid relationId, static OnConflictExpr * RebuildOnConflict(Oid relationId,
OnConflictExpr *originalOnConflict); OnConflictExpr *originalOnConflict);
#endif #endif
static Job * DistributedModifyJob(Query *query, Task *modifyTask); static ShardInterval * TargetShardInterval(Query *query);
static List * QueryRestrictList(Query *query); static List * QueryRestrictList(Query *query);
static ShardInterval * DistributedModifyShardInterval(Query *query);
static Oid ExtractFirstDistributedTableId(Query *query); static Oid ExtractFirstDistributedTableId(Query *query);
static Const * ExtractPartitionValue(Query *query, Var *partitionColumn); static Const * ExtractPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *query);
static Job * RouterQueryJob(Query *query, Task *task);
static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
/* /*
* MultiModifyPlanCreate actually creates the distributed plan for execution * MultiRouterPlanCreate creates a physical plan for given router plannable query.
* of a distribution modification. It expects that the provided MultiTreeRoot * Created plan is either a modify task that changes a single shard, or a router task
* is actually a Query object, which it uses directly to produce a MultiPlan. * that returns query results from a single shard. Supported modify queries
* (insert/update/delete) are router plannble by default. The caller is expected to call
* MultiRouterPlannableQuery to see if the query is router plannable for select queries.
*/ */
MultiPlan * MultiPlan *
MultiModifyPlanCreate(Query *query) MultiRouterPlanCreate(Query *query)
{ {
Task *modifyTask = NULL; Task *task = NULL;
Job *modifyJob = NULL; Job *job = NULL;
MultiPlan *multiPlan = NULL; MultiPlan *multiPlan = NULL;
CmdType commandType = query->commandType;
bool modifyTask = false;
ErrorIfQueryNotSupported(query); if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
modifyTask = true;
}
modifyTask = DistributedModifyTask(query); if (modifyTask)
{
ErrorIfModifyQueryNotSupported(query);
task = RouterModifyTask(query);
}
else
{
Assert(commandType == CMD_SELECT);
modifyJob = DistributedModifyJob(query, modifyTask); task = RouterSelectTask(query);
}
job = RouterQueryJob(query, task);
multiPlan = CitusMakeNode(MultiPlan); multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = modifyJob; multiPlan->workerJob = job;
multiPlan->masterQuery = NULL; multiPlan->masterQuery = NULL;
multiPlan->masterTableName = NULL; multiPlan->masterTableName = NULL;
@ -93,11 +116,11 @@ MultiModifyPlanCreate(Query *query)
/* /*
* ErrorIfQueryNotSupported checks if the query contains unsupported features, * ErrorIfModifyQueryNotSupported checks if the query contains unsupported features,
* and errors out if it does. * and errors out if it does.
*/ */
static void static void
ErrorIfQueryNotSupported(Query *queryTree) ErrorIfModifyQueryNotSupported(Query *queryTree)
{ {
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
uint32 rangeTableId = 1; uint32 rangeTableId = 1;
@ -339,14 +362,14 @@ ErrorIfQueryNotSupported(Query *queryTree)
/* /*
* DistributedModifyTask builds a Task to represent a modification performed by * RouterModifyTask builds a Task to represent a modification performed by
* the provided query against the provided shard interval. This task contains * the provided query against the provided shard interval. This task contains
* shard-extended deparsed SQL to be run during execution. * shard-extended deparsed SQL to be run during execution.
*/ */
static Task * static Task *
DistributedModifyTask(Query *query) RouterModifyTask(Query *query)
{ {
ShardInterval *shardInterval = DistributedModifyShardInterval(query); ShardInterval *shardInterval = TargetShardInterval(query);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
FromExpr *joinTree = NULL; FromExpr *joinTree = NULL;
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
@ -475,40 +498,20 @@ RebuildOnConflict(Oid relationId, OnConflictExpr *originalOnConflict)
/* /*
* DistributedModifyJob creates a Job for the specified query to execute the * TargetShardInterval determines the single shard targeted by a provided command.
* provided modification task. Modification task placements are produced using * If no matching shards exist, or if the modification targets more than one one
* the "first-replica" algorithm, except modifications run against all matching * shard, this function raises an error depending on the command type.
* placements rather than just the first successful one.
*/
Job *
DistributedModifyJob(Query *query, Task *modifyTask)
{
Job *modifyJob = NULL;
List *taskList = FirstReplicaAssignTaskList(list_make1(modifyTask));
modifyJob = CitusMakeNode(Job);
modifyJob->dependedJobList = NIL;
modifyJob->jobId = INVALID_JOB_ID;
modifyJob->subqueryPushdown = false;
modifyJob->jobQuery = query;
modifyJob->taskList = taskList;
return modifyJob;
}
/*
* DistributedModifyShardInterval determines the single shard targeted by a
* provided distributed modification command. If no matching shards exist, or
* if the modification targets more than one one shard, this function raises
* an error.
*/ */
static ShardInterval * static ShardInterval *
DistributedModifyShardInterval(Query *query) TargetShardInterval(Query *query)
{ {
CmdType commandType = query->commandType;
bool selectTask = (commandType == CMD_SELECT);
List *restrictClauseList = NIL; List *restrictClauseList = NIL;
List *prunedShardList = NIL; List *prunedShardList = NIL;
Index tableId = 1; Index tableId = 1;
int prunedShardCount = 0;
Oid distributedTableId = ExtractFirstDistributedTableId(query); Oid distributedTableId = ExtractFirstDistributedTableId(query);
List *shardIntervalList = NIL; List *shardIntervalList = NIL;
@ -520,7 +523,7 @@ DistributedModifyShardInterval(Query *query)
char *relationName = get_rel_name(distributedTableId); char *relationName = get_rel_name(distributedTableId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards for modification"), errmsg("could not find any shards"),
errdetail("No shards exist for distributed table \"%s\".", errdetail("No shards exist for distributed table \"%s\".",
relationName), relationName),
errhint("Run master_create_worker_shards to create shards " errhint("Run master_create_worker_shards to create shards "
@ -530,13 +533,22 @@ DistributedModifyShardInterval(Query *query)
restrictClauseList = QueryRestrictList(query); restrictClauseList = QueryRestrictList(query);
prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList, prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
shardIntervalList); shardIntervalList);
prunedShardCount = list_length(prunedShardList);
if (list_length(prunedShardList) != 1) if (prunedShardCount != 1)
{
if (selectTask)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("router executor queries must target exactly one "
"shard")));
}
else
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributed modifications must target exactly one " errmsg("distributed modifications must target exactly one "
"shard"))); "shard")));
} }
}
return (ShardInterval *) linitial(prunedShardList); return (ShardInterval *) linitial(prunedShardList);
} }
@ -574,7 +586,8 @@ QueryRestrictList(Query *query)
queryRestrictList = list_make1(equalityExpr); queryRestrictList = list_make1(equalityExpr);
} }
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) else if (commandType == CMD_UPDATE || commandType == CMD_DELETE ||
commandType == CMD_SELECT)
{ {
queryRestrictList = WhereClauseList(query->jointree); queryRestrictList = WhereClauseList(query->jointree);
} }
@ -640,3 +653,298 @@ ExtractPartitionValue(Query *query, Var *partitionColumn)
return partitionValue; return partitionValue;
} }
/* RouterSelectTask builds a Task to represent a single shard select query */
static Task *
RouterSelectTask(Query *query)
{
Task *task = NULL;
ShardInterval *shardInterval = TargetShardInterval(query);
StringInfo queryString = makeStringInfo();
uint64 shardId = INVALID_SHARD_ID;
bool upsertQuery = false;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
FromExpr *joinTree = NULL;
Assert(shardInterval != NULL);
Assert(commandType == CMD_SELECT);
shardId = shardInterval->shardId;
/*
* Convert the qualifiers to an explicitly and'd clause, which is needed
* before we deparse the query.
*/
joinTree = query->jointree;
if ((joinTree != NULL) && (joinTree->quals != NULL))
{
Node *whereClause = (Node *) make_ands_explicit((List *) joinTree->quals);
joinTree->quals = whereClause;
}
deparse_shard_query(query, shardInterval->relationId, shardId, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->taskType = ROUTER_TASK;
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
return task;
}
/*
* RouterQueryJob creates a Job for the specified query to execute the
* provided single shard select task.
*/
static Job *
RouterQueryJob(Query *query, Task *task)
{
Job *job = NULL;
List *taskList = NIL;
TaskType taskType = task->taskType;
/*
* We send modify task to the first replica, otherwise we choose the target shard
* according to task assignment policy.
*/
if (taskType == MODIFY_TASK)
{
taskList = FirstReplicaAssignTaskList(list_make1(task));
}
else
{
taskList = AssignAnchorShardTaskList(list_make1(task));
}
job = CitusMakeNode(Job);
job->dependedJobList = NIL;
job->jobId = INVALID_JOB_ID;
job->subqueryPushdown = false;
job->jobQuery = query;
job->taskList = taskList;
return job;
}
/*
* MultiRouterPlannableQuery returns true if given query can be router plannable.
* The query is router plannable if it is a select query issued on a hash
* partitioned distributed table, and it has a exact match comparison on the
* partition column. This feature is enabled if task executor is set to real-time
*/
bool
MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
{
uint32 rangeTableId = 1;
List *rangeTableList = NIL;
RangeTblEntry *rangeTableEntry = NULL;
Oid distributedTableId = InvalidOid;
Var *partitionColumn = NULL;
char partitionMethod = '\0';
Node *quals = NULL;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
FromExpr *joinTree = query->jointree;
List *varClauseList = NIL;
ListCell *varClauseCell = NULL;
bool partitionColumnMatchExpression = false;
int partitionColumnReferenceCount = 0;
int shardCount = 0;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
return true;
}
if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME)
{
return false;
}
Assert(commandType == CMD_SELECT);
/*
* Reject subqueries which are in SELECT or WHERE clause.
* Queries which are recursive, with CommonTableExpr, with locking (hasForUpdate),
* or with window functions are also rejected here.
* Queries which have subqueries, or tablesamples in FROM clauses are rejected later
* during RangeTblEntry checks.
*/
if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate ||
query->hasRecursive)
{
return false;
}
#if (PG_VERSION_NUM >= 90500)
if (query->groupingSets)
{
return false;
}
#endif
/* only hash partitioned tables are supported */
distributedTableId = ExtractFirstDistributedTableId(query);
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
partitionMethod = PartitionMethod(distributedTableId);
if (partitionMethod != DISTRIBUTE_BY_HASH)
{
return false;
}
/* extract range table entries */
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
/* query can have only one range table of type RTE_RELATION */
if (list_length(rangeTableList) != 1)
{
return false;
}
rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
if (rangeTableEntry->rtekind != RTE_RELATION)
{
return false;
}
#if (PG_VERSION_NUM >= 90500)
if (rangeTableEntry->tablesample)
{
return false;
}
#endif
if (joinTree == NULL)
{
return false;
}
quals = joinTree->quals;
if (quals == NULL)
{
return false;
}
/* convert list of expressions into expression tree */
if (quals != NULL && IsA(quals, List))
{
quals = (Node *) make_ands_explicit((List *) quals);
}
/*
* Partition column must be used in a simple equality match check and it must be
* place at top level conjustion operator.
*/
partitionColumnMatchExpression =
ColumnMatchExpressionAtTopLevelConjunction(quals, partitionColumn);
if (!partitionColumnMatchExpression)
{
return false;
}
/* make sure partition column is used only once in the query */
varClauseList = pull_var_clause_default(quals);
foreach(varClauseCell, varClauseList)
{
Var *column = (Var *) lfirst(varClauseCell);
if (equal(column, partitionColumn))
{
partitionColumnReferenceCount++;
}
}
if (partitionColumnReferenceCount != 1)
{
return false;
}
/*
* We need to make sure there is at least one shard for this hash partitioned
* query to be router plannable. We can not prepare a router plan if there
* are no shards.
*/
shardCount = ShardIntervalCount(distributedTableId);
if (shardCount == 0)
{
return false;
}
return true;
}
/*
* ColumnMatchExpressionAtTopLevelConjunction returns true if the query contains an exact
* match (equal) expression on the provided column. The function returns true only
* if the match expression has an AND relation with the rest of the expression tree.
*/
static bool
ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
{
if (node == NULL)
{
return false;
}
if (IsA(node, OpExpr))
{
OpExpr *opExpr = (OpExpr *) node;
bool simpleExpression = SimpleOpExpression((Expr *) opExpr);
bool columnInExpr = false;
char *operatorName = NULL;
int operatorNameComparison = 0;
bool usingEqualityOperator = false;
if (!simpleExpression)
{
return false;
}
columnInExpr = OpExpressionContainsColumn(opExpr, column);
if (!columnInExpr)
{
return false;
}
operatorName = get_opname(opExpr->opno);
operatorNameComparison = strncmp(operatorName, EQUAL_OPERATOR_STRING,
NAMEDATALEN);
usingEqualityOperator = (operatorNameComparison == 0);
return usingEqualityOperator;
}
else if (IsA(node, BoolExpr))
{
BoolExpr *boolExpr = (BoolExpr *) node;
List *argumentList = boolExpr->args;
ListCell *argumentCell = NULL;
if (boolExpr->boolop != AND_EXPR)
{
return false;
}
foreach(argumentCell, argumentList)
{
Node *argumentNode = (Node *) lfirst(argumentCell);
bool columnMatch =
ColumnMatchExpressionAtTopLevelConjunction(argumentNode, column);
if (columnMatch)
{
return true;
}
}
}
return false;
}

View File

@ -19,7 +19,6 @@
#include "commands/explain.h" #include "commands/explain.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/modify_planner.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_explain.h" #include "distributed/multi_explain.h"
@ -27,6 +26,7 @@
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/multi_transaction.h" #include "distributed/multi_transaction.h"
#include "distributed/multi_utility.h" #include "distributed/multi_utility.h"
@ -59,7 +59,6 @@ static const struct config_enum_entry task_assignment_policy_options[] = {
static const struct config_enum_entry task_executor_type_options[] = { static const struct config_enum_entry task_executor_type_options[] = {
{ "real-time", MULTI_EXECUTOR_REAL_TIME, false }, { "real-time", MULTI_EXECUTOR_REAL_TIME, false },
{ "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false }, { "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false },
{ "router", MULTI_EXECUTOR_ROUTER, false },
{ NULL, 0, false } { NULL, 0, false }
}; };
@ -483,14 +482,13 @@ RegisterCitusConfigVariables(void)
DefineCustomEnumVariable( DefineCustomEnumVariable(
"citus.task_executor_type", "citus.task_executor_type",
gettext_noop("Sets the executor type to be used for distributed queries."), gettext_noop("Sets the executor type to be used for distributed queries."),
gettext_noop("The master node chooses between three different executor types " gettext_noop("The master node chooses between two different executor types "
"when executing a distributed query. The router executor is " "when executing a distributed query.The real-time executor is "
"optimal for simple key-value lookups on a single shard. The " "optimal for simple key-value lookup queries and queries that "
"real-time executor is optimal for queries that involve " "involve aggregations and/or co-located joins on multiple shards. "
"aggregations and/or co-located joins on multiple shards. The " "The task-tracker executor is optimal for long-running, complex "
"task-tracker executor is optimal for long-running, complex " "queries that touch thousands of shards and/or that involve table "
"queries that touch thousands of shards and/or that involve " "repartitioning."),
"table repartitioning."),
&TaskExecutorType, &TaskExecutorType,
MULTI_EXECUTOR_REAL_TIME, MULTI_EXECUTOR_REAL_TIME,
task_executor_type_options, task_executor_type_options,

View File

@ -56,6 +56,7 @@ typedef struct ShardPlacement
/* Function declarations to read shard and shard placement data */ /* Function declarations to read shard and shard placement data */
extern List * LoadShardIntervalList(Oid relationId); extern List * LoadShardIntervalList(Oid relationId);
extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId); extern List * LoadShardList(Oid relationId);
extern char * LoadShardAlias(Oid relationId, uint64 shardId); extern char * LoadShardAlias(Oid relationId, uint64 shardId);
extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval);

View File

@ -76,7 +76,8 @@ typedef enum
SHARD_FETCH_TASK = 4, SHARD_FETCH_TASK = 4,
MAP_OUTPUT_FETCH_TASK = 5, MAP_OUTPUT_FETCH_TASK = 5,
MERGE_FETCH_TASK = 6, MERGE_FETCH_TASK = 6,
MODIFY_TASK = 7 MODIFY_TASK = 7,
ROUTER_TASK = 8
} TaskType; } TaskType;
@ -231,6 +232,7 @@ extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
extern Node * BuildBaseConstraint(Var *column); extern Node * BuildBaseConstraint(Var *column);
extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval); extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
extern bool SimpleOpExpression(Expr *clause); extern bool SimpleOpExpression(Expr *clause);
extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement); extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
@ -246,6 +248,7 @@ extern List * TaskListConcatUnique(List *list1, List *list2);
extern bool TaskListMember(const List *taskList, const Task *task); extern bool TaskListMember(const List *taskList, const Task *task);
extern List * TaskListDifference(const List *list1, const List *list2); extern List * TaskListDifference(const List *list1, const List *list2);
extern List * TaskListUnion(const List *list1, const List *list2); extern List * TaskListUnion(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList); extern List * FirstReplicaAssignTaskList(List *taskList);

View File

@ -1,21 +1,22 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* modify_planner.h * multi_router_planner.h
* *
* Declarations for public functions and types related to modify planning. * Declarations for public functions and types related to router planning.
* *
* Copyright (c) 2014-2016, Citus Data, Inc. * Copyright (c) 2014-2016, Citus Data, Inc.
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef MODIFY_PLANNER_H #ifndef MULTI_ROUTER_PLANNER_H
#define MODIFY_PLANNER_H #define MULTI_ROUTER_PLANNER_H
#include "c.h" #include "c.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -29,7 +30,7 @@
#define UPSERT_ALIAS "citus_table_alias" #define UPSERT_ALIAS "citus_table_alias"
#endif #endif
extern MultiPlan * MultiRouterPlanCreate(Query *query);
extern bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType);
extern MultiPlan * MultiModifyPlanCreate(Query *query); #endif /* MULTI_ROUTER_PLANNER_H */
#endif /* MODIFY_PLANNER_H */

View File

@ -91,9 +91,10 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
20.69); 20.69);
-- ensure the values are where we put them and query to ensure they are properly pruned -- ensure the values are where we put them and query to ensure they are properly pruned
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'router'; SET citus.task_executor_type TO 'real-time';
SELECT * FROM range_partitioned WHERE id = 32743; SELECT * FROM range_partitioned WHERE id = 32743;
DEBUG: predicate pruning for shardId 103070 DEBUG: predicate pruning for shardId 103070
DEBUG: Plan is router executable
id | symbol | bidder_id | placed_at | kind | limit_price id | symbol | bidder_id | placed_at | kind | limit_price
-------+--------+-----------+--------------------------+------+------------- -------+--------+-----------+--------------------------+------+-------------
32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69
@ -101,6 +102,7 @@ DEBUG: predicate pruning for shardId 103070
SELECT * FROM append_partitioned WHERE id = 414123; SELECT * FROM append_partitioned WHERE id = 414123;
DEBUG: predicate pruning for shardId 103072 DEBUG: predicate pruning for shardId 103072
DEBUG: Plan is router executable
id | symbol | bidder_id | placed_at | kind | limit_price id | symbol | bidder_id | placed_at | kind | limit_price
--------+--------+-----------+--------------------------+------+------------- --------+--------+-----------+--------------------------+------+-------------
414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69

View File

@ -19,12 +19,12 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010;
(1 row) (1 row)
-- Check that partition and join pruning works when min/max values exist -- Check that partition and join pruning works when min/max values exist
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -- Adding l_orderkey = 1 to make the query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
DEBUG: predicate pruning for shardId 102014 DEBUG: predicate pruning for shardId 102014
DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102013
DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102011
DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102010
DEBUG: predicate pruning for shardId 102009
l_orderkey | l_linenumber | l_shipdate l_orderkey | l_linenumber | l_shipdate
------------+--------------+------------ ------------+--------------+------------
9030 | 1 | 09-02-1998 9030 | 1 | 09-02-1998
@ -33,7 +33,13 @@ DEBUG: predicate pruning for shardId 102009
9030 | 4 | 07-20-1998 9030 | 4 | 07-20-1998
9030 | 5 | 09-29-1998 9030 | 5 | 09-29-1998
9030 | 6 | 09-03-1998 9030 | 6 | 09-03-1998
(6 rows) 1 | 1 | 03-13-1996
1 | 2 | 04-12-1996
1 | 3 | 01-29-1996
1 | 4 | 04-21-1996
1 | 5 | 03-30-1996
1 | 6 | 01-30-1996
(12 rows)
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey; WHERE l_orderkey = o_orderkey;

View File

@ -4,12 +4,12 @@
-- Tests to verify that we correctly prune unreferenced shards. For this, we -- Tests to verify that we correctly prune unreferenced shards. For this, we
-- need to increase the logging verbosity of messages displayed on the client. -- need to increase the logging verbosity of messages displayed on the client.
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -- Adding additional l_orderkey = 1 to make this query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
DEBUG: predicate pruning for shardId 102014 DEBUG: predicate pruning for shardId 102014
DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102013
DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102011
DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102010
DEBUG: predicate pruning for shardId 102009
l_orderkey | l_linenumber | l_shipdate l_orderkey | l_linenumber | l_shipdate
------------+--------------+------------ ------------+--------------+------------
9030 | 1 | 09-02-1998 9030 | 1 | 09-02-1998
@ -18,7 +18,13 @@ DEBUG: predicate pruning for shardId 102009
9030 | 4 | 07-20-1998 9030 | 4 | 07-20-1998
9030 | 5 | 09-29-1998 9030 | 5 | 09-29-1998
9030 | 6 | 09-03-1998 9030 | 6 | 09-03-1998
(6 rows) 1 | 1 | 03-13-1996
1 | 2 | 04-12-1996
1 | 3 | 01-29-1996
1 | 4 | 04-21-1996
1 | 5 | 03-30-1996
1 | 6 | 01-30-1996
(12 rows)
-- We use the l_linenumber field for the following aggregations. We need to use -- We use the l_linenumber field for the following aggregations. We need to use
-- an integer type, as aggregations on numerics or big integers return numerics -- an integer type, as aggregations on numerics or big integers return numerics

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,7 @@
CREATE TABLE articles ( CREATE TABLE articles (
id bigint NOT NULL, id bigint NOT NULL,
author_id bigint NOT NULL, author_id bigint NOT NULL,
title text NOT NULL, title varchar(20) NOT NULL,
word_count integer NOT NULL CHECK (word_count > 0) word_count integer NOT NULL CHECK (word_count > 0)
); );
-- this table is used in a CTE test -- this table is used in a CTE test
@ -315,14 +315,16 @@ ERROR: cannot perform distributed planning on this query
DETAIL: Having qual is currently unsupported DETAIL: Having qual is currently unsupported
-- now, test the cases where Citus do or do not need to create -- now, test the cases where Citus do or do not need to create
-- the master queries -- the master queries
SET citus.task_executor_type TO 'router';
SET citus.large_table_shard_count TO 2; SET citus.large_table_shard_count TO 2;
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
-- start with the simple lookup query -- start with the simple lookup query
SELECT * SELECT *
FROM articles FROM articles
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -337,6 +339,7 @@ SELECT *
FROM articles FROM articles
WHERE author_id = 1 OR author_id = 17; WHERE author_id = 1 OR author_id = 17;
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -350,13 +353,22 @@ DEBUG: predicate pruning for shardId 103094
SELECT * SELECT *
FROM articles FROM articles
WHERE author_id = 1 OR author_id = 18; WHERE author_id = 1 OR author_id = 18;
ERROR: cannot use router executor with queries that hit multiple shards id | author_id | title | word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". ----+-----------+--------------+------------
1 | 1 | arsenous | 9572
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
31 | 1 | athwartships | 7271
41 | 1 | aznavour | 11814
(5 rows)
-- rename the output columns on a no master query case -- rename the output columns on a no master query case
SELECT id as article_id, word_count * id as random_value SELECT id as article_id, word_count * id as random_value
FROM articles FROM articles
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
article_id | random_value article_id | random_value
------------+-------------- ------------+--------------
1 | 9572 1 | 9572
@ -375,6 +387,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
DEBUG: push down of limit count: 3 DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
DEBUG: Plan is router executable
first_author | second_word_count first_author | second_word_count
--------------+------------------- --------------+-------------------
10 | 17277 10 | 17277
@ -382,22 +395,29 @@ DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
10 | 6363 10 | 6363
(3 rows) (3 rows)
-- now show that JOINs don't work with multiple tables -- now show that JOINs with multiple tables are not router executable
-- they are executed by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles a, articles_single_shard b FROM articles a, articles_single_shard b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
DEBUG: push down of limit count: 3 DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with JOINs first_author | second_word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". --------------+-------------------
10 | 19519
10 | 19519
10 | 19519
(3 rows)
-- do not create the master query for LIMIT on a single shard SELECT -- do not create the master query for LIMIT on a single shard SELECT
SELECT * SELECT *
FROM articles FROM articles
WHERE author_id = 1 WHERE author_id = 1
LIMIT 2; LIMIT 2;
DEBUG: push down of limit count: 2 DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -411,7 +431,9 @@ SELECT id
FROM articles FROM articles
WHERE author_id = 1 WHERE author_id = 1
GROUP BY id; GROUP BY id;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id id
---- ----
41 41
@ -423,49 +445,34 @@ DEBUG: predicate pruning for shardId 103094
-- copying from a single shard table does not require the master query -- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout; COPY articles_single_shard TO stdout;
DEBUG: Plan is router executable
50 10 anjanette 19519 50 10 anjanette 19519
-- error out for queries with aggregates -- error out for queries with aggregates
SELECT avg(word_count) SELECT avg(word_count)
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103093 DEBUG: predicate pruning for shardId 103093
ERROR: cannot use router executor with aggregates DEBUG: Plan is router executable
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". avg
--------------------
12356.400000000000
(1 row)
-- max, min, sum, count is somehow implemented -- max, min, sum, count is somehow implemented
-- differently in distributed planning but, still error out -- differently in distributed planning
SELECT max(word_count) as max, min(word_count) as min, SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt sum(word_count) as sum, count(word_count) as cnt
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103093 DEBUG: predicate pruning for shardId 103093
ERROR: cannot use router executor with aggregates DEBUG: Plan is router executable
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". max | min | sum | cnt
-- error out for queries with ORDER BY -------+------+-------+-----
SELECT * 18185 | 2728 | 61782 | 5
FROM articles (1 row)
WHERE author_id = 1
ORDER BY word_count;
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with ORDER BY clauses
HINT: Set citus.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with ORDER BY and LIMIT
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count
LIMIT 2;
DEBUG: push down of limit count: 2
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with ORDER BY clauses
HINT: Set citus.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with aggregates and GROUP BY
SELECT max(word_count)
FROM articles
WHERE author_id = 1
GROUP BY author_id;
DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with aggregates
HINT: Set citus.task_executor_type to "real-time" or "task-tracker".
-- error out for queries with repartition jobs -- error out for queries with repartition jobs
SELECT * SELECT *
FROM articles a, articles b FROM articles a, articles b
@ -499,13 +506,6 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use router executor with repartition jobs ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.task_executor_type to "task-tracker".
-- error out for queries which hit more than 1 shards
SELECT *
FROM articles
WHERE author_id >= 1 AND author_id <= 3;
ERROR: cannot use router executor with queries that hit multiple shards
HINT: Set citus.task_executor_type to "real-time" or "task-tracker".
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
SET citus.task_executor_type TO 'real-time';

View File

@ -126,6 +126,11 @@ test: multi_repartitioned_subquery_udf
# --------- # ---------
test: multi_copy test: multi_copy
# ---------
# multi_router_planner creates hash partitioned tables.
# ---------
test: multi_router_planner
# ---------- # ----------
# multi_large_shardid stages more shards into lineitem # multi_large_shardid stages more shards into lineitem
# ---------- # ----------

View File

@ -69,7 +69,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
20.69); 20.69);
-- ensure the values are where we put them and query to ensure they are properly pruned -- ensure the values are where we put them and query to ensure they are properly pruned
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'router'; SET citus.task_executor_type TO 'real-time';
SELECT * FROM range_partitioned WHERE id = 32743; SELECT * FROM range_partitioned WHERE id = 32743;
SELECT * FROM append_partitioned WHERE id = 414123; SELECT * FROM append_partitioned WHERE id = 414123;
SET client_min_messages TO DEFAULT; SET client_min_messages TO DEFAULT;

View File

@ -15,8 +15,8 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102009;
SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010; SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010;
-- Check that partition and join pruning works when min/max values exist -- Check that partition and join pruning works when min/max values exist
-- Adding l_orderkey = 1 to make the query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey; WHERE l_orderkey = o_orderkey;

View File

@ -7,7 +7,8 @@
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -- Adding additional l_orderkey = 1 to make this query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
-- We use the l_linenumber field for the following aggregations. We need to use -- We use the l_linenumber field for the following aggregations. We need to use
-- an integer type, as aggregations on numerics or big integers return numerics -- an integer type, as aggregations on numerics or big integers return numerics

View File

@ -0,0 +1,532 @@
-- ===================================================================
-- test router planner functionality for single shard select queries
-- ===================================================================
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 103300;
CREATE TABLE articles_hash (
id bigint NOT NULL,
author_id bigint NOT NULL,
title varchar(20) NOT NULL,
word_count integer
);
-- Check for the existence of line 'DEBUG: Creating router plan'
-- to determine if router planner is used.
-- this table is used in a CTE test
CREATE TABLE authors_hash ( name text, id bigint );
-- this table is used in router executor tests
CREATE TABLE articles_single_shard_hash (LIKE articles_hash);
SELECT master_create_distributed_table('articles_hash', 'author_id', 'hash');
SELECT master_create_distributed_table('articles_single_shard_hash', 'author_id', 'hash');
-- test when a table is distributed but no shards created yet
SELECT count(*) from articles_hash;
SELECT master_create_worker_shards('articles_hash', 2, 1);
SELECT master_create_worker_shards('articles_single_shard_hash', 1, 1);
-- create a bunch of test data
INSERT INTO articles_hash VALUES ( 1, 1, 'arsenous', 9572);
INSERT INTO articles_hash VALUES ( 2, 2, 'abducing', 13642);
INSERT INTO articles_hash VALUES ( 3, 3, 'asternal', 10480);
INSERT INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551);
INSERT INTO articles_hash VALUES ( 5, 5, 'aruru', 11389);
INSERT INTO articles_hash VALUES ( 6, 6, 'atlases', 15459);
INSERT INTO articles_hash VALUES ( 7, 7, 'aseptic', 12298);
INSERT INTO articles_hash VALUES ( 8, 8, 'agatized', 16368);
INSERT INTO articles_hash VALUES ( 9, 9, 'alligate', 438);
INSERT INTO articles_hash VALUES (10, 10, 'aggrandize', 17277);
INSERT INTO articles_hash VALUES (11, 1, 'alamo', 1347);
INSERT INTO articles_hash VALUES (12, 2, 'archiblast', 18185);
INSERT INTO articles_hash VALUES (13, 3, 'aseyev', 2255);
INSERT INTO articles_hash VALUES (14, 4, 'andesite', 19094);
INSERT INTO articles_hash VALUES (15, 5, 'adversa', 3164);
INSERT INTO articles_hash VALUES (16, 6, 'allonym', 2);
INSERT INTO articles_hash VALUES (17, 7, 'auriga', 4073);
INSERT INTO articles_hash VALUES (18, 8, 'assembly', 911);
INSERT INTO articles_hash VALUES (19, 9, 'aubergiste', 4981);
INSERT INTO articles_hash VALUES (20, 10, 'absentness', 1820);
INSERT INTO articles_hash VALUES (21, 1, 'arcading', 5890);
INSERT INTO articles_hash VALUES (22, 2, 'antipope', 2728);
INSERT INTO articles_hash VALUES (23, 3, 'abhorring', 6799);
INSERT INTO articles_hash VALUES (24, 4, 'audacious', 3637);
INSERT INTO articles_hash VALUES (25, 5, 'antehall', 7707);
INSERT INTO articles_hash VALUES (26, 6, 'abington', 4545);
INSERT INTO articles_hash VALUES (27, 7, 'arsenous', 8616);
INSERT INTO articles_hash VALUES (28, 8, 'aerophyte', 5454);
INSERT INTO articles_hash VALUES (29, 9, 'amateur', 9524);
INSERT INTO articles_hash VALUES (30, 10, 'andelee', 6363);
INSERT INTO articles_hash VALUES (31, 1, 'athwartships', 7271);
INSERT INTO articles_hash VALUES (32, 2, 'amazon', 11342);
INSERT INTO articles_hash VALUES (33, 3, 'autochrome', 8180);
INSERT INTO articles_hash VALUES (34, 4, 'amnestied', 12250);
INSERT INTO articles_hash VALUES (35, 5, 'aminate', 9089);
INSERT INTO articles_hash VALUES (36, 6, 'ablation', 13159);
INSERT INTO articles_hash VALUES (37, 7, 'archduchies', 9997);
INSERT INTO articles_hash VALUES (38, 8, 'anatine', 14067);
INSERT INTO articles_hash VALUES (39, 9, 'anchises', 10906);
INSERT INTO articles_hash VALUES (40, 10, 'attemper', 14976);
INSERT INTO articles_hash VALUES (41, 1, 'aznavour', 11814);
INSERT INTO articles_hash VALUES (42, 2, 'ausable', 15885);
INSERT INTO articles_hash VALUES (43, 3, 'affixal', 12723);
INSERT INTO articles_hash VALUES (44, 4, 'anteport', 16793);
INSERT INTO articles_hash VALUES (45, 5, 'afrasia', 864);
INSERT INTO articles_hash VALUES (46, 6, 'atlanta', 17702);
INSERT INTO articles_hash VALUES (47, 7, 'abeyance', 1772);
INSERT INTO articles_hash VALUES (48, 8, 'alkylic', 18610);
INSERT INTO articles_hash VALUES (49, 9, 'anyone', 2681);
INSERT INTO articles_hash VALUES (50, 10, 'anjanette', 19519);
SET citus.task_executor_type TO 'real-time';
SET citus.large_table_shard_count TO 2;
SET client_min_messages TO 'DEBUG2';
-- insert a single row for the test
INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519);
-- first, test zero-shard SELECT, which should return an empty row
SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2;
-- single-shard tests
-- test simple select for a single row
SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50;
-- get all titles by a single author
SELECT title FROM articles_hash WHERE author_id = 10;
-- try ordering them by word count
SELECT title, word_count FROM articles_hash
WHERE author_id = 10
ORDER BY word_count DESC NULLS LAST;
-- look at last two articles by an author
SELECT title, id FROM articles_hash
WHERE author_id = 5
ORDER BY id
LIMIT 2;
-- find all articles by two authors in same shard
-- but plan is not router executable due to order by
SELECT title, author_id FROM articles_hash
WHERE author_id = 7 OR author_id = 8
ORDER BY author_id ASC, id;
-- same query is router executable with no order by
SELECT title, author_id FROM articles_hash
WHERE author_id = 7 OR author_id = 8;
-- add in some grouping expressions, still on same shard
-- having queries unsupported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10
GROUP BY author_id
HAVING sum(word_count) > 1000
ORDER BY sum(word_count) DESC;
-- however having clause is supported if it goes to a single shard
SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
WHERE author_id = 1
GROUP BY author_id
HAVING sum(word_count) > 1000
ORDER BY sum(word_count) DESC;
-- UNION/INTERSECT queries are unsupported
-- this is rejected by router planner and handled by multi_logical_planner
SELECT * FROM articles_hash WHERE author_id = 10 UNION
SELECT * FROM articles_hash WHERE author_id = 1;
-- query is a single shard query but can't do shard pruning,
-- not router-plannable due to <= and IN
SELECT * FROM articles_hash WHERE author_id <= 1;
SELECT * FROM articles_hash WHERE author_id IN (1, 3);
-- queries using CTEs are unsupported
WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1)
SELECT title FROM articles_hash WHERE author_id = 1;
-- queries which involve functions in FROM clause are unsupported.
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1;
-- subqueries are not supported in WHERE clause in Citus
SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a');
-- subqueries are supported in FROM clause but they are not router plannable
SELECT articles_hash.id,test.word_count
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id
ORDER BY articles_hash.id;
-- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash a;
-- simple lookup query
SELECT *
FROM articles_hash
WHERE author_id = 1;
-- below query hits a single shard, but it is not router plannable
-- still router executable
SELECT *
FROM articles_hash
WHERE author_id = 1 OR author_id = 17;
-- below query hits two shards, not router plannable + not router executable
-- handled by real-time executor
SELECT *
FROM articles_hash
WHERE author_id = 1 OR author_id = 18;
-- rename the output columns
SELECT id as article_id, word_count * id as random_value
FROM articles_hash
WHERE author_id = 1;
-- we can push down co-located joins to a single worker
-- this is not router plannable but router executable
-- handled by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3;
-- following join is neither router plannable, nor router executable
SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_single_shard_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3;
-- single shard select with limit is router plannable
SELECT *
FROM articles_hash
WHERE author_id = 1
LIMIT 3;
-- single shard select with limit + offset is router plannable
SELECT *
FROM articles_hash
WHERE author_id = 1
LIMIT 2
OFFSET 1;
-- single shard select with limit + offset + order by is router plannable
SELECT *
FROM articles_hash
WHERE author_id = 1
ORDER BY id desc
LIMIT 2
OFFSET 1;
-- single shard select with group by on non-partition column is router plannable
SELECT id
FROM articles_hash
WHERE author_id = 1
GROUP BY id;
-- single shard select with distinct is router plannable
SELECT distinct id
FROM articles_hash
WHERE author_id = 1;
-- single shard aggregate is router plannable
SELECT avg(word_count)
FROM articles_hash
WHERE author_id = 2;
-- max, min, sum, count are router plannable on single shard
SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt
FROM articles_hash
WHERE author_id = 2;
-- queries with aggregates and group by supported on single shard
SELECT max(word_count)
FROM articles_hash
WHERE author_id = 1
GROUP BY author_id;
SET client_min_messages to 'NOTICE';
-- error out for queries with repartition jobs
SELECT *
FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1;
-- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor
SELECT *
FROM articles_hash
WHERE author_id >= 1 AND author_id <= 3;
SET citus.task_executor_type TO 'real-time';
-- Test various filtering options for router plannable check
SET client_min_messages to 'DEBUG2';
-- this is definitely single shard
-- but not router plannable
SELECT *
FROM articles_hash
WHERE author_id = 1 and author_id >= 1;
-- not router plannable due to or
SELECT *
FROM articles_hash
WHERE author_id = 1 or id = 1;
-- router plannable
SELECT *
FROM articles_hash
WHERE author_id = 1 and (id = 1 or id = 41);
-- router plannable
SELECT *
FROM articles_hash
WHERE author_id = 1 and (id = random()::int * 0);
-- not router plannable due to function call on the right side
SELECT *
FROM articles_hash
WHERE author_id = (random()::int * 0 + 1);
-- not router plannable due to or
SELECT *
FROM articles_hash
WHERE author_id = 1 or id = 1;
-- router plannable due to abs(-1) getting converted to 1 by postgresql
SELECT *
FROM articles_hash
WHERE author_id = abs(-1);
-- not router plannable due to abs() function
SELECT *
FROM articles_hash
WHERE 1 = abs(author_id);
-- not router plannable due to abs() function
SELECT *
FROM articles_hash
WHERE author_id = abs(author_id - 2);
-- router plannable, function on different field
SELECT *
FROM articles_hash
WHERE author_id = 1 and (id = abs(id - 2));
-- not router plannable due to is true
SELECT *
FROM articles_hash
WHERE (author_id = 1) is true;
-- router plannable, (boolean expression) = true is collapsed to (boolean expression)
SELECT *
FROM articles_hash
WHERE (author_id = 1) = true;
-- router plannable, between operator is on another column
SELECT *
FROM articles_hash
WHERE (author_id = 1) and id between 0 and 20;
-- router plannable, partition column expression is and'ed to rest
SELECT *
FROM articles_hash
WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s';
-- router plannable, order is changed
SELECT *
FROM articles_hash
WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1);
-- router plannable
SELECT *
FROM articles_hash
WHERE (title like '%s' or title like 'a%') and (author_id = 1);
-- router plannable
SELECT *
FROM articles_hash
WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000);
-- window functions are supported if query is router plannable
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5;
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5
ORDER BY word_count DESC;
SELECT id, MIN(id) over (order by word_count)
FROM articles_hash
WHERE author_id = 1;
SELECT id, word_count, AVG(word_count) over (order by word_count)
FROM articles_hash
WHERE author_id = 1;
SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count)
FROM articles_hash
WHERE author_id = 1;
-- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count)
FROM articles_hash
WHERE author_id = 1 or author_id = 2;
-- but they are not supported for not router plannable queries
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5 or author_id = 1;
-- complex query hitting a single shard
SELECT
count(DISTINCT CASE
WHEN
word_count > 100
THEN
id
ELSE
NULL
END) as c
FROM
articles_hash
WHERE
author_id = 5;
-- same query is not router plannable if hits multiple shards
SELECT
count(DISTINCT CASE
WHEN
word_count > 100
THEN
id
ELSE
NULL
END) as c
FROM
articles_hash
GROUP BY
author_id;
-- queries inside transactions can be router plannable
BEGIN;
SELECT *
FROM articles_hash
WHERE author_id = 1
ORDER BY id;
END;
-- cursor queries are router plannable
BEGIN;
DECLARE test_cursor CURSOR FOR
SELECT *
FROM articles_hash
WHERE author_id = 1
ORDER BY id;
FETCH test_cursor;
FETCH test_cursor;
END;
-- queries inside copy can be router plannable
COPY (
SELECT *
FROM articles_hash
WHERE author_id = 1
ORDER BY id) TO STDOUT;
-- table creation queries inside can be router plannable
CREATE TEMP TABLE temp_articles_hash as
SELECT *
FROM articles_hash
WHERE author_id = 1
ORDER BY id;
-- router plannable queries may include filter for aggragates
SELECT count(*), count(*) FILTER (WHERE id < 3)
FROM articles_hash
WHERE author_id = 1;
-- non-router plannable queries do not support filters
SELECT count(*), count(*) FILTER (WHERE id < 3)
FROM articles_hash
WHERE author_id = 1 or author_id = 2;
-- prepare queries can be router plannable
PREPARE author_1_articles as
SELECT *
FROM articles_hash
WHERE author_id = 1;
EXECUTE author_1_articles;
-- parametric prepare queries can be router plannable
PREPARE author_articles(int) as
SELECT *
FROM articles_hash
WHERE author_id = $1;
EXECUTE author_articles(1);
-- queries inside plpgsql functions could be router plannable
CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$
DECLARE
max_id integer;
BEGIN
SELECT MAX(id) FROM articles_hash ah
WHERE author_id = 1
into max_id;
return max_id;
END;
$$ LANGUAGE plpgsql;
SELECT author_articles_max_id();
-- plpgsql function that return query results are not router plannable
CREATE OR REPLACE FUNCTION author_articles_id_word_count() RETURNS TABLE(id bigint, word_count int) AS $$
DECLARE
BEGIN
RETURN QUERY
SELECT ah.id, ah.word_count
FROM articles_hash ah
WHERE author_id = 1;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM author_articles_id_word_count();
-- router planner/executor is disabled for task-tracker executor
-- following query is router plannable, but router planner is disabled
SET citus.task_executor_type to 'task-tracker';
SELECT id
FROM articles_hash
WHERE author_id = 1;
-- insert query is router plannable even under task-tracker
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814);
-- verify insert is successfull (not router plannable and executable)
SELECT id
FROM articles_hash
WHERE author_id = 1;
SET client_min_messages to 'NOTICE';
DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count();
DROP TABLE articles_hash;
DROP TABLE articles_single_shard_hash;
DROP TABLE authors_hash;

View File

@ -5,7 +5,7 @@
CREATE TABLE articles ( CREATE TABLE articles (
id bigint NOT NULL, id bigint NOT NULL,
author_id bigint NOT NULL, author_id bigint NOT NULL,
title text NOT NULL, title varchar(20) NOT NULL,
word_count integer NOT NULL CHECK (word_count > 0) word_count integer NOT NULL CHECK (word_count > 0)
); );
@ -188,9 +188,9 @@ SELECT author_id FROM articles
-- now, test the cases where Citus do or do not need to create -- now, test the cases where Citus do or do not need to create
-- the master queries -- the master queries
SET citus.task_executor_type TO 'router';
SET citus.large_table_shard_count TO 2; SET citus.large_table_shard_count TO 2;
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
-- start with the simple lookup query -- start with the simple lookup query
SELECT * SELECT *
@ -219,7 +219,8 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
-- now show that JOINs don't work with multiple tables -- now show that JOINs with multiple tables are not router executable
-- they are executed by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles a, articles_single_shard b FROM articles a, articles_single_shard b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
@ -248,40 +249,15 @@ SELECT avg(word_count)
WHERE author_id = 2; WHERE author_id = 2;
-- max, min, sum, count is somehow implemented -- max, min, sum, count is somehow implemented
-- differently in distributed planning but, still error out -- differently in distributed planning
SELECT max(word_count) as max, min(word_count) as min, SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt sum(word_count) as sum, count(word_count) as cnt
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
-- error out for queries with ORDER BY
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count;
-- error out for queries with ORDER BY and LIMIT
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count
LIMIT 2;
-- error out for queries with aggregates and GROUP BY
SELECT max(word_count)
FROM articles
WHERE author_id = 1
GROUP BY author_id;
-- error out for queries with repartition jobs -- error out for queries with repartition jobs
SELECT * SELECT *
FROM articles a, articles b FROM articles a, articles b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
-- error out for queries which hit more than 1 shards
SELECT *
FROM articles
WHERE author_id >= 1 AND author_id <= 3;
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
SET citus.task_executor_type TO 'real-time';