From 938546b9389c26b383032c1ead99517569e9709d Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Wed, 30 Mar 2016 16:04:55 +0300 Subject: [PATCH] Add router plannable check and router planning logic for single shard select queries --- .../executor/multi_router_executor.c | 76 +- .../executor/multi_server_executor.c | 6 +- .../master/master_metadata_utility.c | 20 + .../planner/multi_physical_planner.c | 6 +- .../distributed/planner/multi_planner.c | 18 +- ...odify_planner.c => multi_router_planner.c} | 411 +++++++- src/backend/distributed/shared_library_init.c | 2 +- .../distributed/master_metadata_utility.h | 1 + .../distributed/multi_physical_planner.h | 5 +- ...odify_planner.h => multi_router_planner.h} | 14 +- .../regress/expected/multi_router_planner.out | 995 ++++++++++++++++++ .../regress/expected/multi_simple_queries.out | 51 +- src/test/regress/multi_schedule | 5 + src/test/regress/sql/multi_router_planner.sql | 507 +++++++++ 14 files changed, 2007 insertions(+), 110 deletions(-) rename src/backend/distributed/planner/{modify_planner.c => multi_router_planner.c} (63%) rename src/include/distributed/{modify_planner.h => multi_router_planner.h} (67%) create mode 100644 src/test/regress/expected/multi_router_planner.out create mode 100644 src/test/regress/sql/multi_router_planner.sql diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2c759f721..e2b7688ef 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -42,7 +42,8 @@ bool AllModificationsCommutative = false; static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static int32 ExecuteDistributedModify(Task *task); -static void ExecuteSingleShardSelect(Task *task, EState *executorState, +static void ExecuteSingleShardSelect(QueryDesc *queryDesc, 
uint64 numberTuples, + Task *task, EState *executorState, TupleDesc tupleDescriptor, DestReceiver *destination); static bool SendQueryInSingleRowMode(PGconn *connection, char *query); @@ -57,7 +58,6 @@ static bool StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) { - bool topLevel = true; LOCKMODE lockMode = NoLock; EState *executorState = NULL; CmdType commandType = queryDesc->operation; @@ -65,9 +65,13 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) /* ensure that the task is not NULL */ Assert(task != NULL); - /* disallow transactions and triggers during distributed commands */ - PreventTransactionChain(topLevel, "distributed commands"); - eflags |= EXEC_FLAG_SKIP_TRIGGERS; + /* disallow transactions and triggers during distributed modify commands */ + if (commandType != CMD_SELECT) + { + bool topLevel = true; + PreventTransactionChain(topLevel, "distributed commands"); + eflags |= EXEC_FLAG_SKIP_TRIGGERS; + } /* signal that it is a router execution */ eflags |= EXEC_FLAG_CITUS_ROUTER_EXECUTOR; @@ -79,6 +83,13 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) queryDesc->estate = executorState; + /* + * As it's similar to what we're doing, use a MaterialState node to store + * our state. This is used to store our tuplestore, so cursors etc. can + * work. 
+ */ + queryDesc->planstate = (PlanState *) makeNode(MaterialState); + #if (PG_VERSION_NUM < 90500) /* make sure that upsertQuery is false for versions that UPSERT is not available */ @@ -181,12 +192,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas errmsg("scan directions other than forward scans " "are unsupported"))); } - if (count != 0) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("fetching rows from a query using a cursor " - "is unsupported"))); - } oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); @@ -206,7 +211,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas DestReceiver *destination = queryDesc->dest; TupleDesc resultTupleDescriptor = queryDesc->tupDesc; - ExecuteSingleShardSelect(task, estate, resultTupleDescriptor, destination); + ExecuteSingleShardSelect(queryDesc, count, task, estate, + resultTupleDescriptor, destination); } else { @@ -310,24 +316,32 @@ ExecuteDistributedModify(Task *task) /* - * ExecuteSingleShardSelect executes the remote select query and sends the - * resultant tuples to the given destination receiver. If the query fails on a + * ExecuteSingleShardSelect executes, if not done already, the remote select query and + * sends the resulting tuples to the given destination receiver. If the query fails on a * given placement, the function attempts it on its replica. 
*/ static void -ExecuteSingleShardSelect(Task *task, EState *executorState, - TupleDesc tupleDescriptor, DestReceiver *destination) +ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, Task *task, + EState *executorState, TupleDesc tupleDescriptor, + DestReceiver *destination) { - Tuplestorestate *tupleStore = NULL; bool resultsOK = false; TupleTableSlot *tupleTableSlot = NULL; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + Tuplestorestate *tupleStore = routerState->tuplestorestate; + uint64 currentTupleCount = 0; - tupleStore = tuplestore_begin_heap(false, false, work_mem); - - resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore); - if (!resultsOK) + /* initialize tuplestore for the first call */ + if (routerState->tuplestorestate == NULL) { - ereport(ERROR, (errmsg("could not receive query results"))); + routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); + tupleStore = routerState->tuplestorestate; + + resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore); + if (!resultsOK) + { + ereport(ERROR, (errmsg("could not receive query results"))); + } } tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor); @@ -348,14 +362,22 @@ ExecuteSingleShardSelect(Task *task, EState *executorState, executorState->es_processed++; ExecClearTuple(tupleTableSlot); + + currentTupleCount++; + /* + * If numberTuples is zero fetch all tuples, otherwise stop after + * count tuples. 
+ */ + if (numberTuples && numberTuples == currentTupleCount) + { + break; + } } /* shutdown the tuple receiver */ (*destination->rShutdown)(destination); ExecDropSingleTupleTableSlot(tupleTableSlot); - - tuplestore_end(tupleStore); } @@ -555,6 +577,12 @@ void RouterExecutorEnd(QueryDesc *queryDesc) { EState *estate = queryDesc->estate; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + + if (routerState->tuplestorestate) + { + tuplestore_end(routerState->tuplestorestate); + } Assert(estate != NULL); Assert(estate->es_finished); diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index c4d2dcc9b..aa0b0b626 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -56,9 +56,13 @@ JobExecutorType(MultiPlan *multiPlan) if (taskCount > 0) { Task *firstTask = (Task *) linitial(workerTaskList); + TaskType taskType = firstTask->taskType; - if (firstTask->taskType == MODIFY_TASK) + if (taskType == MODIFY_TASK || taskType == ROUTER_TASK) { + /* router planner creates a single task */ + Assert(taskCount == 1); + return MULTI_EXECUTOR_ROUTER; } } diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index c65a81195..1be66d944 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -68,6 +68,26 @@ LoadShardIntervalList(Oid relationId) } +/* + * ShardIntervalCount returns number of shard intervals for a given distributed table. + * The function returns 0 if table is not distributed, or no shards can be found for + * the given relation id. 
+ */ +int +ShardIntervalCount(Oid relationId) +{ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + int shardIntervalCount = 0; + + if (cacheEntry->isDistributedTable) + { + shardIntervalCount = cacheEntry->shardIntervalArrayLength; + } + + return shardIntervalCount; +} + + /* * LoadShardList reads list of shards for given relationId from pg_dist_shard, * and returns the list of found shardIds. diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 9323a163a..853cb886e 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -130,7 +130,6 @@ static OperatorCacheEntry * LookupOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Oid GetOperatorByType(Oid typeId, Oid accessMethodId, int16 strategyNumber); static Node * HashableClauseMutator(Node *originalNode, Var *partitionColumn); -static bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn); static Var * MakeInt4Column(void); static Const * MakeInt4Constant(Datum constantValue); static OpExpr * MakeHashedOperatorExpression(OpExpr *operatorExpression); @@ -161,7 +160,6 @@ static uint64 AnchorShardId(List *fragmentList, uint32 anchorRangeTableId); static List * PruneSqlTaskDependencies(List *sqlTaskList); static List * AssignTaskList(List *sqlTaskList); static bool HasMergeTaskDependencies(List *sqlTaskList); -static List * AssignAnchorShardTaskList(List *taskList); static List * GreedyAssignTaskList(List *taskList); static Task * GreedyAssignTask(WorkerNode *workerNode, List *taskList, List *activeShardPlacementLists); @@ -2945,7 +2943,7 @@ HashableClauseMutator(Node *originalNode, Var *partitionColumn) * operator expression which means it is a binary operator expression with * operands of a var and a non-null constant. 
*/ -static bool +bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn) { Node *leftOperand = get_leftop((Expr *) operatorExpression); @@ -4811,7 +4809,7 @@ TaskListUnion(const List *list1, const List *list2) * configured task assignment policy. The distributed executor later sends these * tasks to their assigned locations for remote execution. */ -static List * +List * AssignAnchorShardTaskList(List *taskList) { List *assignedTaskList = NIL; diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index ae94bdf07..02083a03c 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -20,7 +20,8 @@ #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" -#include "distributed/modify_planner.h" +#include "distributed/multi_router_planner.h" +#include "distributed/multi_server_executor.h" #include "executor/executor.h" @@ -75,12 +76,23 @@ CreatePhysicalPlan(Query *parse) Query *parseCopy = copyObject(parse); MultiPlan *physicalPlan = NULL; CmdType commandType = parse->commandType; + bool routerPlannable = false; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { - /* modifications go directly from a query to a physical plan */ - physicalPlan = MultiModifyPlanCreate(parse); + routerPlannable = true; + } + else if (TaskExecutorType == MULTI_EXECUTOR_REAL_TIME || + TaskExecutorType == MULTI_EXECUTOR_ROUTER) + { + routerPlannable = MultiRouterPlannableQuery(parseCopy); + } + + if (routerPlannable) + { + ereport(DEBUG2, (errmsg("Creating router plan"))); + physicalPlan = MultiRouterPlanCreate(parseCopy); } else { diff --git a/src/backend/distributed/planner/modify_planner.c b/src/backend/distributed/planner/multi_router_planner.c similarity index 63% rename from src/backend/distributed/planner/modify_planner.c 
rename to src/backend/distributed/planner/multi_router_planner.c index f7f3cf9a1..e1657e2d3 100644 --- a/src/backend/distributed/planner/modify_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1,8 +1,9 @@ /*------------------------------------------------------------------------- * - * modify_planner.c + * multi_router_planner.c * - * This file contains functions to plan distributed table modifications. + * This file contains functions to plan single shard queries + * including distributed table modifications. * * Copyright (c) 2014-2016, Citus Data, Inc. * @@ -19,14 +20,15 @@ #else #include "access/skey.h" #endif +#include "access/xact.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" -#include "distributed/modify_planner.h" /* IWYU pragma: keep */ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_router_planner.h" #include "distributed/listutils.h" #include "distributed/citus_ruleutils.h" #include "distributed/relay_utility.h" @@ -52,39 +54,60 @@ /* planner functions forward declarations */ -static void ErrorIfQueryNotSupported(Query *queryTree); -static Task * DistributedModifyTask(Query *query); +static void ErrorIfModifyQueryNotSupported(Query *queryTree); +static Task * RouterModifyTask(Query *query); #if (PG_VERSION_NUM >= 90500) static OnConflictExpr * RebuildOnConflict(Oid relationId, OnConflictExpr *originalOnConflict); #endif -static Job * DistributedModifyJob(Query *query, Task *modifyTask); +static ShardInterval * TargetShardInterval(Query *query); static List * QueryRestrictList(Query *query); -static ShardInterval * DistributedModifyShardInterval(Query *query); static Oid ExtractFirstDistributedTableId(Query *query); static Const * ExtractPartitionValue(Query *query, Var 
*partitionColumn); +static Task * RouterSelectTask(Query *query); +static Job * RouterQueryJob(Query *query, Task *task); +static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column); /* - * MultiModifyPlanCreate actually creates the distributed plan for execution - * of a distribution modification. It expects that the provided MultiTreeRoot - * is actually a Query object, which it uses directly to produce a MultiPlan. + * MultiRouterPlanCreate creates a physical plan for given router plannable query. + * Created plan is either a modify task that changes a single shard, or a router task + * that returns query results from a single shard. Supported modify queries + * (insert/update/delete) are router plannable by default. The caller is expected to call + * MultiRouterPlannableQuery to see if the query is router plannable for select queries. */ MultiPlan * -MultiModifyPlanCreate(Query *query) +MultiRouterPlanCreate(Query *query) { - Task *modifyTask = NULL; - Job *modifyJob = NULL; + Task *task = NULL; + Job *job = NULL; MultiPlan *multiPlan = NULL; + CmdType commandType = query->commandType; + bool modifyTask = false; - ErrorIfQueryNotSupported(query); + if (commandType == CMD_INSERT || commandType == CMD_UPDATE || + commandType == CMD_DELETE) + { + modifyTask = true; + } - modifyTask = DistributedModifyTask(query); + if (modifyTask) + { + ErrorIfModifyQueryNotSupported(query); + task = RouterModifyTask(query); + } + else + { + Assert(commandType == CMD_SELECT); - modifyJob = DistributedModifyJob(query, modifyTask); + task = RouterSelectTask(query); + } + + + job = RouterQueryJob(query, task); multiPlan = CitusMakeNode(MultiPlan); - multiPlan->workerJob = modifyJob; + multiPlan->workerJob = job; multiPlan->masterQuery = NULL; multiPlan->masterTableName = NULL; @@ -93,11 +116,11 @@ MultiModifyPlanCreate(Query *query) /* - * ErrorIfQueryNotSupported checks if the query contains unsupported features, + * ErrorIfModifyQueryNotSupported checks if the 
query contains unsupported features, * and errors out if it does. */ static void -ErrorIfQueryNotSupported(Query *queryTree) +ErrorIfModifyQueryNotSupported(Query *queryTree) { Oid distributedTableId = ExtractFirstDistributedTableId(queryTree); uint32 rangeTableId = 1; @@ -339,14 +362,14 @@ ErrorIfQueryNotSupported(Query *queryTree) /* - * DistributedModifyTask builds a Task to represent a modification performed by + * RouterModifyTask builds a Task to represent a modification performed by * the provided query against the provided shard interval. This task contains * shard-extended deparsed SQL to be run during execution. */ static Task * -DistributedModifyTask(Query *query) +RouterModifyTask(Query *query) { - ShardInterval *shardInterval = DistributedModifyShardInterval(query); + ShardInterval *shardInterval = TargetShardInterval(query); uint64 shardId = shardInterval->shardId; FromExpr *joinTree = NULL; StringInfo queryString = makeStringInfo(); @@ -475,40 +498,20 @@ RebuildOnConflict(Oid relationId, OnConflictExpr *originalOnConflict) /* - * DistributedModifyJob creates a Job for the specified query to execute the - * provided modification task. Modification task placements are produced using - * the "first-replica" algorithm, except modifications run against all matching - * placements rather than just the first successful one. - */ -Job * -DistributedModifyJob(Query *query, Task *modifyTask) -{ - Job *modifyJob = NULL; - List *taskList = FirstReplicaAssignTaskList(list_make1(modifyTask)); - - modifyJob = CitusMakeNode(Job); - modifyJob->dependedJobList = NIL; - modifyJob->jobId = INVALID_JOB_ID; - modifyJob->subqueryPushdown = false; - modifyJob->jobQuery = query; - modifyJob->taskList = taskList; - - return modifyJob; -} - - -/* - * DistributedModifyShardInterval determines the single shard targeted by a - * provided distributed modification command. 
If no matching shards exist, or - * if the modification targets more than one one shard, this function raises - * an error. + * TargetShardInterval determines the single shard targeted by a provided command. + * If no matching shards exist, or if the command targets more than one + * shard, this function raises an error depending on the command type. */ static ShardInterval * -DistributedModifyShardInterval(Query *query) +TargetShardInterval(Query *query) { + CmdType commandType = query->commandType; + bool selectTask = (commandType == CMD_SELECT); List *restrictClauseList = NIL; List *prunedShardList = NIL; Index tableId = 1; + int prunedShardCount = 0; + Oid distributedTableId = ExtractFirstDistributedTableId(query); List *shardIntervalList = NIL; @@ -520,7 +523,7 @@ DistributedModifyShardInterval(Query *query) char *relationName = get_rel_name(distributedTableId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards for modification"), + errmsg("could not find any shards"), errdetail("No shards exist for distributed table \"%s\".", relationName), errhint("Run master_create_worker_shards to create shards " @@ -530,12 +533,21 @@ DistributedModifyShardInterval(Query *query) restrictClauseList = QueryRestrictList(query); prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList, shardIntervalList); - - if (list_length(prunedShardList) != 1) + prunedShardCount = list_length(prunedShardList); + if (prunedShardCount != 1) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("distributed modifications must target exactly one " - "shard"))); + if (selectTask) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("router executor queries must target exactly one " + "shard"))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributed modifications must target exactly one " + "shard"))); + } } return (ShardInterval *) 
linitial(prunedShardList); @@ -574,7 +586,8 @@ QueryRestrictList(Query *query) queryRestrictList = list_make1(equalityExpr); } - else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) + else if (commandType == CMD_UPDATE || commandType == CMD_DELETE || + commandType == CMD_SELECT) { queryRestrictList = WhereClauseList(query->jointree); } @@ -640,3 +653,287 @@ ExtractPartitionValue(Query *query, Var *partitionColumn) return partitionValue; } + + +/* RouterSelectTask builds a Task to represent a single shard select query */ +static Task * +RouterSelectTask(Query *query) +{ + Task *task = NULL; + ShardInterval *shardInterval = TargetShardInterval(query); + StringInfo queryString = makeStringInfo(); + uint64 shardId = INVALID_SHARD_ID; + bool upsertQuery = false; + CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; + FromExpr *joinTree = NULL; + + Assert(shardInterval != NULL); + Assert(commandType == CMD_SELECT); + + shardId = shardInterval->shardId; + + /* + * Convert the qualifiers to an explicitly and'd clause, which is needed + * before we deparse the query. 
+ */ + joinTree = query->jointree; + if ((joinTree != NULL) && (joinTree->quals != NULL)) + { + Node *whereClause = (Node *) make_ands_explicit((List *) joinTree->quals); + joinTree->quals = whereClause; + } + + deparse_shard_query(query, shardInterval->relationId, shardId, queryString); + ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + + task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; + task->taskId = INVALID_TASK_ID; + task->taskType = ROUTER_TASK; + task->queryString = queryString->data; + task->anchorShardId = shardId; + task->dependedTaskList = NIL; + task->upsertQuery = upsertQuery; + + return task; +} + + +/* + * RouterQueryJob creates a Job for the specified query to execute the + * provided single shard modify or select task. */ +static Job * +RouterQueryJob(Query *query, Task *task) +{ + Job *job = NULL; + List *taskList = NIL; + TaskType taskType = task->taskType; + + /* + * We send modify task to the first replica, otherwise we choose the target shard + * according to task assignment policy. + */ + if (taskType == MODIFY_TASK) + { + taskList = FirstReplicaAssignTaskList(list_make1(task)); + } + else + { + taskList = AssignAnchorShardTaskList(list_make1(task)); + } + + job = CitusMakeNode(Job); + job->dependedJobList = NIL; + job->jobId = INVALID_JOB_ID; + job->subqueryPushdown = false; + job->jobQuery = query; + job->taskList = taskList; + + return job; +} + + +/* + * MultiRouterPlannableQuery returns true if given query can be router plannable. + * The query is router plannable if it is a select query issued on a hash + * partitioned distributed table, and it has an exact match comparison on the + * partition column. This feature is enabled if task executor is set to real-time or + * router. 
+ */ +bool +MultiRouterPlannableQuery(Query *query) +{ + uint32 rangeTableId = 1; + List *rangeTableList = NIL; + RangeTblEntry *rangeTableEntry = NULL; + Oid distributedTableId = InvalidOid; + Var *partitionColumn = NULL; + char partitionMethod = '\0'; + Node *quals = NULL; + CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; + FromExpr *joinTree = query->jointree; + List *varClauseList = NIL; + ListCell *varClauseCell = NULL; + bool partitionColumnMatchExpression = false; + int partitionColumnReferenceCount = 0; + int shardCount = 0; + + Assert(commandType == CMD_SELECT); + + /* + * Reject subqueries which are in SELECT or WHERE clause. + * Queries which are recursive, with CommonTableExpr, with locking (hasForUpdate), + * or with window functions are also rejected here. + * Queries which have subqueries, or tablesamples in FROM clauses are rejected later + * during RangeTblEntry checks. + */ + if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate || + query->hasRecursive || query->utilityStmt != NULL) + { + return false; + } + +#if (PG_VERSION_NUM >= 90500) + if (query->groupingSets) + { + return false; + } +#endif + + /* only hash partitioned tables are supported */ + distributedTableId = ExtractFirstDistributedTableId(query); + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + partitionMethod = PartitionMethod(distributedTableId); + + if (partitionMethod != DISTRIBUTE_BY_HASH) + { + return false; + } + + /* extract range table entries */ + ExtractRangeTableEntryWalker((Node *) query, &rangeTableList); + + /* query can have only one range table of type RTE_RELATION */ + if (list_length(rangeTableList) != 1) + { + return false; + } + + rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList); + if (rangeTableEntry->rtekind != RTE_RELATION) + { + return false; + } + +#if (PG_VERSION_NUM >= 90500) + if (rangeTableEntry->tablesample) + { + return false; + } +#endif + + if (joinTree == NULL) + { + 
return false; + } + + quals = joinTree->quals; + if (quals == NULL) + { + return false; + } + + /* convert list of expressions into expression tree */ + if (quals != NULL && IsA(quals, List)) + { + quals = (Node *) make_ands_explicit((List *) quals); + } + + /* + * Partition column must be used in a simple equality match check and it must be + * placed at the top level conjunction operator. + */ + partitionColumnMatchExpression = + ColumnMatchExpressionAtTopLevelConjunction(quals, partitionColumn); + + if (!partitionColumnMatchExpression) + { + return false; + } + + /* make sure partition column is used only once in the query */ + varClauseList = pull_var_clause_default(quals); + foreach(varClauseCell, varClauseList) + { + Var *column = (Var *) lfirst(varClauseCell); + if (equal(column, partitionColumn)) + { + partitionColumnReferenceCount++; + } + } + + if (partitionColumnReferenceCount != 1) + { + return false; + } + + /* + * We need to make sure there is at least one shard for this hash partitioned + * query to be router plannable. We can not prepare a router plan if there + * are no shards. + */ + shardCount = ShardIntervalCount(distributedTableId); + if (shardCount == 0) + { + return false; + } + + return true; +} + + +/* + * ColumnMatchExpressionAtTopLevelConjunction returns true if the query contains an exact + * match (equal) expression on the provided column. The function returns true only + * if the match expression has an AND relation with the rest of the expression tree. 
+ */ +static bool +ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, OpExpr)) + { + OpExpr *opExpr = (OpExpr *) node; + bool simpleExpression = SimpleOpExpression((Expr *) opExpr); + bool columnInExpr = false; + char *operatorName = NULL; + int operatorNameComparison = 0; + bool usingEqualityOperator = false; + + if (!simpleExpression) + { + return false; + } + + columnInExpr = OpExpressionContainsColumn(opExpr, column); + if (!columnInExpr) + { + return false; + } + + operatorName = get_opname(opExpr->opno); + operatorNameComparison = strncmp(operatorName, EQUAL_OPERATOR_STRING, + NAMEDATALEN); + usingEqualityOperator = (operatorNameComparison == 0); + + return usingEqualityOperator; + } + else if (IsA(node, BoolExpr)) + { + BoolExpr *boolExpr = (BoolExpr *) node; + List *argumentList = boolExpr->args; + ListCell *argumentCell = NULL; + + if (boolExpr->boolop != AND_EXPR) + { + return false; + } + + foreach(argumentCell, argumentList) + { + Node *argumentNode = (Node *) lfirst(argumentCell); + bool columnMatch = + ColumnMatchExpressionAtTopLevelConjunction(argumentNode, column); + if (columnMatch) + { + return true; + } + } + } + + return false; +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 4db469989..d72ae3150 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -19,7 +19,6 @@ #include "commands/explain.h" #include "executor/executor.h" #include "distributed/master_protocol.h" -#include "distributed/modify_planner.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" @@ -27,6 +26,7 @@ #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" +#include "distributed/multi_router_planner.h" #include 
"distributed/multi_server_executor.h" #include "distributed/multi_transaction.h" #include "distributed/multi_utility.h" diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 17a711ee2..a30667564 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -56,6 +56,7 @@ typedef struct ShardPlacement /* Function declarations to read shard and shard placement data */ extern List * LoadShardIntervalList(Oid relationId); +extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); extern char * LoadShardAlias(Oid relationId, uint64 shardId); extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 9506b1fd1..6b0fb4172 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -76,7 +76,8 @@ typedef enum SHARD_FETCH_TASK = 4, MAP_OUTPUT_FETCH_TASK = 5, MERGE_FETCH_TASK = 6, - MODIFY_TASK = 7 + MODIFY_TASK = 7, + ROUTER_TASK = 8 } TaskType; @@ -231,6 +232,7 @@ extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber); extern Node * BuildBaseConstraint(Var *column); extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval); extern bool SimpleOpExpression(Expr *clause); +extern bool OpExpressionContainsColumn(OpExpr *operatorExpression, Var *partitionColumn); extern int CompareShardPlacements(const void *leftElement, const void *rightElement); @@ -246,6 +248,7 @@ extern List * TaskListConcatUnique(List *list1, List *list2); extern bool TaskListMember(const List *taskList, const Task *task); extern List * TaskListDifference(const List *list1, const List *list2); extern List * TaskListUnion(const List *list1, const List *list2); +extern List * AssignAnchorShardTaskList(List 
*taskList); extern List * FirstReplicaAssignTaskList(List *taskList); diff --git a/src/include/distributed/modify_planner.h b/src/include/distributed/multi_router_planner.h similarity index 67% rename from src/include/distributed/modify_planner.h rename to src/include/distributed/multi_router_planner.h index bd6df4755..6974b8b62 100644 --- a/src/include/distributed/modify_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -1,16 +1,16 @@ /*------------------------------------------------------------------------- * - * modify_planner.h + * multi_router_planner.h * - * Declarations for public functions and types related to modify planning. + * Declarations for public functions and types related to router planning. * * Copyright (c) 2014-2016, Citus Data, Inc. * *------------------------------------------------------------------------- */ -#ifndef MODIFY_PLANNER_H -#define MODIFY_PLANNER_H +#ifndef MULTI_ROUTER_PLANNER_H +#define MULTI_ROUTER_PLANNER_H #include "c.h" @@ -29,7 +29,7 @@ #define UPSERT_ALIAS "citus_table_alias" #endif +extern MultiPlan * MultiRouterPlanCreate(Query *query); +extern bool MultiRouterPlannableQuery(Query *query); -extern MultiPlan * MultiModifyPlanCreate(Query *query); - -#endif /* MODIFY_PLANNER_H */ +#endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out new file mode 100644 index 000000000..288f15145 --- /dev/null +++ b/src/test/regress/expected/multi_router_planner.out @@ -0,0 +1,995 @@ +-- =================================================================== +-- test router planner functionality for single shard select queries +-- =================================================================== +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 103300; +CREATE TABLE articles_hash ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer +); +-- Check for the existence of line 
'DEBUG: Creating router plan' +-- to determine if router planner is used. +-- this table is used in a CTE test +CREATE TABLE authors_hash ( name text, id bigint ); +-- this table is used in router executor tests +CREATE TABLE articles_single_shard_hash (LIKE articles_hash); +SELECT master_create_distributed_table('articles_hash', 'author_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_distributed_table('articles_single_shard_hash', 'author_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- test when a table is distributed but no shards created yet +SELECT count(*) from articles_hash; + count +------- + +(1 row) + +SELECT master_create_worker_shards('articles_hash', 2, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT master_create_worker_shards('articles_single_shard_hash', 1, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- create a bunch of test data +INSERT INTO articles_hash VALUES ( 1, 1, 'arsenous', 9572); +INSERT INTO articles_hash VALUES ( 2, 2, 'abducing', 13642); +INSERT INTO articles_hash VALUES ( 3, 3, 'asternal', 10480); +INSERT INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551); +INSERT INTO articles_hash VALUES ( 5, 5, 'aruru', 11389); +INSERT INTO articles_hash VALUES ( 6, 6, 'atlases', 15459); +INSERT INTO articles_hash VALUES ( 7, 7, 'aseptic', 12298); +INSERT INTO articles_hash VALUES ( 8, 8, 'agatized', 16368); +INSERT INTO articles_hash VALUES ( 9, 9, 'alligate', 438); +INSERT INTO articles_hash VALUES (10, 10, 'aggrandize', 17277); +INSERT INTO articles_hash VALUES (11, 1, 'alamo', 1347); +INSERT INTO articles_hash VALUES (12, 2, 'archiblast', 18185); +INSERT INTO articles_hash VALUES (13, 3, 'aseyev', 2255); +INSERT INTO articles_hash VALUES (14, 4, 'andesite', 19094); +INSERT INTO articles_hash VALUES (15, 5, 'adversa', 3164); +INSERT INTO articles_hash 
VALUES (16, 6, 'allonym', 2); +INSERT INTO articles_hash VALUES (17, 7, 'auriga', 4073); +INSERT INTO articles_hash VALUES (18, 8, 'assembly', 911); +INSERT INTO articles_hash VALUES (19, 9, 'aubergiste', 4981); +INSERT INTO articles_hash VALUES (20, 10, 'absentness', 1820); +INSERT INTO articles_hash VALUES (21, 1, 'arcading', 5890); +INSERT INTO articles_hash VALUES (22, 2, 'antipope', 2728); +INSERT INTO articles_hash VALUES (23, 3, 'abhorring', 6799); +INSERT INTO articles_hash VALUES (24, 4, 'audacious', 3637); +INSERT INTO articles_hash VALUES (25, 5, 'antehall', 7707); +INSERT INTO articles_hash VALUES (26, 6, 'abington', 4545); +INSERT INTO articles_hash VALUES (27, 7, 'arsenous', 8616); +INSERT INTO articles_hash VALUES (28, 8, 'aerophyte', 5454); +INSERT INTO articles_hash VALUES (29, 9, 'amateur', 9524); +INSERT INTO articles_hash VALUES (30, 10, 'andelee', 6363); +INSERT INTO articles_hash VALUES (31, 1, 'athwartships', 7271); +INSERT INTO articles_hash VALUES (32, 2, 'amazon', 11342); +INSERT INTO articles_hash VALUES (33, 3, 'autochrome', 8180); +INSERT INTO articles_hash VALUES (34, 4, 'amnestied', 12250); +INSERT INTO articles_hash VALUES (35, 5, 'aminate', 9089); +INSERT INTO articles_hash VALUES (36, 6, 'ablation', 13159); +INSERT INTO articles_hash VALUES (37, 7, 'archduchies', 9997); +INSERT INTO articles_hash VALUES (38, 8, 'anatine', 14067); +INSERT INTO articles_hash VALUES (39, 9, 'anchises', 10906); +INSERT INTO articles_hash VALUES (40, 10, 'attemper', 14976); +INSERT INTO articles_hash VALUES (41, 1, 'aznavour', 11814); +INSERT INTO articles_hash VALUES (42, 2, 'ausable', 15885); +INSERT INTO articles_hash VALUES (43, 3, 'affixal', 12723); +INSERT INTO articles_hash VALUES (44, 4, 'anteport', 16793); +INSERT INTO articles_hash VALUES (45, 5, 'afrasia', 864); +INSERT INTO articles_hash VALUES (46, 6, 'atlanta', 17702); +INSERT INTO articles_hash VALUES (47, 7, 'abeyance', 1772); +INSERT INTO articles_hash VALUES (48, 8, 'alkylic', 18610); 
+INSERT INTO articles_hash VALUES (49, 9, 'anyone', 2681); +INSERT INTO articles_hash VALUES (50, 10, 'anjanette', 19519); +SET citus.task_executor_type TO 'real-time'; +SET citus.large_table_shard_count TO 2; +SET client_min_messages TO 'DEBUG2'; +-- insert a single row for the test +INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); +DEBUG: Creating router plan +-- first, test zero-shard SELECT, which should return an empty row +SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; +DEBUG: predicate pruning for shardId 103301 +DEBUG: predicate pruning for shardId 103300 + count +------- + +(1 row) + +-- single-shard tests +-- test simple select for a single row +SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+-----------+------------ + 50 | 10 | anjanette | 19519 +(1 row) + +-- get all titles by a single author +SELECT title FROM articles_hash WHERE author_id = 10; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + title +------------ + aggrandize + absentness + andelee + attemper + anjanette +(5 rows) + +-- try ordering them by word count +SELECT title, word_count FROM articles_hash + WHERE author_id = 10 + ORDER BY word_count DESC NULLS LAST; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + title | word_count +------------+------------ + anjanette | 19519 + aggrandize | 17277 + attemper | 14976 + andelee | 6363 + absentness | 1820 +(5 rows) + +-- look at last two articles by an author +SELECT title, id FROM articles_hash + WHERE author_id = 5 + ORDER BY id + LIMIT 2; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + title | id +---------+---- + aruru | 5 + adversa | 15 +(2 rows) + +-- find all articles by two authors in same shard +SELECT title, author_id FROM articles_hash + WHERE author_id = 7 OR 
author_id = 8 + ORDER BY author_id ASC, id; +DEBUG: predicate pruning for shardId 103301 + title | author_id +-------------+----------- + aseptic | 7 + auriga | 7 + arsenous | 7 + archduchies | 7 + abeyance | 7 + agatized | 8 + assembly | 8 + aerophyte | 8 + anatine | 8 + alkylic | 8 +(10 rows) + +-- add in some grouping expressions, still on same shard +-- having queries unsupported in Citus +SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash + WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10 + GROUP BY author_id + HAVING sum(word_count) > 1000 + ORDER BY sum(word_count) DESC; +ERROR: cannot perform distributed planning on this query +DETAIL: Having qual is currently unsupported +-- however having clause is supported if it goes to a single shard +SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash + WHERE author_id = 1 + GROUP BY author_id + HAVING sum(word_count) > 1000 + ORDER BY sum(word_count) DESC; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + author_id | corpus_size +-----------+------------- + 1 | 35894 +(1 row) + +-- UNION/INTERSECT queries are unsupported +-- this is rejected by router planner and handled by multi_logical_planner +SELECT * FROM articles_hash WHERE author_id = 10 UNION +SELECT * FROM articles_hash WHERE author_id = 1; +ERROR: cannot perform distributed planning on this query +DETAIL: Union, Intersect, or Except are currently unsupported +-- query is a single shard query but can't do shard pruning, +-- not router-plannable due to <= and IN +SELECT * FROM articles_hash WHERE author_id <= 1; + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +SELECT * FROM articles_hash WHERE author_id IN (1, 3); +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider 
rewriting the expression with OR/AND clauses. + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 3 | 3 | asternal | 10480 + 11 | 1 | alamo | 1347 + 13 | 3 | aseyev | 2255 + 21 | 1 | arcading | 5890 + 23 | 3 | abhorring | 6799 + 31 | 1 | athwartships | 7271 + 33 | 3 | autochrome | 8180 + 41 | 1 | aznavour | 11814 + 43 | 3 | affixal | 12723 +(10 rows) + +-- queries using CTEs are unsupported +WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) +SELECT title FROM articles_hash WHERE author_id = 1; +ERROR: cannot perform distributed planning on this query +DETAIL: Common Table Expressions are currently unsupported +-- queries which involve functions in FROM clause are unsupported. +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1; +ERROR: cannot perform distributed planning on this query +DETAIL: Complex table expressions are currently unsupported +-- subqueries are not supported in WHERE clause in Citus +SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a'); +ERROR: cannot plan queries that include both regular and partitioned relations +-- subqueries are supported in FROM clause but they are not router plannable +SELECT articles_hash.id,test.word_count +FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id +ORDER BY articles_hash.id; +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task 
partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 11 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 11 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 14 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 14 +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". +-- subqueries are not supported in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles_hash a; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries other than in from-clause are currently unsupported +set citus.task_executor_type to 'router'; +-- simple lookup query +SELECT * + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- below query hits a single shard, but it is not router plannable +-- still router executable +SELECT * + FROM articles_hash + WHERE author_id = 1 OR author_id = 17; +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 
7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- below query hits two shards, not router plannable + not router executable +SELECT * + FROM articles_hash + WHERE author_id = 1 OR author_id = 18; +ERROR: cannot use router executor with queries that hit multiple shards +HINT: Set citus.task_executor_type to "real-time" or "task-tracker". +-- rename the output columns +SELECT id as article_id, word_count * id as random_value + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + article_id | random_value +------------+-------------- + 1 | 9572 + 11 | 14817 + 21 | 123690 + 31 | 225401 + 41 | 484374 +(5 rows) + +-- we can push down co-located joins to a single worker +-- this is not router plannable but router executable +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id + LIMIT 3; +DEBUG: push down of limit count: 3 +DEBUG: predicate pruning for shardId 103301 +DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] + first_author | second_word_count +--------------+------------------- + 10 | 17277 + 10 | 1820 + 10 | 6363 +(3 rows) + +-- following join is neither router plannable, nor router executable +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_single_shard_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id + LIMIT 3; +DEBUG: push down of limit count: 3 +DEBUG: predicate pruning for shardId 103301 +ERROR: cannot use router executor with JOINs +HINT: Set citus.task_executor_type to "real-time" or "task-tracker". 
+-- single shard select with limit is router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 + LIMIT 3; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 +(3 rows) + +-- single shard select with limit + offset is router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 + LIMIT 2 + OFFSET 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+----------+------------ + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 +(2 rows) + +-- single shard select with limit + offset + order by is router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id desc + LIMIT 2 + OFFSET 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 31 | 1 | athwartships | 7271 + 21 | 1 | arcading | 5890 +(2 rows) + + +-- single shard select with group by on non-partition column is router plannable +SELECT id + FROM articles_hash + WHERE author_id = 1 + GROUP BY id; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id +---- + 41 + 11 + 31 + 1 + 21 +(5 rows) + +-- single shard select with distinct is router plannable +SELECT distinct id + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id +---- + 41 + 11 + 31 + 1 + 21 +(5 rows) + +-- single shard aggregate is router plannable +SELECT avg(word_count) + FROM articles_hash + WHERE author_id = 2; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103300 + avg +-------------------- + 12356.400000000000 +(1 row) + +-- max, min, sum, count are router plannable on single shard +SELECT max(word_count) as max, min(word_count) as min, 
+ sum(word_count) as sum, count(word_count) as cnt + FROM articles_hash + WHERE author_id = 2; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103300 + max | min | sum | cnt +-------+------+-------+----- + 18185 | 2728 | 61782 | 5 +(1 row) + +-- queries with aggregates and group by supported on single shard +SELECT max(word_count) + FROM articles_hash + WHERE author_id = 1 + GROUP BY author_id; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + max +------- + 11814 +(1 row) + +SET client_min_messages to 'NOTICE'; +-- error out for queries with repartition jobs +SELECT * + FROM articles_hash a, articles_hash b + WHERE a.id = b.id AND a.author_id = 1; +ERROR: cannot use router executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". +-- error out for queries which hit more than 1 shards +SELECT * + FROM articles_hash + WHERE author_id >= 1 AND author_id <= 3; +ERROR: cannot use router executor with queries that hit multiple shards +HINT: Set citus.task_executor_type to "real-time" or "task-tracker". 
+SET citus.task_executor_type TO 'real-time'; +-- Test various filtering options for router plannable check +SET client_min_messages to 'DEBUG2'; +-- this is definitely single shard +-- but not router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 and author_id >= 1; +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- not router plannable due to or +SELECT * + FROM articles_hash + WHERE author_id = 1 or id = 1; + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + + +-- router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 and (id = 1 or id = 41); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 + 41 | 1 | aznavour | 11814 +(2 rows) + +-- router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 and (id = random()::int * 0); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+-------+------------ +(0 rows) + +-- not router plannable due to function call on the right side +SELECT * + FROM articles_hash + WHERE author_id = (random()::int * 0 + 1); + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- not router plannable due to or +SELECT * + FROM articles_hash + WHERE author_id = 1 or id = 1; + id | author_id | title | word_count 
+----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + + +-- router plannable due to abs(-1) getting converted to 1 by postgresql +SELECT * + FROM articles_hash + WHERE author_id = abs(-1); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- not router plannable due to abs() function +SELECT * + FROM articles_hash + WHERE 1 = abs(author_id); + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- not router plannable due to abs() function +SELECT * + FROM articles_hash + WHERE author_id = abs(author_id - 2); + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- router plannable, function on different field +SELECT * + FROM articles_hash + WHERE author_id = 1 and (id = abs(id - 2)); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 +(1 row) + +-- not router plannable due to is true +SELECT * + FROM articles_hash + WHERE (author_id = 1) is true; + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- router plannable, 
(boolean expression) = true is collapsed to (boolean expression) +SELECT * + FROM articles_hash + WHERE (author_id = 1) = true; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- router plannable, between operator is on another column +SELECT * + FROM articles_hash + WHERE (author_id = 1) and id between 0 and 20; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 +(2 rows) + +-- router plannable, partition column expression is and'ed to rest +SELECT * + FROM articles_hash + WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s'; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 31 | 1 | athwartships | 7271 +(2 rows) + +-- router plannable, order is changed +SELECT * + FROM articles_hash + WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 31 | 1 | athwartships | 7271 +(2 rows) + +-- router plannable +SELECT * + FROM articles_hash + WHERE (title like '%s' or title like 'a%') and (author_id = 1); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- router plannable 
+SELECT * + FROM articles_hash + WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 41 | 1 | aznavour | 11814 +(3 rows) + +-- window functions are supported if query is router plannable +SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count + FROM articles_hash + WHERE author_id = 5; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + prev | title | word_count +----------+----------+------------ + | afrasia | 864 + afrasia | adversa | 3164 + adversa | antehall | 7707 + antehall | aminate | 9089 + aminate | aruru | 11389 +(5 rows) + +SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count + FROM articles_hash + WHERE author_id = 5 + ORDER BY word_count DESC; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + prev | title | word_count +----------+----------+------------ + aminate | aruru | 11389 + antehall | aminate | 9089 + adversa | antehall | 7707 + afrasia | adversa | 3164 + | afrasia | 864 +(5 rows) + +SELECT id, MIN(id) over (order by word_count) + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | min +----+----- + 11 | 11 + 21 | 11 + 31 | 11 + 1 | 1 + 41 | 1 +(5 rows) + +SELECT id, word_count, AVG(word_count) over (order by word_count) + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | word_count | avg +----+------------+----------------------- + 11 | 1347 | 1347.0000000000000000 + 21 | 5890 | 3618.5000000000000000 + 31 | 7271 | 4836.0000000000000000 + 1 | 9572 | 6020.0000000000000000 + 41 | 11814 | 7178.8000000000000000 +(5 rows) + +-- window functions are not supported for 
not router plannable queries +SELECT id, MIN(id) over (order by word_count) + FROM articles_hash + WHERE author_id = 1 or author_id = 2; +ERROR: cannot perform distributed planning on this query +DETAIL: Window functions are currently unsupported + +-- but they are not supported for not router plannable queries +SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count + FROM articles_hash + WHERE author_id = 5 or author_id = 1; +ERROR: cannot perform distributed planning on this query +DETAIL: Window functions are currently unsupported +-- complex query hitting a single shard +SELECT + count(DISTINCT CASE + WHEN + word_count > 100 + THEN + id + ELSE + NULL + END) as c + FROM + articles_hash + WHERE + author_id = 5; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + c +--- + 5 +(1 row) + +-- same query is not router plannable if hits multiple shards +SELECT + count(DISTINCT CASE + WHEN + word_count > 100 + THEN + id + ELSE + NULL + END) as c + FROM + articles_hash + GROUP BY + author_id; +ERROR: cannot compute aggregate (distinct) +DETAIL: aggregate (distinct) on complex expressions is unsupported +HINT: You can load the hll extension from contrib packages and enable distinct approximations. 
+-- queries inside transactions can be router plannable +BEGIN; +SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +END; +-- cursor queries inside transactions are not router plannable +BEGIN; +DECLARE test_cursor CURSOR FOR + SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id; +DEBUG: predicate pruning for shardId 103301 +FETCH test_cursor; + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 +(1 row) + +FETCH test_cursor; + id | author_id | title | word_count +----+-----------+-------+------------ + 11 | 1 | alamo | 1347 +(1 row) + +END; +-- queries inside copy can be router plannable +COPY ( + SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id) TO STDOUT; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 +1 1 arsenous 9572 +11 1 alamo 1347 +21 1 arcading 5890 +31 1 athwartships 7271 +41 1 aznavour 11814 + +-- table creation queries inside can be router plannable +CREATE TEMP TABLE temp_articles_hash as + SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 +-- router plannable queries may include filter for aggragates +SELECT count(*), count(*) FILTER (WHERE id < 3) + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + count | count +-------+------- + 5 | 1 +(1 row) + +-- non-router plannable queries do not support filters +SELECT count(*), count(*) FILTER (WHERE id < 3) + FROM articles_hash + WHERE author_id = 1 or author_id = 2; + count | count +-------+------- + 10 | +(1 row) + +-- 
prepare queries can be router plannable +PREPARE author_1_articles as + SELECT * + FROM articles_hash + WHERE author_id = 1; +EXECUTE author_1_articles; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- parametric prepare queries can be router plannable +PREPARE author_articles(int) as + SELECT * + FROM articles_hash + WHERE author_id = $1; +EXECUTE author_articles(1); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + +-- queries inside plpgsql functions could be router plannable +CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$ +DECLARE + max_id integer; +BEGIN + SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1 + into max_id; + return max_id; +END; +$$ LANGUAGE plpgsql; +SELECT author_articles_max_id(); +DEBUG: Creating router plan +CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_max_id() line 5 at SQL statement +DEBUG: predicate pruning for shardId 103301 +CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_max_id() line 5 at SQL statement + author_articles_max_id +------------------------ + 41 +(1 row) + +-- plpgsql function that return query results are not router plannable +CREATE OR REPLACE FUNCTION author_articles_id_word_count() RETURNS TABLE(id bigint, word_count int) AS $$ +DECLARE +BEGIN + RETURN QUERY + SELECT ah.id, ah.word_count + FROM articles_hash ah + WHERE author_id = 1; + 
+END; +$$ LANGUAGE plpgsql; +SELECT * FROM author_articles_id_word_count(); +DEBUG: Creating router plan +CONTEXT: SQL statement "SELECT ah.id, ah.word_count + FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +DEBUG: predicate pruning for shardId 103301 +CONTEXT: SQL statement "SELECT ah.id, ah.word_count + FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +ERROR: scan directions other than forward scans are unsupported +CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +SET client_min_messages to 'NOTICE'; +DROP FUNCTION author_articles_max_id(); +DROP FUNCTION author_articles_id_word_count(); +DROP TABLE articles_hash; +DROP TABLE articles_single_shard_hash; +DROP TABLE authors_hash; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index c771628fc..fe0904b71 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -322,6 +322,7 @@ SET client_min_messages TO 'DEBUG2'; SELECT * FROM articles WHERE author_id = 1; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 id | author_id | title | word_count ----+-----------+--------------+------------ @@ -356,6 +357,7 @@ HINT: Set citus.task_executor_type to "real-time" or "task-tracker". 
SELECT id as article_id, word_count * id as random_value FROM articles WHERE author_id = 1; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 article_id | random_value ------------+-------------- @@ -396,7 +398,7 @@ SELECT * FROM articles WHERE author_id = 1 LIMIT 2; -DEBUG: push down of limit count: 2 +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 id | author_id | title | word_count ----+-----------+----------+------------ @@ -411,6 +413,7 @@ SELECT id FROM articles WHERE author_id = 1 GROUP BY id; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 id ---- @@ -428,44 +431,68 @@ COPY articles_single_shard TO stdout; SELECT avg(word_count) FROM articles WHERE author_id = 2; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103093 -ERROR: cannot use router executor with aggregates -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + avg +-------------------- + 12356.400000000000 +(1 row) + -- max, min, sum, count is somehow implemented -- differently in distributed planning but, still error out SELECT max(word_count) as max, min(word_count) as min, sum(word_count) as sum, count(word_count) as cnt FROM articles WHERE author_id = 2; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103093 -ERROR: cannot use router executor with aggregates -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + max | min | sum | cnt +-------+------+-------+----- + 18185 | 2728 | 61782 | 5 +(1 row) + -- error out for queries with ORDER BY SELECT * FROM articles WHERE author_id = 1 ORDER BY word_count; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 -ERROR: cannot use router executor with ORDER BY clauses -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". 
+ id | author_id | title | word_count +----+-----------+--------------+------------ + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 1 | 1 | arsenous | 9572 + 41 | 1 | aznavour | 11814 +(5 rows) + -- error out for queries with ORDER BY and LIMIT SELECT * FROM articles WHERE author_id = 1 ORDER BY word_count LIMIT 2; -DEBUG: push down of limit count: 2 +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 -ERROR: cannot use router executor with ORDER BY clauses -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + id | author_id | title | word_count +----+-----------+----------+------------ + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 +(2 rows) + -- error out for queries with aggregates and GROUP BY SELECT max(word_count) FROM articles WHERE author_id = 1 GROUP BY author_id; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 -ERROR: cannot use router executor with aggregates -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + max +------- + 11814 +(1 row) + -- error out for queries with repartition jobs SELECT * FROM articles a, articles b diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index e493fc739..a5295acb2 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -126,6 +126,11 @@ test: multi_repartitioned_subquery_udf # --------- test: multi_copy +# --------- +# multi_router_planner creates hash partitioned tables. 
+# --------- +test: multi_router_planner + # ---------- # multi_large_shardid stages more shards into lineitem # ---------- diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql new file mode 100644 index 000000000..341e0c0a8 --- /dev/null +++ b/src/test/regress/sql/multi_router_planner.sql @@ -0,0 +1,507 @@ +-- =================================================================== +-- test router planner functionality for single shard select queries +-- =================================================================== +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 103300; + + +CREATE TABLE articles_hash ( + id bigint NOT NULL, + author_id bigint NOT NULL, + title varchar(20) NOT NULL, + word_count integer +); + +-- Check for the existence of line 'DEBUG: Creating router plan' +-- to determine if router planner is used. + +-- this table is used in a CTE test +CREATE TABLE authors_hash ( name text, id bigint ); + +-- this table is used in router executor tests +CREATE TABLE articles_single_shard_hash (LIKE articles_hash); + +SELECT master_create_distributed_table('articles_hash', 'author_id', 'hash'); +SELECT master_create_distributed_table('articles_single_shard_hash', 'author_id', 'hash'); + + +-- test when a table is distributed but no shards created yet +SELECT count(*) from articles_hash; + +SELECT master_create_worker_shards('articles_hash', 2, 1); +SELECT master_create_worker_shards('articles_single_shard_hash', 1, 1); + +-- create a bunch of test data +INSERT INTO articles_hash VALUES ( 1, 1, 'arsenous', 9572); +INSERT INTO articles_hash VALUES ( 2, 2, 'abducing', 13642); +INSERT INTO articles_hash VALUES ( 3, 3, 'asternal', 10480); +INSERT INTO articles_hash VALUES ( 4, 4, 'altdorfer', 14551); +INSERT INTO articles_hash VALUES ( 5, 5, 'aruru', 11389); +INSERT INTO articles_hash VALUES ( 6, 6, 'atlases', 15459); +INSERT INTO articles_hash VALUES ( 7, 7, 'aseptic', 12298); +INSERT INTO articles_hash 
VALUES ( 8, 8, 'agatized', 16368); +INSERT INTO articles_hash VALUES ( 9, 9, 'alligate', 438); +INSERT INTO articles_hash VALUES (10, 10, 'aggrandize', 17277); +INSERT INTO articles_hash VALUES (11, 1, 'alamo', 1347); +INSERT INTO articles_hash VALUES (12, 2, 'archiblast', 18185); +INSERT INTO articles_hash VALUES (13, 3, 'aseyev', 2255); +INSERT INTO articles_hash VALUES (14, 4, 'andesite', 19094); +INSERT INTO articles_hash VALUES (15, 5, 'adversa', 3164); +INSERT INTO articles_hash VALUES (16, 6, 'allonym', 2); +INSERT INTO articles_hash VALUES (17, 7, 'auriga', 4073); +INSERT INTO articles_hash VALUES (18, 8, 'assembly', 911); +INSERT INTO articles_hash VALUES (19, 9, 'aubergiste', 4981); +INSERT INTO articles_hash VALUES (20, 10, 'absentness', 1820); +INSERT INTO articles_hash VALUES (21, 1, 'arcading', 5890); +INSERT INTO articles_hash VALUES (22, 2, 'antipope', 2728); +INSERT INTO articles_hash VALUES (23, 3, 'abhorring', 6799); +INSERT INTO articles_hash VALUES (24, 4, 'audacious', 3637); +INSERT INTO articles_hash VALUES (25, 5, 'antehall', 7707); +INSERT INTO articles_hash VALUES (26, 6, 'abington', 4545); +INSERT INTO articles_hash VALUES (27, 7, 'arsenous', 8616); +INSERT INTO articles_hash VALUES (28, 8, 'aerophyte', 5454); +INSERT INTO articles_hash VALUES (29, 9, 'amateur', 9524); +INSERT INTO articles_hash VALUES (30, 10, 'andelee', 6363); +INSERT INTO articles_hash VALUES (31, 1, 'athwartships', 7271); +INSERT INTO articles_hash VALUES (32, 2, 'amazon', 11342); +INSERT INTO articles_hash VALUES (33, 3, 'autochrome', 8180); +INSERT INTO articles_hash VALUES (34, 4, 'amnestied', 12250); +INSERT INTO articles_hash VALUES (35, 5, 'aminate', 9089); +INSERT INTO articles_hash VALUES (36, 6, 'ablation', 13159); +INSERT INTO articles_hash VALUES (37, 7, 'archduchies', 9997); +INSERT INTO articles_hash VALUES (38, 8, 'anatine', 14067); +INSERT INTO articles_hash VALUES (39, 9, 'anchises', 10906); +INSERT INTO articles_hash VALUES (40, 10, 'attemper', 
14976); +INSERT INTO articles_hash VALUES (41, 1, 'aznavour', 11814); +INSERT INTO articles_hash VALUES (42, 2, 'ausable', 15885); +INSERT INTO articles_hash VALUES (43, 3, 'affixal', 12723); +INSERT INTO articles_hash VALUES (44, 4, 'anteport', 16793); +INSERT INTO articles_hash VALUES (45, 5, 'afrasia', 864); +INSERT INTO articles_hash VALUES (46, 6, 'atlanta', 17702); +INSERT INTO articles_hash VALUES (47, 7, 'abeyance', 1772); +INSERT INTO articles_hash VALUES (48, 8, 'alkylic', 18610); +INSERT INTO articles_hash VALUES (49, 9, 'anyone', 2681); +INSERT INTO articles_hash VALUES (50, 10, 'anjanette', 19519); + + + +SET citus.task_executor_type TO 'real-time'; +SET citus.large_table_shard_count TO 2; +SET client_min_messages TO 'DEBUG2'; + +-- insert a single row for the test +INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); + +-- first, test zero-shard SELECT, which should return an empty row +SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; + +-- single-shard tests + +-- test simple select for a single row +SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50; + +-- get all titles by a single author +SELECT title FROM articles_hash WHERE author_id = 10; + +-- try ordering them by word count +SELECT title, word_count FROM articles_hash + WHERE author_id = 10 + ORDER BY word_count DESC NULLS LAST; + +-- look at last two articles by an author +SELECT title, id FROM articles_hash + WHERE author_id = 5 + ORDER BY id + LIMIT 2; + +-- find all articles by two authors in same shard +SELECT title, author_id FROM articles_hash + WHERE author_id = 7 OR author_id = 8 + ORDER BY author_id ASC, id; + +-- add in some grouping expressions, still on same shard +-- having queries unsupported in Citus +SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash + WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10 + GROUP BY author_id + HAVING sum(word_count) > 1000 + ORDER BY sum(word_count) 
DESC; + +-- however having clause is supported if it goes to a single shard +SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash + WHERE author_id = 1 + GROUP BY author_id + HAVING sum(word_count) > 1000 + ORDER BY sum(word_count) DESC; + + +-- UNION/INTERSECT queries are unsupported +-- this is rejected by router planner and handled by multi_logical_planner +SELECT * FROM articles_hash WHERE author_id = 10 UNION +SELECT * FROM articles_hash WHERE author_id = 1; + +-- query is a single shard query but can't do shard pruning, +-- not router-plannable due to <= and IN +SELECT * FROM articles_hash WHERE author_id <= 1; +SELECT * FROM articles_hash WHERE author_id IN (1, 3); + +-- queries using CTEs are unsupported +WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) +SELECT title FROM articles_hash WHERE author_id = 1; + +-- queries which involve functions in FROM clause are unsupported. +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1; + +-- subqueries are not supported in WHERE clause in Citus +SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a'); + +-- subqueries are supported in FROM clause but they are not router plannable +SELECT articles_hash.id,test.word_count +FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id +ORDER BY articles_hash.id; + +-- subqueries are not supported in SELECT clause +SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1) + AS special_price FROM articles_hash a; + +set citus.task_executor_type to 'router'; + +-- simple lookup query +SELECT * + FROM articles_hash + WHERE author_id = 1; + +-- below query hits a single shard, but it is not router plannable +-- still router executable +SELECT * + FROM articles_hash + WHERE author_id = 1 OR author_id = 17; + +-- below query hits two shards, not router plannable + not router executable +SELECT * + FROM 
articles_hash + WHERE author_id = 1 OR author_id = 18; + +-- rename the output columns +SELECT id as article_id, word_count * id as random_value + FROM articles_hash + WHERE author_id = 1; + +-- we can push down co-located joins to a single worker +-- this is not router plannable but router executable +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id + LIMIT 3; + +-- following join is neither router plannable, nor router executable +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, articles_single_shard_hash b + WHERE a.author_id = 10 and a.author_id = b.author_id + LIMIT 3; + +-- single shard select with limit is router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 + LIMIT 3; + +-- single shard select with limit + offset is router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 + LIMIT 2 + OFFSET 1; + +-- single shard select with limit + offset + order by is router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id desc + LIMIT 2 + OFFSET 1; + +-- single shard select with group by on non-partition column is router plannable +SELECT id + FROM articles_hash + WHERE author_id = 1 + GROUP BY id; + +-- single shard select with distinct is router plannable +SELECT distinct id + FROM articles_hash + WHERE author_id = 1; + +-- single shard aggregate is router plannable +SELECT avg(word_count) + FROM articles_hash + WHERE author_id = 2; + +-- max, min, sum, count are router plannable on single shard +SELECT max(word_count) as max, min(word_count) as min, + sum(word_count) as sum, count(word_count) as cnt + FROM articles_hash + WHERE author_id = 2; + + +-- queries with aggregates and group by supported on single shard +SELECT max(word_count) + FROM articles_hash + WHERE author_id = 1 + GROUP BY author_id; + +SET client_min_messages to 'NOTICE'; +-- error out 
for queries with repartition jobs +SELECT * + FROM articles_hash a, articles_hash b + WHERE a.id = b.id AND a.author_id = 1; + +-- error out for queries which hit more than 1 shards +SELECT * + FROM articles_hash + WHERE author_id >= 1 AND author_id <= 3; + +SET citus.task_executor_type TO 'real-time'; + +-- Test various filtering options for router plannable check +SET client_min_messages to 'DEBUG2'; + +-- this is definitely single shard +-- but not router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 and author_id >= 1; + +-- not router plannable due to or +SELECT * + FROM articles_hash + WHERE author_id = 1 or id = 1; + +-- router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 and (id = 1 or id = 41); + +-- router plannable +SELECT * + FROM articles_hash + WHERE author_id = 1 and (id = random()::int * 0); + +-- not router plannable due to function call on the right side +SELECT * + FROM articles_hash + WHERE author_id = (random()::int * 0 + 1); + +-- not router plannable due to or +SELECT * + FROM articles_hash + WHERE author_id = 1 or id = 1; + +-- router plannable due to abs(-1) getting converted to 1 by postgresql +SELECT * + FROM articles_hash + WHERE author_id = abs(-1); + +-- not router plannable due to abs() function +SELECT * + FROM articles_hash + WHERE 1 = abs(author_id); + +-- not router plannable due to abs() function +SELECT * + FROM articles_hash + WHERE author_id = abs(author_id - 2); + +-- router plannable, function on different field +SELECT * + FROM articles_hash + WHERE author_id = 1 and (id = abs(id - 2)); + +-- not router plannable due to is true +SELECT * + FROM articles_hash + WHERE (author_id = 1) is true; + +-- router plannable, (boolean expression) = true is collapsed to (boolean expression) +SELECT * + FROM articles_hash + WHERE (author_id = 1) = true; + +-- router plannable, between operator is on another column +SELECT * + FROM articles_hash + WHERE (author_id = 1) and id between 0 and 20; + +-- 
router plannable, partition column expression is and'ed to rest +SELECT * + FROM articles_hash + WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s'; + +-- router plannable, order is changed +SELECT * + FROM articles_hash + WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1); + +-- router plannable +SELECT * + FROM articles_hash + WHERE (title like '%s' or title like 'a%') and (author_id = 1); + +-- router plannable +SELECT * + FROM articles_hash + WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000); + +-- window functions are supported if query is router plannable +SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count + FROM articles_hash + WHERE author_id = 5; + +SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count + FROM articles_hash + WHERE author_id = 5 + ORDER BY word_count DESC; + +SELECT id, MIN(id) over (order by word_count) + FROM articles_hash + WHERE author_id = 1; + +SELECT id, word_count, AVG(word_count) over (order by word_count) + FROM articles_hash + WHERE author_id = 1; + +-- window functions are not supported for not router plannable queries +SELECT id, MIN(id) over (order by word_count) + FROM articles_hash + WHERE author_id = 1 or author_id = 2; + + +-- but they are not supported for not router plannable queries +SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count + FROM articles_hash + WHERE author_id = 5 or author_id = 1; + +-- complex query hitting a single shard +SELECT + count(DISTINCT CASE + WHEN + word_count > 100 + THEN + id + ELSE + NULL + END) as c + FROM + articles_hash + WHERE + author_id = 5; + +-- same query is not router plannable if hits multiple shards +SELECT + count(DISTINCT CASE + WHEN + word_count > 100 + THEN + id + ELSE + NULL + END) as c + FROM + articles_hash + GROUP BY + author_id; + +-- queries inside transactions can be router plannable +BEGIN; +SELECT * + FROM articles_hash + 
WHERE author_id = 1 + ORDER BY id; +END; + +-- cursor queries inside transactions are not router plannable +BEGIN; +DECLARE test_cursor CURSOR FOR + SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id; +FETCH test_cursor; +FETCH test_cursor; +END; + +-- queries inside copy can be router plannable +COPY ( + SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id) TO STDOUT; + +-- table creation queries inside can be router plannable +CREATE TEMP TABLE temp_articles_hash as + SELECT * + FROM articles_hash + WHERE author_id = 1 + ORDER BY id; + +-- router plannable queries may include filter for aggragates +SELECT count(*), count(*) FILTER (WHERE id < 3) + FROM articles_hash + WHERE author_id = 1; + +-- non-router plannable queries do not support filters +SELECT count(*), count(*) FILTER (WHERE id < 3) + FROM articles_hash + WHERE author_id = 1 or author_id = 2; + +-- prepare queries can be router plannable +PREPARE author_1_articles as + SELECT * + FROM articles_hash + WHERE author_id = 1; + +EXECUTE author_1_articles; + +-- parametric prepare queries can be router plannable +PREPARE author_articles(int) as + SELECT * + FROM articles_hash + WHERE author_id = $1; + +EXECUTE author_articles(1); + +-- queries inside plpgsql functions could be router plannable +CREATE OR REPLACE FUNCTION author_articles_max_id() RETURNS int AS $$ +DECLARE + max_id integer; +BEGIN + SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1 + into max_id; + return max_id; +END; +$$ LANGUAGE plpgsql; + +SELECT author_articles_max_id(); + +-- plpgsql function that return query results are not router plannable +CREATE OR REPLACE FUNCTION author_articles_id_word_count() RETURNS TABLE(id bigint, word_count int) AS $$ +DECLARE +BEGIN + RETURN QUERY + SELECT ah.id, ah.word_count + FROM articles_hash ah + WHERE author_id = 1; + +END; +$$ LANGUAGE plpgsql; + +SELECT * FROM author_articles_id_word_count(); + +SET client_min_messages to 'NOTICE'; + +DROP FUNCTION 
author_articles_max_id(); +DROP FUNCTION author_articles_id_word_count(); + +DROP TABLE articles_hash; +DROP TABLE articles_single_shard_hash; +DROP TABLE authors_hash;