diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index e2b7688ef..295fb36bf 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -42,7 +42,7 @@ bool AllModificationsCommutative = false; static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static int32 ExecuteDistributedModify(Task *task); -static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, +static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, EState *executorState, TupleDesc tupleDescriptor, DestReceiver *destination); @@ -84,10 +84,10 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task) queryDesc->estate = executorState; /* - * As it's similar to what we're doing, use a MaterialState node to store - * our state. This is used to store our tuplestore, so cursors etc. can - * work. - */ + * As it's similar to what we're doing, use a MaterialState node to store + * our state. This is used to store our tuplestore, so cursors etc. can + * work. + */ queryDesc->planstate = (PlanState *) makeNode(MaterialState); #if (PG_VERSION_NUM < 90500) @@ -321,7 +321,7 @@ ExecuteDistributedModify(Task *task) * given placement, the function attempts it on its replica. */ static void -ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, Task *task, +ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, EState *executorState, TupleDesc tupleDescriptor, DestReceiver *destination) { @@ -364,11 +364,12 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, Task *task, ExecClearTuple(tupleTableSlot); currentTupleCount++; + /* * If numberTuples is zero fetch all tuples, otherwise stop after * count tuples. */ - if (numberTuples && numberTuples == currentTupleCount) + if (tupleCount > 0 && tupleCount == currentTupleCount) { break; } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index aa0b0b626..5072aac37 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -30,6 +30,8 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ +static bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType); + /* * JobExecutorType selects the executor type for the given multiPlan using the task @@ -42,29 +44,20 @@ MultiExecutorType JobExecutorType(MultiPlan *multiPlan) { Job *job = multiPlan->workerJob; - Query *masterQuery = multiPlan->masterQuery; List *workerTaskList = job->taskList; List *workerNodeList = WorkerNodeList(); int taskCount = list_length(workerTaskList); int workerNodeCount = list_length(workerNodeList); double tasksPerNode = taskCount / ((double) workerNodeCount); int dependedJobCount = list_length(job->dependedJobList); - MultiExecutorType executorType = TaskExecutorType; + bool routerExecutablePlan = RouterExecutablePlan(multiPlan, executorType); - /* check if the first task is a modify task, short-circuit if so */ - if (taskCount > 0) + /* check if can switch to router executor */ + if (routerExecutablePlan) { - Task *firstTask = (Task *) linitial(workerTaskList); - TaskType taskType = firstTask->taskType; - - if (taskType == MODIFY_TASK || taskType == ROUTER_TASK) - { - /* router planner creates a single task */ - Assert(taskCount == 1); - - return MULTI_EXECUTOR_ROUTER; - } + ereport(DEBUG2, (errmsg("Plan is router executable"))); + return MULTI_EXECUTOR_ROUTER; } if (executorType == MULTI_EXECUTOR_REAL_TIME) @@ -104,7 +97,7 @@ JobExecutorType(MultiPlan *multiPlan) "\"task-tracker\"."))); } } - else if (executorType == MULTI_EXECUTOR_TASK_TRACKER) + else { /* if we have more tasks per node than what can be tracked, warn the user */ if (tasksPerNode >= MaxTrackedTasksPerNode) @@ -113,64 +106,83 @@ JobExecutorType(MultiPlan *multiPlan) "configured max_tracked_tasks_per_node limit"))); } } - else if (executorType == MULTI_EXECUTOR_ROUTER) - { - Task *workerTask = NULL; - List *workerDependentTaskList = NIL; - bool masterQueryHasAggregates = false; - - /* if we have repartition jobs with router executor, error out */ - if (dependedJobCount > 0) - { - ereport(ERROR, (errmsg("cannot use router executor with repartition jobs"), - errhint("Set citus.task_executor_type to " - "\"task-tracker\"."))); - } - - /* if the query hits more than one shards, error out*/ - if (taskCount != 1) - { - ereport(ERROR, (errmsg("cannot use router executor with queries that " - "hit multiple shards"), - errhint("Set citus.task_executor_type to \"real-time\" or " - "\"task-tracker\"."))); - } - - /* if the query has dependent data fetch tasks */ - workerTask = list_nth(workerTaskList, 0); - workerDependentTaskList = workerTask->dependedTaskList; - if (list_length(workerDependentTaskList) > 0) - { - ereport(ERROR, (errmsg("cannot use router executor with JOINs"), - errhint("Set citus.task_executor_type to \"real-time\" or " - "\"task-tracker\"."))); - } - - /* ORDER BY is always applied on the master table with the current planner */ - if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0) - { - ereport(ERROR, (errmsg("cannot use router executor with ORDER BY clauses"), - errhint("Set citus.task_executor_type to \"real-time\" or " - "\"task-tracker\"."))); - } - - /* - * Note that worker query having an aggregate means that the master query should have either - * an aggregate or a function expression which has to be executed for the correct results. - */ - masterQueryHasAggregates = job->jobQuery->hasAggs; - if (masterQueryHasAggregates) - { - ereport(ERROR, (errmsg("cannot use router executor with aggregates"), - errhint("Set citus.task_executor_type to \"real-time\" or " - "\"task-tracker\"."))); - } - } return executorType; } +/* + * RouterExecutablePlan returns whether a multi-plan can be executed using the + * router executor. Modify queries are always router executable, select queries + * are router executable only if executorType is real time. + */ +static bool +RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType) +{ + Job *job = multiPlan->workerJob; + TaskType taskType = TASK_TYPE_INVALID_FIRST; + Query *masterQuery = multiPlan->masterQuery; + List *workerTaskList = job->taskList; + int taskCount = list_length(workerTaskList); + int dependedJobCount = list_length(job->dependedJobList); + Task *workerTask = NULL; + List *workerDependentTaskList = NIL; + bool masterQueryHasAggregates = false; + + /* router executor cannot execute queries that hit more than one shard */ + if (taskCount != 1) + { + return false; + } + + /* check if the first task is a modify or a router task, short-circuit if so */ + workerTask = (Task *) linitial(workerTaskList); + taskType = workerTask->taskType; + if (taskType == MODIFY_TASK || taskType == ROUTER_TASK) + { + return true; + } + + if (executorType == MULTI_EXECUTOR_TASK_TRACKER) + { + return false; + } + + /* router executor cannot execute repartition jobs */ + if (dependedJobCount > 0) + { + return false; + } + + /* router executor cannot execute queries with dependent data fetch tasks */ + workerDependentTaskList = workerTask->dependedTaskList; + if (list_length(workerDependentTaskList) > 0) + { + return false; + } + + /* router executor cannot execute queries with order by */ + if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0) + { + return false; + } + + /* + * Router executor cannot execute queries with aggregates. + * Note that worker query having an aggregate means that the master query should + * have either an aggregate or a function expression which has to be executed for + * the correct results. + */ + masterQueryHasAggregates = job->jobQuery->hasAggs; + if (masterQueryHasAggregates) + { + return false; + } + + return true; +} + + /* * MaxMasterConnectionCount returns the number of connections a master can open. * A master cannot create more than a certain number of file descriptors (FDs). diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 564c91251..d468ea377 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -574,7 +574,7 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement) { Oid namespaceId; Oid indexRelationId; - char* indexRelationName = createIndexStatement->idxname; + char *indexRelationName = createIndexStatement->idxname; if (indexRelationName == NULL) { diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 02083a03c..7a073392d 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -75,24 +75,12 @@ CreatePhysicalPlan(Query *parse) { Query *parseCopy = copyObject(parse); MultiPlan *physicalPlan = NULL; - CmdType commandType = parse->commandType; - bool routerPlannable = false; - - if (commandType == CMD_INSERT || commandType == CMD_UPDATE || - commandType == CMD_DELETE) - { - routerPlannable = true; - } - else if (TaskExecutorType == MULTI_EXECUTOR_REAL_TIME || - TaskExecutorType == MULTI_EXECUTOR_ROUTER) - { - routerPlannable = MultiRouterPlannableQuery(parseCopy); - } - + bool routerPlannable = MultiRouterPlannableQuery(parseCopy, TaskExecutorType); if (routerPlannable) { ereport(DEBUG2, (errmsg("Creating router plan"))); physicalPlan = MultiRouterPlanCreate(parseCopy); + CheckNodeIsDumpable((Node *) physicalPlan); } else { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index e1657e2d3..90b654c90 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -540,7 +540,7 @@ TargetShardInterval(Query *query) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("router executor queries must target exactly one " - "shard"))); + "shard"))); } else { @@ -664,7 +664,7 @@ RouterSelectTask(Query *query) StringInfo queryString = makeStringInfo(); uint64 shardId = INVALID_SHARD_ID; bool upsertQuery = false; - CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; + CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; FromExpr *joinTree = NULL; Assert(shardInterval != NULL); @@ -701,7 +701,8 @@ RouterSelectTask(Query *query) /* * RouterQueryJob creates a Job for the specified query to execute the - * provided single shard select task.*/ + * provided single shard select task. + */ static Job * RouterQueryJob(Query *query, Task *task) { @@ -737,11 +738,10 @@ RouterQueryJob(Query *query, Task *task) * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a select query issued on a hash * partitioned distributed table, and it has a exact match comparison on the - * partition column. This feature is enabled if task executor is set to real-time or - * router. + * partition column. This feature is enabled if task executor is set to real-time */ bool -MultiRouterPlannableQuery(Query *query) +MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType) { uint32 rangeTableId = 1; List *rangeTableList = NIL; @@ -758,6 +758,17 @@ MultiRouterPlannableQuery(Query *query) int partitionColumnReferenceCount = 0; int shardCount = 0; + if (commandType == CMD_INSERT || commandType == CMD_UPDATE || + commandType == CMD_DELETE) + { + return true; + } + + if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME) + { + return false; + } + Assert(commandType == CMD_SELECT); /* @@ -768,7 +779,7 @@ MultiRouterPlannableQuery(Query *query) * during RangeTblEntry checks. */ if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate || - query->hasRecursive || query->utilityStmt != NULL) + query->hasRecursive) { return false; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index d72ae3150..2dc98dd81 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -59,7 +59,6 @@ static const struct config_enum_entry task_assignment_policy_options[] = { static const struct config_enum_entry task_executor_type_options[] = { { "real-time", MULTI_EXECUTOR_REAL_TIME, false }, { "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false }, - { "router", MULTI_EXECUTOR_ROUTER, false }, { NULL, 0, false } }; @@ -483,14 +482,13 @@ RegisterCitusConfigVariables(void) DefineCustomEnumVariable( "citus.task_executor_type", gettext_noop("Sets the executor type to be used for distributed queries."), - gettext_noop("The master node chooses between three different executor types " - "when executing a distributed query. The router executor is " - "optimal for simple key-value lookups on a single shard. The " - "real-time executor is optimal for queries that involve " - "aggregations and/or co-located joins on multiple shards. The " - "task-tracker executor is optimal for long-running, complex " - "queries that touch thousands of shards and/or that involve " - "table repartitioning."), + gettext_noop("The master node chooses between two different executor types " + "when executing a distributed query.The real-time executor is " + "optimal for simple key-value lookup queries and queries that " + "involve aggregations and/or co-located joins on multiple shards. " + "The task-tracker executor is optimal for long-running, complex " + "queries that touch thousands of shards and/or that involve table " + "repartitioning."), &TaskExecutorType, MULTI_EXECUTOR_REAL_TIME, task_executor_type_options, diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 6974b8b62..693c314e4 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -16,6 +16,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_server_executor.h" #include "nodes/parsenodes.h" @@ -30,6 +31,6 @@ #endif extern MultiPlan * MultiRouterPlanCreate(Query *query); -extern bool MultiRouterPlannableQuery(Query *query); +extern bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index d28f173cc..d1df70c83 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -91,9 +91,10 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 20.69); -- ensure the values are where we put them and query to ensure they are properly pruned SET client_min_messages TO 'DEBUG2'; -SET citus.task_executor_type TO 'router'; +SET citus.task_executor_type TO 'real-time'; SELECT * FROM range_partitioned WHERE id = 32743; DEBUG: predicate pruning for shardId 103070 +DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price -------+--------+-----------+--------------------------+------+------------- 32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 @@ -101,6 +102,7 @@ DEBUG: predicate pruning for shardId 103070 SELECT * FROM append_partitioned WHERE id = 414123; DEBUG: predicate pruning for shardId 103072 +DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price --------+--------+-----------+--------------------------+------+------------- 414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 diff --git a/src/test/regress/expected/multi_null_minmax_value_pruning.out b/src/test/regress/expected/multi_null_minmax_value_pruning.out index 486db51fb..cb7ff7b06 100644 --- a/src/test/regress/expected/multi_null_minmax_value_pruning.out +++ b/src/test/regress/expected/multi_null_minmax_value_pruning.out @@ -19,12 +19,12 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010; (1 row) -- Check that partition and join pruning works when min/max values exist -SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; +-- Adding l_orderkey = 1 to make the query not router executable +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; DEBUG: predicate pruning for shardId 102014 DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 -DEBUG: predicate pruning for shardId 102009 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -33,7 +33,13 @@ DEBUG: predicate pruning for shardId 102009 9030 | 4 | 07-20-1998 9030 | 5 | 09-29-1998 9030 | 6 | 09-03-1998 -(6 rows) + 1 | 1 | 03-13-1996 + 1 | 2 | 04-12-1996 + 1 | 3 | 01-29-1996 + 1 | 4 | 04-21-1996 + 1 | 5 | 03-30-1996 + 1 | 6 | 01-30-1996 +(12 rows) SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out index cc312078b..ea969bddd 100644 --- a/src/test/regress/expected/multi_partition_pruning.out +++ b/src/test/regress/expected/multi_partition_pruning.out @@ -4,12 +4,12 @@ -- Tests to verify that we correctly prune unreferenced shards. For this, we -- need to increase the logging verbosity of messages displayed on the client. SET client_min_messages TO DEBUG2; -SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; +-- Adding additional l_orderkey = 1 to make this query not router executable +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; DEBUG: predicate pruning for shardId 102014 DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102010 -DEBUG: predicate pruning for shardId 102009 l_orderkey | l_linenumber | l_shipdate ------------+--------------+------------ 9030 | 1 | 09-02-1998 @@ -18,7 +18,13 @@ DEBUG: predicate pruning for shardId 102009 9030 | 4 | 07-20-1998 9030 | 5 | 09-29-1998 9030 | 6 | 09-03-1998 -(6 rows) + 1 | 1 | 03-13-1996 + 1 | 2 | 04-12-1996 + 1 | 3 | 01-29-1996 + 1 | 4 | 04-21-1996 + 1 | 5 | 03-30-1996 + 1 | 6 | 01-30-1996 +(12 rows) -- We use the l_linenumber field for the following aggregations. We need to use -- an integer type, as aggregations on numerics or big integers return numerics diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 288f15145..4d03e19f6 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -102,6 +102,7 @@ SET client_min_messages TO 'DEBUG2'; -- insert a single row for the test INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); DEBUG: Creating router plan +DEBUG: Plan is router executable -- first, test zero-shard SELECT, which should return an empty row SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; DEBUG: predicate pruning for shardId 103301 @@ -116,6 +117,7 @@ DEBUG: predicate pruning for shardId 103300 SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+-----------+------------ 50 | 10 | anjanette | 19519 @@ -125,6 +127,7 @@ DEBUG: predicate pruning for shardId 103301 SELECT title FROM articles_hash WHERE author_id = 10; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable title ------------ aggrandize @@ -140,6 +143,7 @@ SELECT title, word_count FROM articles_hash ORDER BY word_count DESC NULLS LAST; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable title | word_count ------------+------------ anjanette | 19519 @@ -156,6 +160,7 @@ SELECT title, id FROM articles_hash LIMIT 2; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable title | id ---------+---- aruru | 5 @@ -163,6 +168,7 @@ DEBUG: predicate pruning for shardId 103301 (2 rows) -- find all articles by two authors in same shard +-- but plan is not router executable due to order by SELECT title, author_id FROM articles_hash WHERE author_id = 7 OR author_id = 8 ORDER BY author_id ASC, id; @@ -181,6 +187,25 @@ DEBUG: predicate pruning for shardId 103301 alkylic | 8 (10 rows) +-- same query is router executable with no order by +SELECT title, author_id FROM articles_hash + WHERE author_id = 7 OR author_id = 8; +DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable + title | author_id +-------------+----------- + aseptic | 7 + agatized | 8 + auriga | 7 + assembly | 8 + arsenous | 7 + aerophyte | 8 + archduchies | 7 + anatine | 8 + abeyance | 7 + alkylic | 8 +(10 rows) + -- add in some grouping expressions, still on same shard -- having queries unsupported in Citus SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash @@ -198,6 +223,7 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash ORDER BY sum(word_count) DESC; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable author_id | corpus_size -----------+------------- 1 | 35894 @@ -289,13 +315,13 @@ SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id L AS special_price FROM articles_hash a; ERROR: cannot perform distributed planning on this query DETAIL: Subqueries other than in from-clause are currently unsupported -set citus.task_executor_type to 'router'; -- simple lookup query SELECT * FROM articles_hash WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -311,6 +337,7 @@ SELECT * FROM articles_hash WHERE author_id = 1 OR author_id = 17; DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -321,17 +348,26 @@ DEBUG: predicate pruning for shardId 103301 (5 rows) -- below query hits two shards, not router plannable + not router executable +-- handled by real-time executor SELECT * FROM articles_hash WHERE author_id = 1 OR author_id = 18; -ERROR: cannot use router executor with queries that hit multiple shards -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + -- rename the output columns SELECT id as article_id, word_count * id as random_value FROM articles_hash WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable article_id | random_value ------------+-------------- 1 | 9572 @@ -343,6 +379,7 @@ DEBUG: predicate pruning for shardId 103301 -- we can push down co-located joins to a single worker -- this is not router plannable but router executable +-- handled by real-time executor SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_hash b WHERE a.author_id = 10 and a.author_id = b.author_id @@ -350,6 +387,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 103301 DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] +DEBUG: Plan is router executable first_author | second_word_count --------------+------------------- 10 | 17277 @@ -364,8 +402,13 @@ SELECT a.author_id as first_author, b.word_count as second_word_count LIMIT 3; DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 103301 -ERROR: cannot use router executor with JOINs -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + first_author | second_word_count +--------------+------------------- + 10 | 19519 + 10 | 19519 + 10 | 19519 +(3 rows) + -- single shard select with limit is router plannable SELECT * FROM articles_hash @@ -373,6 +416,7 @@ SELECT * LIMIT 3; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 1 | 1 | arsenous | 9572 @@ -388,6 +432,7 @@ SELECT * OFFSET 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 11 | 1 | alamo | 1347 @@ -403,6 +448,7 @@ SELECT * OFFSET 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 31 | 1 | athwartships | 7271 @@ -417,6 +463,7 @@ SELECT id GROUP BY id; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id ---- 41 @@ -432,6 +479,7 @@ SELECT distinct id WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id ---- 41 @@ -447,6 +495,7 @@ SELECT avg(word_count) WHERE author_id = 2; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103300 +DEBUG: Plan is router executable avg -------------------- 12356.400000000000 @@ -459,6 +508,7 @@ SELECT max(word_count) as max, min(word_count) as min, WHERE author_id = 2; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103300 +DEBUG: Plan is router executable max | min | sum | cnt -------+------+-------+----- 18185 | 2728 | 61782 | 5 @@ -471,6 +521,7 @@ SELECT max(word_count) GROUP BY author_id; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable max ------- 11814 @@ -481,14 +532,32 @@ SET client_min_messages to 'NOTICE'; SELECT * FROM articles_hash a, articles_hash b WHERE a.id = b.id AND a.author_id = 1; -ERROR: cannot use router executor with repartition jobs +ERROR: cannot use real time executor with repartition jobs HINT: Set citus.task_executor_type to "task-tracker". --- error out for queries which hit more than 1 shards +-- queries which hit more than 1 shards are not router plannable or executable +-- handled by real-time executor SELECT * FROM articles_hash WHERE author_id >= 1 AND author_id <= 3; -ERROR: cannot use router executor with queries that hit multiple shards -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 3 | 3 | asternal | 10480 + 11 | 1 | alamo | 1347 + 13 | 3 | aseyev | 2255 + 21 | 1 | arcading | 5890 + 23 | 3 | abhorring | 6799 + 31 | 1 | athwartships | 7271 + 33 | 3 | autochrome | 8180 + 41 | 1 | aznavour | 11814 + 43 | 3 | affixal | 12723 + 2 | 2 | abducing | 13642 + 12 | 2 | archiblast | 18185 + 22 | 2 | antipope | 2728 + 32 | 2 | amazon | 11342 + 42 | 2 | ausable | 15885 +(15 rows) + SET citus.task_executor_type TO 'real-time'; -- Test various filtering options for router plannable check SET client_min_messages to 'DEBUG2'; @@ -498,6 +567,7 @@ SELECT * FROM articles_hash WHERE author_id = 1 and author_id >= 1; DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -527,6 +597,7 @@ SELECT * WHERE author_id = 1 and (id = 1 or id = 41); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 1 | 1 | arsenous | 9572 @@ -539,6 +610,7 @@ SELECT * WHERE author_id = 1 and (id = random()::int * 0); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+-------+------------ (0 rows) @@ -576,6 +648,7 @@ SELECT * WHERE author_id = abs(-1); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -617,6 +690,7 @@ SELECT * WHERE author_id = 1 and (id = abs(id - 2)); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 1 | 1 | arsenous | 9572 @@ -641,6 +715,7 @@ SELECT * WHERE (author_id = 1) = true; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -656,6 +731,7 @@ SELECT * WHERE (author_id = 1) and id between 0 and 20; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 1 | 1 | arsenous | 9572 @@ -668,6 +744,7 @@ SELECT * WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s'; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -680,6 +757,7 @@ SELECT * WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -692,6 +770,7 @@ SELECT * WHERE (title like '%s' or title like 'a%') and (author_id = 1); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -707,6 +786,7 @@ SELECT * WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 1 | 1 | arsenous | 9572 @@ -720,6 +800,7 @@ SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count WHERE author_id = 5; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable prev | title | word_count ----------+----------+------------ | afrasia | 864 @@ -735,6 +816,7 @@ SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count ORDER BY word_count DESC; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable prev | title | word_count ----------+----------+------------ aminate | aruru | 11389 @@ -749,6 +831,7 @@ SELECT id, MIN(id) over (order by word_count) WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | min ----+----- 11 | 11 @@ -763,6 +846,7 @@ SELECT id, word_count, AVG(word_count) over (order by word_count) WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | word_count | avg ----+------------+----------------------- 11 | 1347 | 1347.0000000000000000 @@ -772,6 +856,21 @@ DEBUG: predicate pruning for shardId 103301 41 | 11814 | 7178.8000000000000000 (5 rows) +SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count) + FROM articles_hash + WHERE author_id = 1; +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable + word_count | rank +------------+------ + 1347 | 1 + 5890 | 2 + 7271 | 3 + 9572 | 4 + 11814 | 5 +(5 rows) + -- window functions are not supported for not router plannable queries SELECT id, MIN(id) over (order by word_count) FROM articles_hash @@ -801,6 +900,7 @@ SELECT author_id = 5; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable c --- 5 @@ -831,6 +931,7 @@ SELECT * ORDER BY id; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -841,14 +942,16 @@ DEBUG: predicate pruning for shardId 103301 (5 rows) END; --- cursor queries inside transactions are not router plannable +-- cursor queries are router plannable BEGIN; DECLARE test_cursor CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 ORDER BY id; +DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable FETCH test_cursor; id | author_id | title | word_count ----+-----------+----------+------------ @@ -870,6 +973,7 @@ COPY ( ORDER BY id) TO STDOUT; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable 1 1 arsenous 9572 11 1 alamo 1347 21 1 arcading 5890 @@ -884,12 +988,14 @@ CREATE TEMP TABLE temp_articles_hash as ORDER BY id; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable -- router plannable queries may include filter for aggragates SELECT count(*), count(*) FILTER (WHERE id < 3) FROM articles_hash WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable count | count -------+------- 5 | 1 @@ -912,6 +1018,7 @@ PREPARE author_1_articles as EXECUTE author_1_articles; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -929,6 +1036,7 @@ PREPARE author_articles(int) as EXECUTE author_articles(1); DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -955,6 +1063,10 @@ CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah WHERE author_id = 1" PL/pgSQL function author_articles_max_id() line 5 at SQL statement DEBUG: predicate pruning for shardId 103301 +CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_max_id() line 5 at SQL statement +DEBUG: Plan is router executable CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah WHERE author_id = 1" PL/pgSQL function author_articles_max_id() line 5 at SQL statement @@ -985,8 +1097,46 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count FROM articles_hash ah WHERE author_id = 1" PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +DEBUG: Plan is router executable +CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY ERROR: scan directions other than forward scans are unsupported CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +-- router planner/executor is disabled for task-tracker executor +-- following query is router plannable, but router planner is disabled +SET citus.task_executor_type to 'task-tracker'; +SELECT id + FROM articles_hash + WHERE author_id = 1; +DEBUG: predicate pruning for shardId 103301 + id +---- + 1 + 11 + 21 + 31 + 41 +(5 rows) + +-- insert query is router plannable even under task-tracker +INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814); +DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 103301 +DEBUG: Plan is router executable +-- verify insert is successfull (not router plannable and executable) +SELECT id + FROM articles_hash + WHERE author_id = 1; +DEBUG: predicate pruning for shardId 103301 + id +---- + 1 + 11 + 21 + 31 + 41 + 51 +(6 rows) + SET client_min_messages to 'NOTICE'; DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_id_word_count(); diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index fe0904b71..31ef27012 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -4,7 +4,7 @@ CREATE TABLE articles ( id bigint NOT NULL, author_id bigint NOT NULL, - title text NOT NULL, + title varchar(20) NOT NULL, word_count integer NOT NULL CHECK (word_count > 0) ); -- this table is used in a CTE test @@ -315,15 +315,16 @@ ERROR: cannot perform distributed planning on this query DETAIL: Having qual is currently unsupported -- now, test the cases where Citus do or do not need to create -- the master queries -SET citus.task_executor_type TO 'router'; SET citus.large_table_shard_count TO 2; SET client_min_messages TO 'DEBUG2'; +SET citus.task_executor_type TO 'real-time'; -- start with the simple lookup query SELECT * FROM articles WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -338,6 +339,7 @@ SELECT * FROM articles WHERE author_id = 1 OR author_id = 17; DEBUG: predicate pruning for shardId 103094 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ 1 | 1 | arsenous | 9572 @@ -351,14 +353,22 @@ DEBUG: predicate pruning for shardId 103094 SELECT * FROM articles WHERE author_id = 1 OR author_id = 18; -ERROR: cannot use router executor with queries that hit multiple shards -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour | 11814 +(5 rows) + -- rename the output columns on a no master query case SELECT id as article_id, word_count * id as random_value FROM articles WHERE author_id = 1; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 +DEBUG: Plan is router executable article_id | random_value ------------+-------------- 1 | 9572 @@ -377,6 +387,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 103094 DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] +DEBUG: Plan is router executable first_author | second_word_count --------------+------------------- 10 | 17277 @@ -384,15 +395,21 @@ DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] 10 | 6363 (3 rows) --- now show that JOINs don't work with multiple tables +-- now show that JOINs with multiple tables are not router executable +-- they are executed by real-time executor SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles a, articles_single_shard b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 103094 -ERROR: cannot use router executor with JOINs -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". + first_author | second_word_count +--------------+------------------- + 10 | 19519 + 10 | 19519 + 10 | 19519 +(3 rows) + -- do not create the master query for LIMIT on a single shard SELECT SELECT * FROM articles @@ -400,6 +417,7 @@ SELECT * LIMIT 2; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 +DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ 1 | 1 | arsenous | 9572 @@ -415,6 +433,7 @@ SELECT id GROUP BY id; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103094 +DEBUG: Plan is router executable id ---- 41 @@ -426,6 +445,7 @@ DEBUG: predicate pruning for shardId 103094 -- copying from a single shard table does not require the master query COPY articles_single_shard TO stdout; +DEBUG: Plan is router executable 50 10 anjanette 19519 -- error out for queries with aggregates SELECT avg(word_count) @@ -433,66 +453,26 @@ SELECT avg(word_count) WHERE author_id = 2; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103093 +DEBUG: Plan is router executable avg -------------------- 12356.400000000000 (1 row) -- max, min, sum, count is somehow implemented --- differently in distributed planning but, still error out +-- differently in distributed planning SELECT max(word_count) as max, min(word_count) as min, sum(word_count) as sum, count(word_count) as cnt FROM articles WHERE author_id = 2; DEBUG: Creating router plan DEBUG: predicate pruning for shardId 103093 +DEBUG: Plan is router executable max | min | sum | cnt -------+------+-------+----- 18185 | 2728 | 61782 | 5 (1 row) --- error out for queries with ORDER BY -SELECT * - FROM articles - WHERE author_id = 1 - ORDER BY word_count; -DEBUG: Creating router plan -DEBUG: predicate pruning for shardId 103094 - id | author_id | title | word_count -----+-----------+--------------+------------ - 11 | 1 | alamo | 1347 - 21 | 1 | arcading | 5890 - 31 | 1 | athwartships | 7271 - 1 | 1 | arsenous | 9572 - 41 | 1 | aznavour | 11814 -(5 rows) - --- error out for queries with ORDER BY and LIMIT -SELECT * - FROM articles - WHERE author_id = 1 - ORDER BY word_count - LIMIT 2; -DEBUG: Creating router plan -DEBUG: predicate pruning for shardId 103094 - id | author_id | title | word_count -----+-----------+----------+------------ - 11 | 1 | alamo | 1347 - 21 | 1 | arcading | 5890 -(2 rows) - --- error out for queries with aggregates and GROUP BY -SELECT max(word_count) - FROM articles - WHERE author_id = 1 - GROUP BY author_id; -DEBUG: Creating router plan -DEBUG: predicate pruning for shardId 103094 - max -------- - 11814 -(1 row) - -- error out for queries with repartition jobs SELECT * FROM articles a, articles b @@ -526,13 +506,6 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 9 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 -ERROR: cannot use router executor with repartition jobs +ERROR: cannot use real time executor with repartition jobs HINT: Set citus.task_executor_type to "task-tracker". --- error out for queries which hit more than 1 shards -SELECT * - FROM articles - WHERE author_id >= 1 AND author_id <= 3; -ERROR: cannot use router executor with queries that hit multiple shards -HINT: Set citus.task_executor_type to "real-time" or "task-tracker". SET client_min_messages to 'NOTICE'; -SET citus.task_executor_type TO 'real-time'; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index dc439f017..a619fdf28 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -69,7 +69,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 20.69); -- ensure the values are where we put them and query to ensure they are properly pruned SET client_min_messages TO 'DEBUG2'; -SET citus.task_executor_type TO 'router'; +SET citus.task_executor_type TO 'real-time'; SELECT * FROM range_partitioned WHERE id = 32743; SELECT * FROM append_partitioned WHERE id = 414123; SET client_min_messages TO DEFAULT; diff --git a/src/test/regress/sql/multi_null_minmax_value_pruning.sql b/src/test/regress/sql/multi_null_minmax_value_pruning.sql index 9604f9ab4..b471e926e 100644 --- a/src/test/regress/sql/multi_null_minmax_value_pruning.sql +++ b/src/test/regress/sql/multi_null_minmax_value_pruning.sql @@ -15,8 +15,8 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102009; SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010; -- Check that partition and join pruning works when min/max values exist - -SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; +-- Adding l_orderkey = 1 to make the query not router executable +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders WHERE l_orderkey = o_orderkey; diff --git a/src/test/regress/sql/multi_partition_pruning.sql b/src/test/regress/sql/multi_partition_pruning.sql index d3c407bf9..3c99516ab 100644 --- a/src/test/regress/sql/multi_partition_pruning.sql +++ b/src/test/regress/sql/multi_partition_pruning.sql @@ -7,7 +7,8 @@ SET client_min_messages TO DEBUG2; -SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; +-- Adding additional l_orderkey = 1 to make this query not router executable +SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1; -- We use the l_linenumber field for the following aggregations. We need to use -- an integer type, as aggregations on numerics or big integers return numerics diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 341e0c0a8..7bbc9a985 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -114,10 +114,15 @@ SELECT title, id FROM articles_hash LIMIT 2; -- find all articles by two authors in same shard +-- but plan is not router executable due to order by SELECT title, author_id FROM articles_hash WHERE author_id = 7 OR author_id = 8 ORDER BY author_id ASC, id; +-- same query is router executable with no order by +SELECT title, author_id FROM articles_hash + WHERE author_id = 7 OR author_id = 8; + -- add in some grouping expressions, still on same shard -- having queries unsupported in Citus SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash @@ -163,8 +168,6 @@ ORDER BY articles_hash.id; SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1) AS special_price FROM articles_hash a; -set citus.task_executor_type to 'router'; - -- simple lookup query SELECT * FROM articles_hash @@ -177,6 +180,7 @@ SELECT * WHERE author_id = 1 OR author_id = 17; -- below query hits two shards, not router plannable + not router executable +-- handled by real-time executor SELECT * FROM articles_hash WHERE author_id = 1 OR author_id = 18; @@ -188,6 +192,7 @@ SELECT id as article_id, word_count * id as random_value -- we can push down co-located joins to a single worker -- this is not router plannable but router executable +-- handled by real-time executor SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_hash b WHERE a.author_id = 10 and a.author_id = b.author_id @@ -255,7 +260,8 @@ SELECT * FROM articles_hash a, articles_hash b WHERE a.id = b.id AND a.author_id = 1; --- error out for queries which hit more than 1 shards +-- queries which hit more than 1 shards are not router plannable or executable +-- handled by real-time executor SELECT * FROM articles_hash WHERE author_id >= 1 AND author_id <= 3; @@ -369,6 +375,10 @@ SELECT id, word_count, AVG(word_count) over (order by word_count) FROM articles_hash WHERE author_id = 1; +SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count) + FROM articles_hash + WHERE author_id = 1; + -- window functions are not supported for not router plannable queries SELECT id, MIN(id) over (order by word_count) FROM articles_hash @@ -418,7 +428,7 @@ SELECT * ORDER BY id; END; --- cursor queries inside transactions are not router plannable +-- cursor queries are router plannable BEGIN; DECLARE test_cursor CURSOR FOR SELECT * @@ -497,6 +507,21 @@ $$ LANGUAGE plpgsql; SELECT * FROM author_articles_id_word_count(); +-- router planner/executor is disabled for task-tracker executor +-- following query is router plannable, but router planner is disabled +SET citus.task_executor_type to 'task-tracker'; +SELECT id + FROM articles_hash + WHERE author_id = 1; + +-- insert query is router plannable even under task-tracker +INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814); + +-- verify insert is successfull (not router plannable and executable) +SELECT id + FROM articles_hash + WHERE author_id = 1; + SET client_min_messages to 'NOTICE'; DROP FUNCTION author_articles_max_id(); diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 795026290..a2ccca852 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -5,7 +5,7 @@ CREATE TABLE articles ( id bigint NOT NULL, author_id bigint NOT NULL, - title text NOT NULL, + title varchar(20) NOT NULL, word_count integer NOT NULL CHECK (word_count > 0) ); @@ -188,9 +188,9 @@ SELECT author_id FROM articles -- now, test the cases where Citus do or do not need to create -- the master queries -SET citus.task_executor_type TO 'router'; SET citus.large_table_shard_count TO 2; SET client_min_messages TO 'DEBUG2'; +SET citus.task_executor_type TO 'real-time'; -- start with the simple lookup query SELECT * @@ -219,7 +219,8 @@ SELECT a.author_id as first_author, b.word_count as second_word_count WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; --- now show that JOINs don't work with multiple tables +-- now show that JOINs with multiple tables are not router executable +-- they are executed by real-time executor SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles a, articles_single_shard b WHERE a.author_id = 10 and a.author_id = b.author_id @@ -248,40 +249,15 @@ SELECT avg(word_count) WHERE author_id = 2; -- max, min, sum, count is somehow implemented --- differently in distributed planning but, still error out +-- differently in distributed planning SELECT max(word_count) as max, min(word_count) as min, sum(word_count) as sum, count(word_count) as cnt FROM articles WHERE author_id = 2; --- error out for queries with ORDER BY -SELECT * - FROM articles - WHERE author_id = 1 - ORDER BY word_count; - --- error out for queries with ORDER BY and LIMIT -SELECT * - FROM articles - WHERE author_id = 1 - ORDER BY word_count - LIMIT 2; - --- error out for queries with aggregates and GROUP BY -SELECT max(word_count) - FROM articles - WHERE author_id = 1 - GROUP BY author_id; - -- error out for queries with repartition jobs SELECT * FROM articles a, articles b WHERE a.id = b.id AND a.author_id = 1; --- error out for queries which hit more than 1 shards -SELECT * - FROM articles - WHERE author_id >= 1 AND author_id <= 3; - SET client_min_messages to 'NOTICE'; -SET citus.task_executor_type TO 'real-time';