Add dynamic executor selection

- non-router plannable queries can be executed
  by router executor if they satisfy the criteria
- router executor is removed from configuration,
  now task executor can not be set to router
- removed some tests that error out for router executor
pull/427/head
Murat Tuncer 2016-04-19 09:29:10 +03:00
parent 938546b938
commit a88d3ecd4e
17 changed files with 369 additions and 219 deletions

View File

@ -42,7 +42,7 @@ bool AllModificationsCommutative = false;
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static int32 ExecuteDistributedModify(Task *task); static int32 ExecuteDistributedModify(Task *task);
static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount,
Task *task, EState *executorState, Task *task, EState *executorState,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
DestReceiver *destination); DestReceiver *destination);
@ -84,10 +84,10 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
queryDesc->estate = executorState; queryDesc->estate = executorState;
/* /*
* As it's similar to what we're doing, use a MaterialState node to store * As it's similar to what we're doing, use a MaterialState node to store
* our state. This is used to store our tuplestore, so cursors etc. can * our state. This is used to store our tuplestore, so cursors etc. can
* work. * work.
*/ */
queryDesc->planstate = (PlanState *) makeNode(MaterialState); queryDesc->planstate = (PlanState *) makeNode(MaterialState);
#if (PG_VERSION_NUM < 90500) #if (PG_VERSION_NUM < 90500)
@ -321,7 +321,7 @@ ExecuteDistributedModify(Task *task)
* given placement, the function attempts it on its replica. * given placement, the function attempts it on its replica.
*/ */
static void static void
ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, Task *task, ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task,
EState *executorState, TupleDesc tupleDescriptor, EState *executorState, TupleDesc tupleDescriptor,
DestReceiver *destination) DestReceiver *destination)
{ {
@ -364,11 +364,12 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 numberTuples, Task *task,
ExecClearTuple(tupleTableSlot); ExecClearTuple(tupleTableSlot);
currentTupleCount++; currentTupleCount++;
/* /*
* If numberTuples is zero fetch all tuples, otherwise stop after * If numberTuples is zero fetch all tuples, otherwise stop after
* count tuples. * count tuples.
*/ */
if (numberTuples && numberTuples == currentTupleCount) if (tupleCount > 0 && tupleCount == currentTupleCount)
{ {
break; break;
} }

View File

@ -30,6 +30,8 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */ int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
static bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
/* /*
* JobExecutorType selects the executor type for the given multiPlan using the task * JobExecutorType selects the executor type for the given multiPlan using the task
@ -42,29 +44,20 @@ MultiExecutorType
JobExecutorType(MultiPlan *multiPlan) JobExecutorType(MultiPlan *multiPlan)
{ {
Job *job = multiPlan->workerJob; Job *job = multiPlan->workerJob;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList; List *workerTaskList = job->taskList;
List *workerNodeList = WorkerNodeList(); List *workerNodeList = WorkerNodeList();
int taskCount = list_length(workerTaskList); int taskCount = list_length(workerTaskList);
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
double tasksPerNode = taskCount / ((double) workerNodeCount); double tasksPerNode = taskCount / ((double) workerNodeCount);
int dependedJobCount = list_length(job->dependedJobList); int dependedJobCount = list_length(job->dependedJobList);
MultiExecutorType executorType = TaskExecutorType; MultiExecutorType executorType = TaskExecutorType;
bool routerExecutablePlan = RouterExecutablePlan(multiPlan, executorType);
/* check if the first task is a modify task, short-circuit if so */ /* check if can switch to router executor */
if (taskCount > 0) if (routerExecutablePlan)
{ {
Task *firstTask = (Task *) linitial(workerTaskList); ereport(DEBUG2, (errmsg("Plan is router executable")));
TaskType taskType = firstTask->taskType; return MULTI_EXECUTOR_ROUTER;
if (taskType == MODIFY_TASK || taskType == ROUTER_TASK)
{
/* router planner creates a single task */
Assert(taskCount == 1);
return MULTI_EXECUTOR_ROUTER;
}
} }
if (executorType == MULTI_EXECUTOR_REAL_TIME) if (executorType == MULTI_EXECUTOR_REAL_TIME)
@ -104,7 +97,7 @@ JobExecutorType(MultiPlan *multiPlan)
"\"task-tracker\"."))); "\"task-tracker\".")));
} }
} }
else if (executorType == MULTI_EXECUTOR_TASK_TRACKER) else
{ {
/* if we have more tasks per node than what can be tracked, warn the user */ /* if we have more tasks per node than what can be tracked, warn the user */
if (tasksPerNode >= MaxTrackedTasksPerNode) if (tasksPerNode >= MaxTrackedTasksPerNode)
@ -113,64 +106,83 @@ JobExecutorType(MultiPlan *multiPlan)
"configured max_tracked_tasks_per_node limit"))); "configured max_tracked_tasks_per_node limit")));
} }
} }
else if (executorType == MULTI_EXECUTOR_ROUTER)
{
Task *workerTask = NULL;
List *workerDependentTaskList = NIL;
bool masterQueryHasAggregates = false;
/* if we have repartition jobs with router executor, error out */
if (dependedJobCount > 0)
{
ereport(ERROR, (errmsg("cannot use router executor with repartition jobs"),
errhint("Set citus.task_executor_type to "
"\"task-tracker\".")));
}
/* if the query hits more than one shards, error out*/
if (taskCount != 1)
{
ereport(ERROR, (errmsg("cannot use router executor with queries that "
"hit multiple shards"),
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
}
/* if the query has dependent data fetch tasks */
workerTask = list_nth(workerTaskList, 0);
workerDependentTaskList = workerTask->dependedTaskList;
if (list_length(workerDependentTaskList) > 0)
{
ereport(ERROR, (errmsg("cannot use router executor with JOINs"),
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
}
/* ORDER BY is always applied on the master table with the current planner */
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
ereport(ERROR, (errmsg("cannot use router executor with ORDER BY clauses"),
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
}
/*
* Note that worker query having an aggregate means that the master query should have either
* an aggregate or a function expression which has to be executed for the correct results.
*/
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
ereport(ERROR, (errmsg("cannot use router executor with aggregates"),
errhint("Set citus.task_executor_type to \"real-time\" or "
"\"task-tracker\".")));
}
}
return executorType; return executorType;
} }
/*
* RouterExecutablePlan returns whether a multi-plan can be executed using the
* router executor. Modify queries are always router executable, select queries
* are router executable only if executorType is real time.
*/
static bool
RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
{
Job *job = multiPlan->workerJob;
TaskType taskType = TASK_TYPE_INVALID_FIRST;
Query *masterQuery = multiPlan->masterQuery;
List *workerTaskList = job->taskList;
int taskCount = list_length(workerTaskList);
int dependedJobCount = list_length(job->dependedJobList);
Task *workerTask = NULL;
List *workerDependentTaskList = NIL;
bool masterQueryHasAggregates = false;
/* router executor cannot execute queries that hit more than one shard */
if (taskCount != 1)
{
return false;
}
/* check if the first task is a modify or a router task, short-circuit if so */
workerTask = (Task *) linitial(workerTaskList);
taskType = workerTask->taskType;
if (taskType == MODIFY_TASK || taskType == ROUTER_TASK)
{
return true;
}
if (executorType == MULTI_EXECUTOR_TASK_TRACKER)
{
return false;
}
/* router executor cannot execute repartition jobs */
if (dependedJobCount > 0)
{
return false;
}
/* router executor cannot execute queries with dependent data fetch tasks */
workerDependentTaskList = workerTask->dependedTaskList;
if (list_length(workerDependentTaskList) > 0)
{
return false;
}
/* router executor cannot execute queries with order by */
if (masterQuery != NULL && list_length(masterQuery->sortClause) > 0)
{
return false;
}
/*
* Router executor cannot execute queries with aggregates.
* Note that worker query having an aggregate means that the master query should
* have either an aggregate or a function expression which has to be executed for
* the correct results.
*/
masterQueryHasAggregates = job->jobQuery->hasAggs;
if (masterQueryHasAggregates)
{
return false;
}
return true;
}
/* /*
* MaxMasterConnectionCount returns the number of connections a master can open. * MaxMasterConnectionCount returns the number of connections a master can open.
* A master cannot create more than a certain number of file descriptors (FDs). * A master cannot create more than a certain number of file descriptors (FDs).

View File

@ -574,7 +574,7 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
{ {
Oid namespaceId; Oid namespaceId;
Oid indexRelationId; Oid indexRelationId;
char* indexRelationName = createIndexStatement->idxname; char *indexRelationName = createIndexStatement->idxname;
if (indexRelationName == NULL) if (indexRelationName == NULL)
{ {

View File

@ -75,24 +75,12 @@ CreatePhysicalPlan(Query *parse)
{ {
Query *parseCopy = copyObject(parse); Query *parseCopy = copyObject(parse);
MultiPlan *physicalPlan = NULL; MultiPlan *physicalPlan = NULL;
CmdType commandType = parse->commandType; bool routerPlannable = MultiRouterPlannableQuery(parseCopy, TaskExecutorType);
bool routerPlannable = false;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
routerPlannable = true;
}
else if (TaskExecutorType == MULTI_EXECUTOR_REAL_TIME ||
TaskExecutorType == MULTI_EXECUTOR_ROUTER)
{
routerPlannable = MultiRouterPlannableQuery(parseCopy);
}
if (routerPlannable) if (routerPlannable)
{ {
ereport(DEBUG2, (errmsg("Creating router plan"))); ereport(DEBUG2, (errmsg("Creating router plan")));
physicalPlan = MultiRouterPlanCreate(parseCopy); physicalPlan = MultiRouterPlanCreate(parseCopy);
CheckNodeIsDumpable((Node *) physicalPlan);
} }
else else
{ {

View File

@ -540,7 +540,7 @@ TargetShardInterval(Query *query)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("router executor queries must target exactly one " errmsg("router executor queries must target exactly one "
"shard"))); "shard")));
} }
else else
{ {
@ -664,7 +664,7 @@ RouterSelectTask(Query *query)
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
bool upsertQuery = false; bool upsertQuery = false;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
FromExpr *joinTree = NULL; FromExpr *joinTree = NULL;
Assert(shardInterval != NULL); Assert(shardInterval != NULL);
@ -701,7 +701,8 @@ RouterSelectTask(Query *query)
/* /*
* RouterQueryJob creates a Job for the specified query to execute the * RouterQueryJob creates a Job for the specified query to execute the
* provided single shard select task.*/ * provided single shard select task.
*/
static Job * static Job *
RouterQueryJob(Query *query, Task *task) RouterQueryJob(Query *query, Task *task)
{ {
@ -737,11 +738,10 @@ RouterQueryJob(Query *query, Task *task)
* MultiRouterPlannableQuery returns true if given query can be router plannable. * MultiRouterPlannableQuery returns true if given query can be router plannable.
* The query is router plannable if it is a select query issued on a hash * The query is router plannable if it is a select query issued on a hash
* partitioned distributed table, and it has a exact match comparison on the * partitioned distributed table, and it has a exact match comparison on the
* partition column. This feature is enabled if task executor is set to real-time or * partition column. This feature is enabled if task executor is set to real-time
* router.
*/ */
bool bool
MultiRouterPlannableQuery(Query *query) MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
{ {
uint32 rangeTableId = 1; uint32 rangeTableId = 1;
List *rangeTableList = NIL; List *rangeTableList = NIL;
@ -758,6 +758,17 @@ MultiRouterPlannableQuery(Query *query)
int partitionColumnReferenceCount = 0; int partitionColumnReferenceCount = 0;
int shardCount = 0; int shardCount = 0;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
return true;
}
if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME)
{
return false;
}
Assert(commandType == CMD_SELECT); Assert(commandType == CMD_SELECT);
/* /*
@ -768,7 +779,7 @@ MultiRouterPlannableQuery(Query *query)
* during RangeTblEntry checks. * during RangeTblEntry checks.
*/ */
if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate || if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate ||
query->hasRecursive || query->utilityStmt != NULL) query->hasRecursive)
{ {
return false; return false;
} }

View File

@ -59,7 +59,6 @@ static const struct config_enum_entry task_assignment_policy_options[] = {
static const struct config_enum_entry task_executor_type_options[] = { static const struct config_enum_entry task_executor_type_options[] = {
{ "real-time", MULTI_EXECUTOR_REAL_TIME, false }, { "real-time", MULTI_EXECUTOR_REAL_TIME, false },
{ "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false }, { "task-tracker", MULTI_EXECUTOR_TASK_TRACKER, false },
{ "router", MULTI_EXECUTOR_ROUTER, false },
{ NULL, 0, false } { NULL, 0, false }
}; };
@ -483,14 +482,13 @@ RegisterCitusConfigVariables(void)
DefineCustomEnumVariable( DefineCustomEnumVariable(
"citus.task_executor_type", "citus.task_executor_type",
gettext_noop("Sets the executor type to be used for distributed queries."), gettext_noop("Sets the executor type to be used for distributed queries."),
gettext_noop("The master node chooses between three different executor types " gettext_noop("The master node chooses between two different executor types "
"when executing a distributed query. The router executor is " "when executing a distributed query.The real-time executor is "
"optimal for simple key-value lookups on a single shard. The " "optimal for simple key-value lookup queries and queries that "
"real-time executor is optimal for queries that involve " "involve aggregations and/or co-located joins on multiple shards. "
"aggregations and/or co-located joins on multiple shards. The " "The task-tracker executor is optimal for long-running, complex "
"task-tracker executor is optimal for long-running, complex " "queries that touch thousands of shards and/or that involve table "
"queries that touch thousands of shards and/or that involve " "repartitioning."),
"table repartitioning."),
&TaskExecutorType, &TaskExecutorType,
MULTI_EXECUTOR_REAL_TIME, MULTI_EXECUTOR_REAL_TIME,
task_executor_type_options, task_executor_type_options,

View File

@ -16,6 +16,7 @@
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -30,6 +31,6 @@
#endif #endif
extern MultiPlan * MultiRouterPlanCreate(Query *query); extern MultiPlan * MultiRouterPlanCreate(Query *query);
extern bool MultiRouterPlannableQuery(Query *query); extern bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -91,9 +91,10 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
20.69); 20.69);
-- ensure the values are where we put them and query to ensure they are properly pruned -- ensure the values are where we put them and query to ensure they are properly pruned
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'router'; SET citus.task_executor_type TO 'real-time';
SELECT * FROM range_partitioned WHERE id = 32743; SELECT * FROM range_partitioned WHERE id = 32743;
DEBUG: predicate pruning for shardId 103070 DEBUG: predicate pruning for shardId 103070
DEBUG: Plan is router executable
id | symbol | bidder_id | placed_at | kind | limit_price id | symbol | bidder_id | placed_at | kind | limit_price
-------+--------+-----------+--------------------------+------+------------- -------+--------+-----------+--------------------------+------+-------------
32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69
@ -101,6 +102,7 @@ DEBUG: predicate pruning for shardId 103070
SELECT * FROM append_partitioned WHERE id = 414123; SELECT * FROM append_partitioned WHERE id = 414123;
DEBUG: predicate pruning for shardId 103072 DEBUG: predicate pruning for shardId 103072
DEBUG: Plan is router executable
id | symbol | bidder_id | placed_at | kind | limit_price id | symbol | bidder_id | placed_at | kind | limit_price
--------+--------+-----------+--------------------------+------+------------- --------+--------+-----------+--------------------------+------+-------------
414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69

View File

@ -19,12 +19,12 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010;
(1 row) (1 row)
-- Check that partition and join pruning works when min/max values exist -- Check that partition and join pruning works when min/max values exist
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -- Adding l_orderkey = 1 to make the query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
DEBUG: predicate pruning for shardId 102014 DEBUG: predicate pruning for shardId 102014
DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102013
DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102011
DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102010
DEBUG: predicate pruning for shardId 102009
l_orderkey | l_linenumber | l_shipdate l_orderkey | l_linenumber | l_shipdate
------------+--------------+------------ ------------+--------------+------------
9030 | 1 | 09-02-1998 9030 | 1 | 09-02-1998
@ -33,7 +33,13 @@ DEBUG: predicate pruning for shardId 102009
9030 | 4 | 07-20-1998 9030 | 4 | 07-20-1998
9030 | 5 | 09-29-1998 9030 | 5 | 09-29-1998
9030 | 6 | 09-03-1998 9030 | 6 | 09-03-1998
(6 rows) 1 | 1 | 03-13-1996
1 | 2 | 04-12-1996
1 | 3 | 01-29-1996
1 | 4 | 04-21-1996
1 | 5 | 03-30-1996
1 | 6 | 01-30-1996
(12 rows)
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey; WHERE l_orderkey = o_orderkey;

View File

@ -4,12 +4,12 @@
-- Tests to verify that we correctly prune unreferenced shards. For this, we -- Tests to verify that we correctly prune unreferenced shards. For this, we
-- need to increase the logging verbosity of messages displayed on the client. -- need to increase the logging verbosity of messages displayed on the client.
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -- Adding additional l_orderkey = 1 to make this query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
DEBUG: predicate pruning for shardId 102014 DEBUG: predicate pruning for shardId 102014
DEBUG: predicate pruning for shardId 102013 DEBUG: predicate pruning for shardId 102013
DEBUG: predicate pruning for shardId 102011 DEBUG: predicate pruning for shardId 102011
DEBUG: predicate pruning for shardId 102010 DEBUG: predicate pruning for shardId 102010
DEBUG: predicate pruning for shardId 102009
l_orderkey | l_linenumber | l_shipdate l_orderkey | l_linenumber | l_shipdate
------------+--------------+------------ ------------+--------------+------------
9030 | 1 | 09-02-1998 9030 | 1 | 09-02-1998
@ -18,7 +18,13 @@ DEBUG: predicate pruning for shardId 102009
9030 | 4 | 07-20-1998 9030 | 4 | 07-20-1998
9030 | 5 | 09-29-1998 9030 | 5 | 09-29-1998
9030 | 6 | 09-03-1998 9030 | 6 | 09-03-1998
(6 rows) 1 | 1 | 03-13-1996
1 | 2 | 04-12-1996
1 | 3 | 01-29-1996
1 | 4 | 04-21-1996
1 | 5 | 03-30-1996
1 | 6 | 01-30-1996
(12 rows)
-- We use the l_linenumber field for the following aggregations. We need to use -- We use the l_linenumber field for the following aggregations. We need to use
-- an integer type, as aggregations on numerics or big integers return numerics -- an integer type, as aggregations on numerics or big integers return numerics

View File

@ -102,6 +102,7 @@ SET client_min_messages TO 'DEBUG2';
-- insert a single row for the test -- insert a single row for the test
INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable
-- first, test zero-shard SELECT, which should return an empty row -- first, test zero-shard SELECT, which should return an empty row
SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2;
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
@ -116,6 +117,7 @@ DEBUG: predicate pruning for shardId 103300
SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50; SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+-----------+------------ ----+-----------+-----------+------------
50 | 10 | anjanette | 19519 50 | 10 | anjanette | 19519
@ -125,6 +127,7 @@ DEBUG: predicate pruning for shardId 103301
SELECT title FROM articles_hash WHERE author_id = 10; SELECT title FROM articles_hash WHERE author_id = 10;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
title title
------------ ------------
aggrandize aggrandize
@ -140,6 +143,7 @@ SELECT title, word_count FROM articles_hash
ORDER BY word_count DESC NULLS LAST; ORDER BY word_count DESC NULLS LAST;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
title | word_count title | word_count
------------+------------ ------------+------------
anjanette | 19519 anjanette | 19519
@ -156,6 +160,7 @@ SELECT title, id FROM articles_hash
LIMIT 2; LIMIT 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
title | id title | id
---------+---- ---------+----
aruru | 5 aruru | 5
@ -163,6 +168,7 @@ DEBUG: predicate pruning for shardId 103301
(2 rows) (2 rows)
-- find all articles by two authors in same shard -- find all articles by two authors in same shard
-- but plan is not router executable due to order by
SELECT title, author_id FROM articles_hash SELECT title, author_id FROM articles_hash
WHERE author_id = 7 OR author_id = 8 WHERE author_id = 7 OR author_id = 8
ORDER BY author_id ASC, id; ORDER BY author_id ASC, id;
@ -181,6 +187,25 @@ DEBUG: predicate pruning for shardId 103301
alkylic | 8 alkylic | 8
(10 rows) (10 rows)
-- same query is router executable with no order by
SELECT title, author_id FROM articles_hash
WHERE author_id = 7 OR author_id = 8;
DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
title | author_id
-------------+-----------
aseptic | 7
agatized | 8
auriga | 7
assembly | 8
arsenous | 7
aerophyte | 8
archduchies | 7
anatine | 8
abeyance | 7
alkylic | 8
(10 rows)
-- add in some grouping expressions, still on same shard -- add in some grouping expressions, still on same shard
-- having queries unsupported in Citus -- having queries unsupported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
@ -198,6 +223,7 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
ORDER BY sum(word_count) DESC; ORDER BY sum(word_count) DESC;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
author_id | corpus_size author_id | corpus_size
-----------+------------- -----------+-------------
1 | 35894 1 | 35894
@ -289,13 +315,13 @@ SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id L
AS special_price FROM articles_hash a; AS special_price FROM articles_hash a;
ERROR: cannot perform distributed planning on this query ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries other than in from-clause are currently unsupported DETAIL: Subqueries other than in from-clause are currently unsupported
set citus.task_executor_type to 'router';
-- simple lookup query -- simple lookup query
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -311,6 +337,7 @@ SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 OR author_id = 17; WHERE author_id = 1 OR author_id = 17;
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -321,17 +348,26 @@ DEBUG: predicate pruning for shardId 103301
(5 rows) (5 rows)
-- below query hits two shards, not router plannable + not router executable -- below query hits two shards, not router plannable + not router executable
-- handled by real-time executor
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 OR author_id = 18; WHERE author_id = 1 OR author_id = 18;
ERROR: cannot use router executor with queries that hit multiple shards id | author_id | title | word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". ----+-----------+--------------+------------
1 | 1 | arsenous | 9572
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
31 | 1 | athwartships | 7271
41 | 1 | aznavour | 11814
(5 rows)
-- rename the output columns -- rename the output columns
SELECT id as article_id, word_count * id as random_value SELECT id as article_id, word_count * id as random_value
FROM articles_hash FROM articles_hash
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
article_id | random_value article_id | random_value
------------+-------------- ------------+--------------
1 | 9572 1 | 9572
@ -343,6 +379,7 @@ DEBUG: predicate pruning for shardId 103301
-- we can push down co-located joins to a single worker -- we can push down co-located joins to a single worker
-- this is not router plannable but router executable -- this is not router plannable but router executable
-- handled by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
@ -350,6 +387,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
DEBUG: push down of limit count: 3 DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
DEBUG: Plan is router executable
first_author | second_word_count first_author | second_word_count
--------------+------------------- --------------+-------------------
10 | 17277 10 | 17277
@ -364,8 +402,13 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
LIMIT 3; LIMIT 3;
DEBUG: push down of limit count: 3 DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
ERROR: cannot use router executor with JOINs first_author | second_word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". --------------+-------------------
10 | 19519
10 | 19519
10 | 19519
(3 rows)
-- single shard select with limit is router plannable -- single shard select with limit is router plannable
SELECT * SELECT *
FROM articles_hash FROM articles_hash
@ -373,6 +416,7 @@ SELECT *
LIMIT 3; LIMIT 3;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -388,6 +432,7 @@ SELECT *
OFFSET 1; OFFSET 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
11 | 1 | alamo | 1347 11 | 1 | alamo | 1347
@ -403,6 +448,7 @@ SELECT *
OFFSET 1; OFFSET 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
31 | 1 | athwartships | 7271 31 | 1 | athwartships | 7271
@ -417,6 +463,7 @@ SELECT id
GROUP BY id; GROUP BY id;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id id
---- ----
41 41
@ -432,6 +479,7 @@ SELECT distinct id
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id id
---- ----
41 41
@ -447,6 +495,7 @@ SELECT avg(word_count)
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103300 DEBUG: predicate pruning for shardId 103300
DEBUG: Plan is router executable
avg avg
-------------------- --------------------
12356.400000000000 12356.400000000000
@ -459,6 +508,7 @@ SELECT max(word_count) as max, min(word_count) as min,
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103300 DEBUG: predicate pruning for shardId 103300
DEBUG: Plan is router executable
max | min | sum | cnt max | min | sum | cnt
-------+------+-------+----- -------+------+-------+-----
18185 | 2728 | 61782 | 5 18185 | 2728 | 61782 | 5
@ -471,6 +521,7 @@ SELECT max(word_count)
GROUP BY author_id; GROUP BY author_id;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
max max
------- -------
11814 11814
@ -481,14 +532,32 @@ SET client_min_messages to 'NOTICE';
SELECT * SELECT *
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
ERROR: cannot use router executor with repartition jobs ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.task_executor_type to "task-tracker".
-- error out for queries which hit more than 1 shards -- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id >= 1 AND author_id <= 3; WHERE author_id >= 1 AND author_id <= 3;
ERROR: cannot use router executor with queries that hit multiple shards id | author_id | title | word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". ----+-----------+--------------+------------
1 | 1 | arsenous | 9572
3 | 3 | asternal | 10480
11 | 1 | alamo | 1347
13 | 3 | aseyev | 2255
21 | 1 | arcading | 5890
23 | 3 | abhorring | 6799
31 | 1 | athwartships | 7271
33 | 3 | autochrome | 8180
41 | 1 | aznavour | 11814
43 | 3 | affixal | 12723
2 | 2 | abducing | 13642
12 | 2 | archiblast | 18185
22 | 2 | antipope | 2728
32 | 2 | amazon | 11342
42 | 2 | ausable | 15885
(15 rows)
SET citus.task_executor_type TO 'real-time'; SET citus.task_executor_type TO 'real-time';
-- Test various filtering options for router plannable check -- Test various filtering options for router plannable check
SET client_min_messages to 'DEBUG2'; SET client_min_messages to 'DEBUG2';
@ -498,6 +567,7 @@ SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 and author_id >= 1; WHERE author_id = 1 and author_id >= 1;
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -527,6 +597,7 @@ SELECT *
WHERE author_id = 1 and (id = 1 or id = 41); WHERE author_id = 1 and (id = 1 or id = 41);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -539,6 +610,7 @@ SELECT *
WHERE author_id = 1 and (id = random()::int * 0); WHERE author_id = 1 and (id = random()::int * 0);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+-------+------------ ----+-----------+-------+------------
(0 rows) (0 rows)
@ -576,6 +648,7 @@ SELECT *
WHERE author_id = abs(-1); WHERE author_id = abs(-1);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -617,6 +690,7 @@ SELECT *
WHERE author_id = 1 and (id = abs(id - 2)); WHERE author_id = 1 and (id = abs(id - 2));
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -641,6 +715,7 @@ SELECT *
WHERE (author_id = 1) = true; WHERE (author_id = 1) = true;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -656,6 +731,7 @@ SELECT *
WHERE (author_id = 1) and id between 0 and 20; WHERE (author_id = 1) and id between 0 and 20;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -668,6 +744,7 @@ SELECT *
WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s'; WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s';
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -680,6 +757,7 @@ SELECT *
WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1); WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -692,6 +770,7 @@ SELECT *
WHERE (title like '%s' or title like 'a%') and (author_id = 1); WHERE (title like '%s' or title like 'a%') and (author_id = 1);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -707,6 +786,7 @@ SELECT *
WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000); WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -720,6 +800,7 @@ SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
WHERE author_id = 5; WHERE author_id = 5;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
prev | title | word_count prev | title | word_count
----------+----------+------------ ----------+----------+------------
| afrasia | 864 | afrasia | 864
@ -735,6 +816,7 @@ SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
ORDER BY word_count DESC; ORDER BY word_count DESC;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
prev | title | word_count prev | title | word_count
----------+----------+------------ ----------+----------+------------
aminate | aruru | 11389 aminate | aruru | 11389
@ -749,6 +831,7 @@ SELECT id, MIN(id) over (order by word_count)
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | min id | min
----+----- ----+-----
11 | 11 11 | 11
@ -763,6 +846,7 @@ SELECT id, word_count, AVG(word_count) over (order by word_count)
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | word_count | avg id | word_count | avg
----+------------+----------------------- ----+------------+-----------------------
11 | 1347 | 1347.0000000000000000 11 | 1347 | 1347.0000000000000000
@ -772,6 +856,21 @@ DEBUG: predicate pruning for shardId 103301
41 | 11814 | 7178.8000000000000000 41 | 11814 | 7178.8000000000000000
(5 rows) (5 rows)
SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count)
FROM articles_hash
WHERE author_id = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
word_count | rank
------------+------
1347 | 1
5890 | 2
7271 | 3
9572 | 4
11814 | 5
(5 rows)
-- window functions are not supported for not router plannable queries -- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count) SELECT id, MIN(id) over (order by word_count)
FROM articles_hash FROM articles_hash
@ -801,6 +900,7 @@ SELECT
author_id = 5; author_id = 5;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
c c
--- ---
5 5
@ -831,6 +931,7 @@ SELECT *
ORDER BY id; ORDER BY id;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -841,14 +942,16 @@ DEBUG: predicate pruning for shardId 103301
(5 rows) (5 rows)
END; END;
-- cursor queries inside transactions are not router plannable -- cursor queries are router plannable
BEGIN; BEGIN;
DECLARE test_cursor CURSOR FOR DECLARE test_cursor CURSOR FOR
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 WHERE author_id = 1
ORDER BY id; ORDER BY id;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
FETCH test_cursor; FETCH test_cursor;
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
@ -870,6 +973,7 @@ COPY (
ORDER BY id) TO STDOUT; ORDER BY id) TO STDOUT;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
1 1 arsenous 9572 1 1 arsenous 9572
11 1 alamo 1347 11 1 alamo 1347
21 1 arcading 5890 21 1 arcading 5890
@ -884,12 +988,14 @@ CREATE TEMP TABLE temp_articles_hash as
ORDER BY id; ORDER BY id;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
-- router plannable queries may include filter for aggragates -- router plannable queries may include filter for aggragates
SELECT count(*), count(*) FILTER (WHERE id < 3) SELECT count(*), count(*) FILTER (WHERE id < 3)
FROM articles_hash FROM articles_hash
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
count | count count | count
-------+------- -------+-------
5 | 1 5 | 1
@ -912,6 +1018,7 @@ PREPARE author_1_articles as
EXECUTE author_1_articles; EXECUTE author_1_articles;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -929,6 +1036,7 @@ PREPARE author_articles(int) as
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -955,6 +1063,10 @@ CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah
WHERE author_id = 1" WHERE author_id = 1"
PL/pgSQL function author_articles_max_id() line 5 at SQL statement PL/pgSQL function author_articles_max_id() line 5 at SQL statement
DEBUG: predicate pruning for shardId 103301 DEBUG: predicate pruning for shardId 103301
CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah
WHERE author_id = 1"
PL/pgSQL function author_articles_max_id() line 5 at SQL statement
DEBUG: Plan is router executable
CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah
WHERE author_id = 1" WHERE author_id = 1"
PL/pgSQL function author_articles_max_id() line 5 at SQL statement PL/pgSQL function author_articles_max_id() line 5 at SQL statement
@ -985,8 +1097,46 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count
FROM articles_hash ah FROM articles_hash ah
WHERE author_id = 1" WHERE author_id = 1"
PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
DEBUG: Plan is router executable
CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
ERROR: scan directions other than forward scans are unsupported ERROR: scan directions other than forward scans are unsupported
CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY
-- router planner/executor is disabled for task-tracker executor
-- following query is router plannable, but router planner is disabled
SET citus.task_executor_type to 'task-tracker';
SELECT id
FROM articles_hash
WHERE author_id = 1;
DEBUG: predicate pruning for shardId 103301
id
----
1
11
21
31
41
(5 rows)
-- insert query is router plannable even under task-tracker
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814);
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103301
DEBUG: Plan is router executable
-- verify insert is successfull (not router plannable and executable)
SELECT id
FROM articles_hash
WHERE author_id = 1;
DEBUG: predicate pruning for shardId 103301
id
----
1
11
21
31
41
51
(6 rows)
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count(); DROP FUNCTION author_articles_id_word_count();

View File

@ -4,7 +4,7 @@
CREATE TABLE articles ( CREATE TABLE articles (
id bigint NOT NULL, id bigint NOT NULL,
author_id bigint NOT NULL, author_id bigint NOT NULL,
title text NOT NULL, title varchar(20) NOT NULL,
word_count integer NOT NULL CHECK (word_count > 0) word_count integer NOT NULL CHECK (word_count > 0)
); );
-- this table is used in a CTE test -- this table is used in a CTE test
@ -315,15 +315,16 @@ ERROR: cannot perform distributed planning on this query
DETAIL: Having qual is currently unsupported DETAIL: Having qual is currently unsupported
-- now, test the cases where Citus do or do not need to create -- now, test the cases where Citus do or do not need to create
-- the master queries -- the master queries
SET citus.task_executor_type TO 'router';
SET citus.large_table_shard_count TO 2; SET citus.large_table_shard_count TO 2;
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
-- start with the simple lookup query -- start with the simple lookup query
SELECT * SELECT *
FROM articles FROM articles
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -338,6 +339,7 @@ SELECT *
FROM articles FROM articles
WHERE author_id = 1 OR author_id = 17; WHERE author_id = 1 OR author_id = 17;
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -351,14 +353,22 @@ DEBUG: predicate pruning for shardId 103094
SELECT * SELECT *
FROM articles FROM articles
WHERE author_id = 1 OR author_id = 18; WHERE author_id = 1 OR author_id = 18;
ERROR: cannot use router executor with queries that hit multiple shards id | author_id | title | word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". ----+-----------+--------------+------------
1 | 1 | arsenous | 9572
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
31 | 1 | athwartships | 7271
41 | 1 | aznavour | 11814
(5 rows)
-- rename the output columns on a no master query case -- rename the output columns on a no master query case
SELECT id as article_id, word_count * id as random_value SELECT id as article_id, word_count * id as random_value
FROM articles FROM articles
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
article_id | random_value article_id | random_value
------------+-------------- ------------+--------------
1 | 9572 1 | 9572
@ -377,6 +387,7 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
DEBUG: push down of limit count: 3 DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
DEBUG: Plan is router executable
first_author | second_word_count first_author | second_word_count
--------------+------------------- --------------+-------------------
10 | 17277 10 | 17277
@ -384,15 +395,21 @@ DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
10 | 6363 10 | 6363
(3 rows) (3 rows)
-- now show that JOINs don't work with multiple tables -- now show that JOINs with multiple tables are not router executable
-- they are executed by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles a, articles_single_shard b FROM articles a, articles_single_shard b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
DEBUG: push down of limit count: 3 DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
ERROR: cannot use router executor with JOINs first_author | second_word_count
HINT: Set citus.task_executor_type to "real-time" or "task-tracker". --------------+-------------------
10 | 19519
10 | 19519
10 | 19519
(3 rows)
-- do not create the master query for LIMIT on a single shard SELECT -- do not create the master query for LIMIT on a single shard SELECT
SELECT * SELECT *
FROM articles FROM articles
@ -400,6 +417,7 @@ SELECT *
LIMIT 2; LIMIT 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -415,6 +433,7 @@ SELECT id
GROUP BY id; GROUP BY id;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094 DEBUG: predicate pruning for shardId 103094
DEBUG: Plan is router executable
id id
---- ----
41 41
@ -426,6 +445,7 @@ DEBUG: predicate pruning for shardId 103094
-- copying from a single shard table does not require the master query -- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout; COPY articles_single_shard TO stdout;
DEBUG: Plan is router executable
50 10 anjanette 19519 50 10 anjanette 19519
-- error out for queries with aggregates -- error out for queries with aggregates
SELECT avg(word_count) SELECT avg(word_count)
@ -433,66 +453,26 @@ SELECT avg(word_count)
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103093 DEBUG: predicate pruning for shardId 103093
DEBUG: Plan is router executable
avg avg
-------------------- --------------------
12356.400000000000 12356.400000000000
(1 row) (1 row)
-- max, min, sum, count is somehow implemented -- max, min, sum, count is somehow implemented
-- differently in distributed planning but, still error out -- differently in distributed planning
SELECT max(word_count) as max, min(word_count) as min, SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt sum(word_count) as sum, count(word_count) as cnt
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103093 DEBUG: predicate pruning for shardId 103093
DEBUG: Plan is router executable
max | min | sum | cnt max | min | sum | cnt
-------+------+-------+----- -------+------+-------+-----
18185 | 2728 | 61782 | 5 18185 | 2728 | 61782 | 5
(1 row) (1 row)
-- error out for queries with ORDER BY
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094
id | author_id | title | word_count
----+-----------+--------------+------------
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
31 | 1 | athwartships | 7271
1 | 1 | arsenous | 9572
41 | 1 | aznavour | 11814
(5 rows)
-- error out for queries with ORDER BY and LIMIT
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count
LIMIT 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094
id | author_id | title | word_count
----+-----------+----------+------------
11 | 1 | alamo | 1347
21 | 1 | arcading | 5890
(2 rows)
-- error out for queries with aggregates and GROUP BY
SELECT max(word_count)
FROM articles
WHERE author_id = 1
GROUP BY author_id;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 103094
max
-------
11814
(1 row)
-- error out for queries with repartition jobs -- error out for queries with repartition jobs
SELECT * SELECT *
FROM articles a, articles b FROM articles a, articles b
@ -526,13 +506,6 @@ DEBUG: pruning merge fetch taskId 10
DETAIL: Creating dependency on merge taskId 9 DETAIL: Creating dependency on merge taskId 9
DEBUG: pruning merge fetch taskId 11 DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use router executor with repartition jobs ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.task_executor_type to "task-tracker".
-- error out for queries which hit more than 1 shards
SELECT *
FROM articles
WHERE author_id >= 1 AND author_id <= 3;
ERROR: cannot use router executor with queries that hit multiple shards
HINT: Set citus.task_executor_type to "real-time" or "task-tracker".
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
SET citus.task_executor_type TO 'real-time';

View File

@ -69,7 +69,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
20.69); 20.69);
-- ensure the values are where we put them and query to ensure they are properly pruned -- ensure the values are where we put them and query to ensure they are properly pruned
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'router'; SET citus.task_executor_type TO 'real-time';
SELECT * FROM range_partitioned WHERE id = 32743; SELECT * FROM range_partitioned WHERE id = 32743;
SELECT * FROM append_partitioned WHERE id = 414123; SELECT * FROM append_partitioned WHERE id = 414123;
SET client_min_messages TO DEFAULT; SET client_min_messages TO DEFAULT;

View File

@ -15,8 +15,8 @@ SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102009;
SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010; SELECT shardminvalue, shardmaxvalue from pg_dist_shard WHERE shardid = 102010;
-- Check that partition and join pruning works when min/max values exist -- Check that partition and join pruning works when min/max values exist
-- Adding l_orderkey = 1 to make the query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders SELECT sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
WHERE l_orderkey = o_orderkey; WHERE l_orderkey = o_orderkey;

View File

@ -7,7 +7,8 @@
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030; -- Adding additional l_orderkey = 1 to make this query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
-- We use the l_linenumber field for the following aggregations. We need to use -- We use the l_linenumber field for the following aggregations. We need to use
-- an integer type, as aggregations on numerics or big integers return numerics -- an integer type, as aggregations on numerics or big integers return numerics

View File

@ -114,10 +114,15 @@ SELECT title, id FROM articles_hash
LIMIT 2; LIMIT 2;
-- find all articles by two authors in same shard -- find all articles by two authors in same shard
-- but plan is not router executable due to order by
SELECT title, author_id FROM articles_hash SELECT title, author_id FROM articles_hash
WHERE author_id = 7 OR author_id = 8 WHERE author_id = 7 OR author_id = 8
ORDER BY author_id ASC, id; ORDER BY author_id ASC, id;
-- same query is router executable with no order by
SELECT title, author_id FROM articles_hash
WHERE author_id = 7 OR author_id = 8;
-- add in some grouping expressions, still on same shard -- add in some grouping expressions, still on same shard
-- having queries unsupported in Citus -- having queries unsupported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
@ -163,8 +168,6 @@ ORDER BY articles_hash.id;
SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1) SELECT a.title AS name, (SELECT a2.id FROM authors_hash a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash a; AS special_price FROM articles_hash a;
set citus.task_executor_type to 'router';
-- simple lookup query -- simple lookup query
SELECT * SELECT *
FROM articles_hash FROM articles_hash
@ -177,6 +180,7 @@ SELECT *
WHERE author_id = 1 OR author_id = 17; WHERE author_id = 1 OR author_id = 17;
-- below query hits two shards, not router plannable + not router executable -- below query hits two shards, not router plannable + not router executable
-- handled by real-time executor
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 OR author_id = 18; WHERE author_id = 1 OR author_id = 18;
@ -188,6 +192,7 @@ SELECT id as article_id, word_count * id as random_value
-- we can push down co-located joins to a single worker -- we can push down co-located joins to a single worker
-- this is not router plannable but router executable -- this is not router plannable but router executable
-- handled by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
@ -255,7 +260,8 @@ SELECT *
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
-- error out for queries which hit more than 1 shards -- queries which hit more than 1 shards are not router plannable or executable
-- handled by real-time executor
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id >= 1 AND author_id <= 3; WHERE author_id >= 1 AND author_id <= 3;
@ -369,6 +375,10 @@ SELECT id, word_count, AVG(word_count) over (order by word_count)
FROM articles_hash FROM articles_hash
WHERE author_id = 1; WHERE author_id = 1;
SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count)
FROM articles_hash
WHERE author_id = 1;
-- window functions are not supported for not router plannable queries -- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count) SELECT id, MIN(id) over (order by word_count)
FROM articles_hash FROM articles_hash
@ -418,7 +428,7 @@ SELECT *
ORDER BY id; ORDER BY id;
END; END;
-- cursor queries inside transactions are not router plannable -- cursor queries are router plannable
BEGIN; BEGIN;
DECLARE test_cursor CURSOR FOR DECLARE test_cursor CURSOR FOR
SELECT * SELECT *
@ -497,6 +507,21 @@ $$ LANGUAGE plpgsql;
SELECT * FROM author_articles_id_word_count(); SELECT * FROM author_articles_id_word_count();
-- router planner/executor is disabled for task-tracker executor
-- following query is router plannable, but router planner is disabled
SET citus.task_executor_type to 'task-tracker';
SELECT id
FROM articles_hash
WHERE author_id = 1;
-- insert query is router plannable even under task-tracker
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814);
-- verify insert is successfull (not router plannable and executable)
SELECT id
FROM articles_hash
WHERE author_id = 1;
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_max_id();

View File

@ -5,7 +5,7 @@
CREATE TABLE articles ( CREATE TABLE articles (
id bigint NOT NULL, id bigint NOT NULL,
author_id bigint NOT NULL, author_id bigint NOT NULL,
title text NOT NULL, title varchar(20) NOT NULL,
word_count integer NOT NULL CHECK (word_count > 0) word_count integer NOT NULL CHECK (word_count > 0)
); );
@ -188,9 +188,9 @@ SELECT author_id FROM articles
-- now, test the cases where Citus do or do not need to create -- now, test the cases where Citus do or do not need to create
-- the master queries -- the master queries
SET citus.task_executor_type TO 'router';
SET citus.large_table_shard_count TO 2; SET citus.large_table_shard_count TO 2;
SET client_min_messages TO 'DEBUG2'; SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
-- start with the simple lookup query -- start with the simple lookup query
SELECT * SELECT *
@ -219,7 +219,8 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
-- now show that JOINs don't work with multiple tables -- now show that JOINs with multiple tables are not router executable
-- they are executed by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles a, articles_single_shard b FROM articles a, articles_single_shard b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
@ -248,40 +249,15 @@ SELECT avg(word_count)
WHERE author_id = 2; WHERE author_id = 2;
-- max, min, sum, count is somehow implemented -- max, min, sum, count is somehow implemented
-- differently in distributed planning but, still error out -- differently in distributed planning
SELECT max(word_count) as max, min(word_count) as min, SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt sum(word_count) as sum, count(word_count) as cnt
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
-- error out for queries with ORDER BY
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count;
-- error out for queries with ORDER BY and LIMIT
SELECT *
FROM articles
WHERE author_id = 1
ORDER BY word_count
LIMIT 2;
-- error out for queries with aggregates and GROUP BY
SELECT max(word_count)
FROM articles
WHERE author_id = 1
GROUP BY author_id;
-- error out for queries with repartition jobs -- error out for queries with repartition jobs
SELECT * SELECT *
FROM articles a, articles b FROM articles a, articles b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
-- error out for queries which hit more than 1 shards
SELECT *
FROM articles
WHERE author_id >= 1 AND author_id <= 3;
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
SET citus.task_executor_type TO 'real-time';