mirror of https://github.com/citusdata/citus.git
Expand router planner coverage
We can now support richer set of queries in router planner. This allow us to support CTEs, joins, window function, subqueries if they are known to be executed at a single worker with a single task (all tables are filtered down to a single shard and a single worker contains all table shards referenced in the query). Fixes : #501pull/647/head
parent
512d1791c2
commit
cc33a450c4
|
@ -107,6 +107,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
|
|||
instr_time planStart;
|
||||
instr_time planDuration;
|
||||
Query *originalQuery = NULL;
|
||||
RelationRestrictionContext *restrictionContext = NULL;
|
||||
|
||||
/* if local query, run the standard explain and return */
|
||||
bool localQuery = !NeedsDistributedPlanning(query);
|
||||
|
@ -137,22 +138,35 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
|
|||
/* measure the full planning time to display in EXPLAIN ANALYZE */
|
||||
INSTR_TIME_SET_CURRENT(planStart);
|
||||
|
||||
/* call standard planner to modify the query structure before multi planning */
|
||||
initialPlan = standard_planner(query, 0, params);
|
||||
restrictionContext = CreateAndPushRestrictionContext();
|
||||
|
||||
commandType = initialPlan->commandType;
|
||||
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
||||
commandType == CMD_DELETE)
|
||||
PG_TRY();
|
||||
{
|
||||
if (es->analyze)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
|
||||
"distributed tables is not supported.")));
|
||||
}
|
||||
}
|
||||
/* call standard planner to modify the query structure before multi planning */
|
||||
initialPlan = standard_planner(query, 0, params);
|
||||
|
||||
multiPlan = CreatePhysicalPlan(originalQuery, query);
|
||||
commandType = initialPlan->commandType;
|
||||
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
||||
commandType == CMD_DELETE)
|
||||
{
|
||||
if (es->analyze)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
|
||||
"distributed tables is not supported.")));
|
||||
}
|
||||
}
|
||||
|
||||
multiPlan = CreatePhysicalPlan(originalQuery, query, restrictionContext);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
PopRestrictionContext();
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
PopRestrictionContext();
|
||||
|
||||
INSTR_TIME_SET_CURRENT(planDuration);
|
||||
INSTR_TIME_SUBTRACT(planDuration, planStart);
|
||||
|
|
|
@ -1410,8 +1410,41 @@ FindNodesOfType(MultiNode *node, int type)
|
|||
|
||||
|
||||
/*
|
||||
* NeedsDistributedPlanning checks if the passed in query is a Select query
|
||||
* running on partitioned relations. If it is, we start distributed planning.
|
||||
* IdentifyRTE assigns an identifier to an RTE, for tracking purposes.
|
||||
*
|
||||
* To be able to track RTEs through postgres' query planning, which copies and
|
||||
* duplicate, and modifies them, we sometimes need to figure out whether two
|
||||
* RTEs are copies of the same original RTE. For that we, hackishly, use a
|
||||
* field normally unused in RTE_RELATION RTEs.
|
||||
*
|
||||
* The assigned identifier better be unique within a plantree.
|
||||
*/
|
||||
void
|
||||
IdentifyRTE(RangeTblEntry *rte, int identifier)
|
||||
{
|
||||
Assert(rte->rtekind == RTE_RELATION);
|
||||
Assert(rte->values_lists == NIL);
|
||||
rte->values_lists = list_make1_int(identifier);
|
||||
}
|
||||
|
||||
|
||||
/* GetRTEIdentity returns the identity assigned with IdentifyRTE. */
|
||||
int
|
||||
GetRTEIdentity(RangeTblEntry *rte)
|
||||
{
|
||||
Assert(rte->rtekind == RTE_RELATION);
|
||||
Assert(IsA(rte->values_lists, IntList));
|
||||
Assert(list_length(rte->values_lists) == 1);
|
||||
|
||||
return linitial_int(rte->values_lists);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* NeedsDistributedPlanning checks if the passed in query is a query running
|
||||
* on a distributed table. If it is, we start distributed planning.
|
||||
*
|
||||
* For distributed relations it also assigns identifiers to the relevant RTEs.
|
||||
*/
|
||||
bool
|
||||
NeedsDistributedPlanning(Query *queryTree)
|
||||
|
@ -1421,6 +1454,7 @@ NeedsDistributedPlanning(Query *queryTree)
|
|||
ListCell *rangeTableCell = NULL;
|
||||
bool hasLocalRelation = false;
|
||||
bool hasDistributedRelation = false;
|
||||
int rteIdentifier = 1;
|
||||
|
||||
if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
|
||||
commandType != CMD_UPDATE && commandType != CMD_DELETE)
|
||||
|
@ -1441,6 +1475,17 @@ NeedsDistributedPlanning(Query *queryTree)
|
|||
if (IsDistributedTable(relationId))
|
||||
{
|
||||
hasDistributedRelation = true;
|
||||
|
||||
/*
|
||||
* To be able to track individual RTEs through postgres' query
|
||||
* planning, we need to be able to figure out whether an RTE is
|
||||
* actually a copy of another, rather than a different one. We
|
||||
* simply number the RTEs starting from 1.
|
||||
*/
|
||||
if (rangeTableEntry->rtekind == RTE_RELATION)
|
||||
{
|
||||
IdentifyRTE(rangeTableEntry, rteIdentifier++);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -31,6 +31,10 @@
|
|||
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
static List *relationRestrictionContextList = NIL;
|
||||
|
||||
|
||||
/* local function forward declarations */
|
||||
static void CheckNodeIsDumpable(Node *node);
|
||||
|
||||
|
@ -46,6 +50,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
PlannedStmt *result = NULL;
|
||||
bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
|
||||
Query *originalQuery = NULL;
|
||||
RelationRestrictionContext *restrictionContext = NULL;
|
||||
|
||||
/*
|
||||
* standard_planner scribbles on it's input, but for deparsing we need the
|
||||
|
@ -56,19 +61,36 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
originalQuery = copyObject(parse);
|
||||
}
|
||||
|
||||
/*
|
||||
* First call into standard planner. This is required because the Citus
|
||||
* planner relies on parse tree transformations made by postgres' planner.
|
||||
*/
|
||||
result = standard_planner(parse, cursorOptions, boundParams);
|
||||
/* create a restriction context and put it at the end if context list */
|
||||
restrictionContext = CreateAndPushRestrictionContext();
|
||||
|
||||
if (needsDistributedPlanning)
|
||||
PG_TRY();
|
||||
{
|
||||
MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse);
|
||||
/*
|
||||
* First call into standard planner. This is required because the Citus
|
||||
* planner relies on parse tree transformations made by postgres' planner.
|
||||
*/
|
||||
|
||||
/* store required data into the planned statement */
|
||||
result = MultiQueryContainerNode(result, physicalPlan);
|
||||
result = standard_planner(parse, cursorOptions, boundParams);
|
||||
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse,
|
||||
restrictionContext);
|
||||
|
||||
/* store required data into the planned statement */
|
||||
result = MultiQueryContainerNode(result, physicalPlan);
|
||||
}
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
PopRestrictionContext();
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
/* remove the context from the context list */
|
||||
PopRestrictionContext();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -82,10 +104,11 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
* physical plan process needed to produce distributed query plans.
|
||||
*/
|
||||
MultiPlan *
|
||||
CreatePhysicalPlan(Query *originalQuery, Query *query)
|
||||
CreatePhysicalPlan(Query *originalQuery, Query *query,
|
||||
RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query,
|
||||
TaskExecutorType);
|
||||
TaskExecutorType, restrictionContext);
|
||||
if (physicalPlan == NULL)
|
||||
{
|
||||
/* Create and optimize logical plan */
|
||||
|
@ -296,3 +319,90 @@ CheckNodeIsDumpable(Node *node)
|
|||
pfree(out);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* multi_relation_restriction_hook is a hook called by postgresql standard planner
|
||||
* to notify us about various planning information regarding a relation. We use
|
||||
* it to retrieve restrictions on relations.
|
||||
*/
|
||||
void
|
||||
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index,
|
||||
RangeTblEntry *rte)
|
||||
{
|
||||
RelationRestrictionContext *restrictionContext = NULL;
|
||||
RelationRestriction *relationRestriction = NULL;
|
||||
bool distributedTable = false;
|
||||
bool localTable = false;
|
||||
|
||||
if (rte->rtekind != RTE_RELATION)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
distributedTable = IsDistributedTable(rte->relid);
|
||||
localTable = !distributedTable;
|
||||
|
||||
restrictionContext = CurrentRestrictionContext();
|
||||
Assert(restrictionContext != NULL);
|
||||
|
||||
relationRestriction = palloc0(sizeof(RelationRestriction));
|
||||
relationRestriction->index = index;
|
||||
relationRestriction->relationId = rte->relid;
|
||||
relationRestriction->rte = rte;
|
||||
relationRestriction->relOptInfo = relOptInfo;
|
||||
relationRestriction->distributedRelation = distributedTable;
|
||||
relationRestriction->plannerInfo = root;
|
||||
relationRestriction->prunedShardIntervalList = NIL;
|
||||
|
||||
restrictionContext->hasDistributedRelation |= distributedTable;
|
||||
restrictionContext->hasLocalRelation |= localTable;
|
||||
|
||||
restrictionContext->relationRestrictionList =
|
||||
lappend(restrictionContext->relationRestrictionList, relationRestriction);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateAndPushRestrictionContext creates a new restriction context, inserts it to the
|
||||
* beginning of the context list, and returns the newly created context.
|
||||
*/
|
||||
RelationRestrictionContext *
|
||||
CreateAndPushRestrictionContext(void)
|
||||
{
|
||||
RelationRestrictionContext *restrictionContext =
|
||||
palloc0(sizeof(RelationRestrictionContext));
|
||||
relationRestrictionContextList = lcons(restrictionContext,
|
||||
relationRestrictionContextList);
|
||||
|
||||
return restrictionContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CurrentRestrictionContext returns the the last restriction context from the
|
||||
* list.
|
||||
*/
|
||||
RelationRestrictionContext *
|
||||
CurrentRestrictionContext(void)
|
||||
{
|
||||
RelationRestrictionContext *restrictionContext = NULL;
|
||||
|
||||
Assert(relationRestrictionContextList != NIL);
|
||||
|
||||
restrictionContext =
|
||||
(RelationRestrictionContext *) linitial(relationRestrictionContextList);
|
||||
|
||||
return restrictionContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PopRestrictionContext removes the most recently added restriction context from
|
||||
* context list. The function assumes the list is not empty.
|
||||
*/
|
||||
void
|
||||
PopRestrictionContext(void)
|
||||
{
|
||||
relationRestrictionContextList = list_delete_first(relationRestrictionContextList);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
#include "access/stratnum.h"
|
||||
#include "access/xact.h"
|
||||
#include "distributed/citus_clauses.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/citus_nodes.h"
|
||||
#include "distributed/citus_nodefuncs.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
|
@ -40,6 +42,7 @@
|
|||
#include "nodes/pg_list.h"
|
||||
#include "nodes/primnodes.h"
|
||||
#include "optimizer/clauses.h"
|
||||
#include "optimizer/restrictinfo.h"
|
||||
#include "parser/parsetree.h"
|
||||
#include "storage/lock.h"
|
||||
#include "utils/elog.h"
|
||||
|
@ -65,17 +68,25 @@ static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
|
|||
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
|
||||
static char MostPermissiveVolatileFlag(char left, char right);
|
||||
static Task * RouterModifyTask(Query *originalQuery, Query *query);
|
||||
static ShardInterval * TargetShardInterval(Query *query);
|
||||
static ShardInterval * TargetShardIntervalForModify(Query *query);
|
||||
static List * QueryRestrictList(Query *query);
|
||||
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
|
||||
static ShardInterval * FastShardPruning(Oid distributedTableId,
|
||||
Const *partionColumnValue);
|
||||
static Oid ExtractFirstDistributedTableId(Query *query);
|
||||
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
|
||||
static Task * RouterSelectTask(Query *originalQuery, Query *query);
|
||||
static Job * RouterQueryJob(Query *query, Task *task);
|
||||
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType);
|
||||
static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
|
||||
static Task * RouterSelectTask(Query *originalQuery, Query *query,
|
||||
RelationRestrictionContext *restrictionContext,
|
||||
List **placementList);
|
||||
static List * TargetShardIntervalsForSelect(Query *query,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
||||
static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
|
||||
static bool UpdateRelationNames(Node *node,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
|
||||
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -87,21 +98,23 @@ static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
|
|||
*/
|
||||
MultiPlan *
|
||||
MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
||||
MultiExecutorType taskExecutorType)
|
||||
MultiExecutorType taskExecutorType,
|
||||
RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
Task *task = NULL;
|
||||
Job *job = NULL;
|
||||
MultiPlan *multiPlan = NULL;
|
||||
CmdType commandType = query->commandType;
|
||||
bool modifyTask = false;
|
||||
List *placementList = NIL;
|
||||
|
||||
bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType);
|
||||
bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType,
|
||||
restrictionContext);
|
||||
if (!routerPlannable)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ereport(DEBUG2, (errmsg("Creating router plan")));
|
||||
|
||||
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
||||
commandType == CMD_DELETE)
|
||||
|
@ -118,11 +131,17 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
|||
{
|
||||
Assert(commandType == CMD_SELECT);
|
||||
|
||||
task = RouterSelectTask(originalQuery, query);
|
||||
task = RouterSelectTask(originalQuery, query, restrictionContext, &placementList);
|
||||
}
|
||||
|
||||
if (task == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
job = RouterQueryJob(originalQuery, task);
|
||||
ereport(DEBUG2, (errmsg("Creating router plan")));
|
||||
|
||||
job = RouterQueryJob(originalQuery, task, placementList);
|
||||
|
||||
multiPlan = CitusMakeNode(MultiPlan);
|
||||
multiPlan->workerJob = job;
|
||||
|
@ -669,7 +688,7 @@ MostPermissiveVolatileFlag(char left, char right)
|
|||
static Task *
|
||||
RouterModifyTask(Query *originalQuery, Query *query)
|
||||
{
|
||||
ShardInterval *shardInterval = TargetShardInterval(query);
|
||||
ShardInterval *shardInterval = TargetShardIntervalForModify(query);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
StringInfo queryString = makeStringInfo();
|
||||
Task *modifyTask = NULL;
|
||||
|
@ -713,25 +732,23 @@ RouterModifyTask(Query *originalQuery, Query *query)
|
|||
|
||||
|
||||
/*
|
||||
* TargetShardInterval determines the single shard targeted by a provided command.
|
||||
* If no matching shards exist, or if the modification targets more than one one
|
||||
* shard, this function raises an error depending on the command type.
|
||||
* TargetShardIntervalForModify determines the single shard targeted by a provided
|
||||
* modify command. If no matching shards exist, or if the modification targets more
|
||||
* than one shard, this function raises an error depending on the command type.
|
||||
*/
|
||||
static ShardInterval *
|
||||
TargetShardInterval(Query *query)
|
||||
TargetShardIntervalForModify(Query *query)
|
||||
{
|
||||
CmdType commandType = query->commandType;
|
||||
bool selectTask = (commandType == CMD_SELECT);
|
||||
List *prunedShardList = NIL;
|
||||
int prunedShardCount = 0;
|
||||
|
||||
|
||||
int shardCount = 0;
|
||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||
char partitionMethod = cacheEntry->partitionMethod;
|
||||
bool fastShardPruningPossible = false;
|
||||
|
||||
Assert(query->commandType != CMD_SELECT);
|
||||
|
||||
/* error out if no shards exist for the table */
|
||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
if (shardCount == 0)
|
||||
|
@ -774,18 +791,9 @@ TargetShardInterval(Query *query)
|
|||
prunedShardCount = list_length(prunedShardList);
|
||||
if (prunedShardCount != 1)
|
||||
{
|
||||
if (selectTask)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("router executor queries must target exactly one "
|
||||
"shard")));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributed modifications must target exactly one "
|
||||
"shard")));
|
||||
}
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributed modifications must target exactly one "
|
||||
"shard")));
|
||||
}
|
||||
|
||||
return (ShardInterval *) linitial(prunedShardList);
|
||||
|
@ -963,22 +971,88 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
|
|||
|
||||
/* RouterSelectTask builds a Task to represent a single shard select query */
|
||||
static Task *
|
||||
RouterSelectTask(Query *originalQuery, Query *query)
|
||||
RouterSelectTask(Query *originalQuery, Query *query,
|
||||
RelationRestrictionContext *restrictionContext,
|
||||
List **placementList)
|
||||
{
|
||||
Task *task = NULL;
|
||||
ShardInterval *shardInterval = TargetShardInterval(query);
|
||||
List *prunedRelationShardList = TargetShardIntervalsForSelect(query,
|
||||
restrictionContext);
|
||||
StringInfo queryString = makeStringInfo();
|
||||
uint64 shardId = INVALID_SHARD_ID;
|
||||
bool upsertQuery = false;
|
||||
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
|
||||
ListCell *prunedRelationShardListCell = NULL;
|
||||
List *workerList = NIL;
|
||||
bool shardsPresent = false;
|
||||
|
||||
*placementList = NIL;
|
||||
|
||||
if (prunedRelationShardList == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Assert(shardInterval != NULL);
|
||||
Assert(commandType == CMD_SELECT);
|
||||
|
||||
shardId = shardInterval->shardId;
|
||||
foreach(prunedRelationShardListCell, prunedRelationShardList)
|
||||
{
|
||||
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
|
||||
ShardInterval *shardInterval = NULL;
|
||||
|
||||
deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString);
|
||||
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
|
||||
/* no shard is present or all shards are pruned out case will be handled later */
|
||||
if (prunedShardList == NIL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
shardsPresent = true;
|
||||
|
||||
/* all relations are now pruned down to 0 or 1 shards */
|
||||
Assert(list_length(prunedShardList) <= 1);
|
||||
|
||||
/* anchor shard id */
|
||||
if (shardId == INVALID_SHARD_ID)
|
||||
{
|
||||
shardInterval = (ShardInterval *) linitial(prunedShardList);
|
||||
shardId = shardInterval->shardId;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Determine the worker that has all shard placements if a shard placement found.
|
||||
* If no shard placement exists, we will still run the query but the result will
|
||||
* be empty. We create a dummy shard placement for the first active worker.
|
||||
*/
|
||||
if (shardsPresent)
|
||||
{
|
||||
workerList = WorkersContainingAllShards(prunedRelationShardList);
|
||||
}
|
||||
else
|
||||
{
|
||||
List *workerNodeList = WorkerNodeList();
|
||||
if (workerNodeList != NIL)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList);
|
||||
ShardPlacement *dummyPlacement =
|
||||
(ShardPlacement *) CitusMakeNode(ShardPlacement);
|
||||
dummyPlacement->nodeName = workerNode->workerName;
|
||||
dummyPlacement->nodePort = workerNode->workerPort;
|
||||
|
||||
workerList = lappend(workerList, dummyPlacement);
|
||||
}
|
||||
}
|
||||
|
||||
if (workerList == NIL)
|
||||
{
|
||||
ereport(DEBUG2, (errmsg("Found no worker with all shard placements")));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
UpdateRelationNames((Node *) originalQuery, restrictionContext);
|
||||
|
||||
pg_get_query_def(originalQuery, queryString);
|
||||
|
||||
task = CitusMakeNode(Task);
|
||||
task->jobId = INVALID_JOB_ID;
|
||||
|
@ -990,16 +1064,338 @@ RouterSelectTask(Query *originalQuery, Query *query)
|
|||
task->upsertQuery = upsertQuery;
|
||||
task->requiresMasterEvaluation = false;
|
||||
|
||||
*placementList = workerList;
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TargetShardIntervalsForSelect performs shard pruning for all referenced relations
|
||||
* in the query and returns list of shards per relation. Shard pruning is done based
|
||||
* on provided restriction context per relation. The function bails out and returns NULL
|
||||
* if any of the relations pruned down to more than one active shard. It also records
|
||||
* pruned shard intervals in relation restriction context to be used later on.
|
||||
*/
|
||||
static List *
|
||||
TargetShardIntervalsForSelect(Query *query,
|
||||
RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
List *prunedRelationShardList = NIL;
|
||||
ListCell *restrictionCell = NULL;
|
||||
|
||||
Assert(query->commandType == CMD_SELECT);
|
||||
Assert(restrictionContext != NULL);
|
||||
|
||||
foreach(restrictionCell, restrictionContext->relationRestrictionList)
|
||||
{
|
||||
RelationRestriction *relationRestriction =
|
||||
(RelationRestriction *) lfirst(restrictionCell);
|
||||
Oid relationId = relationRestriction->relationId;
|
||||
Index tableId = relationRestriction->index;
|
||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
||||
List *restrictClauseList = extract_actual_clauses(baseRestrictionList, false);
|
||||
List *shardIntervalList = NIL;
|
||||
List *prunedShardList = NIL;
|
||||
int shardIndex = 0;
|
||||
|
||||
relationRestriction->prunedShardIntervalList = NIL;
|
||||
|
||||
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
|
||||
{
|
||||
ShardInterval *shardInterval =
|
||||
cacheEntry->sortedShardIntervalArray[shardIndex];
|
||||
shardIntervalList = lappend(shardIntervalList, shardInterval);
|
||||
}
|
||||
|
||||
if (shardCount > 0)
|
||||
{
|
||||
prunedShardList = PruneShardList(relationId, tableId,
|
||||
restrictClauseList,
|
||||
shardIntervalList);
|
||||
|
||||
/*
|
||||
* Quick bail out. The query can not be router plannable if one
|
||||
* relation has more than one shard left after pruning. Having no
|
||||
* shard left is okay at this point. It will be handled at a later
|
||||
* stage.
|
||||
*/
|
||||
if (list_length(prunedShardList) > 1)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
relationRestriction->prunedShardIntervalList = prunedShardList;
|
||||
prunedRelationShardList = lappend(prunedRelationShardList, prunedShardList);
|
||||
}
|
||||
|
||||
return prunedRelationShardList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WorkersContainingAllShards returns list of shard placements that contain all
|
||||
* shard intervals provided to the function. It returns NIL if no placement exists.
|
||||
* The caller should check if there are any shard intervals exist for placement
|
||||
* check prior to calling this function.
|
||||
*/
|
||||
static List *
|
||||
WorkersContainingAllShards(List *prunedShardIntervalsList)
|
||||
{
|
||||
ListCell *prunedShardIntervalCell = NULL;
|
||||
bool firstShard = true;
|
||||
List *currentPlacementList = NIL;
|
||||
|
||||
foreach(prunedShardIntervalCell, prunedShardIntervalsList)
|
||||
{
|
||||
List *shardIntervalList = (List *) lfirst(prunedShardIntervalCell);
|
||||
ShardInterval *shardInterval = NULL;
|
||||
uint64 shardId = INVALID_SHARD_ID;
|
||||
List *newPlacementList = NIL;
|
||||
|
||||
if (shardIntervalList == NIL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Assert(list_length(shardIntervalList) == 1);
|
||||
|
||||
shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
||||
shardId = shardInterval->shardId;
|
||||
|
||||
/* retrieve all active shard placements for this shard */
|
||||
newPlacementList = FinalizedShardPlacementList(shardId);
|
||||
|
||||
if (firstShard)
|
||||
{
|
||||
firstShard = false;
|
||||
currentPlacementList = newPlacementList;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* keep placements that still exists for this shard */
|
||||
currentPlacementList = IntersectPlacementList(currentPlacementList,
|
||||
newPlacementList);
|
||||
}
|
||||
|
||||
/*
|
||||
* Bail out if placement list becomes empty. This means there is no worker
|
||||
* containing all shards referecend by the query, hence we can not forward
|
||||
* this query directly to any worker.
|
||||
*/
|
||||
if (currentPlacementList == NIL)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return currentPlacementList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IntersectPlacementList performs placement pruning based on matching on
|
||||
* nodeName:nodePort fields of shard placement data. We start pruning from all
|
||||
* placements of the first relation's shard. Then for each relation's shard, we
|
||||
* compute intersection of the new shards placement with existing placement list.
|
||||
* This operation could have been done using other methods, but since we do not
|
||||
* expect very high replication factor, iterating over a list and making string
|
||||
* comparisons should be sufficient.
|
||||
*/
|
||||
static List *
|
||||
IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList)
|
||||
{
|
||||
ListCell *lhsPlacementCell = NULL;
|
||||
List *placementList = NIL;
|
||||
|
||||
/* Keep existing placement in the list if it is also present in new placement list */
|
||||
foreach(lhsPlacementCell, lhsPlacementList)
|
||||
{
|
||||
ShardPlacement *lhsPlacement = (ShardPlacement *) lfirst(lhsPlacementCell);
|
||||
ListCell *rhsPlacementCell = NULL;
|
||||
foreach(rhsPlacementCell, rhsPlacementList)
|
||||
{
|
||||
ShardPlacement *rhsPlacement = (ShardPlacement *) lfirst(rhsPlacementCell);
|
||||
if (rhsPlacement->nodePort == lhsPlacement->nodePort &&
|
||||
strncmp(rhsPlacement->nodeName, lhsPlacement->nodeName,
|
||||
WORKER_LENGTH) == 0)
|
||||
{
|
||||
placementList = lappend(placementList, rhsPlacement);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return placementList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConvertRteToSubqueryWithEmptyResult converts given relation RTE into
|
||||
* subquery RTE that returns no results.
|
||||
*/
|
||||
static void
|
||||
ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
|
||||
{
|
||||
Relation relation = heap_open(rte->relid, NoLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
||||
int columnCount = tupleDescriptor->natts;
|
||||
int columnIndex = 0;
|
||||
Query *subquery = NULL;
|
||||
List *targetList = NIL;
|
||||
FromExpr *joinTree = NULL;
|
||||
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex];
|
||||
TargetEntry *targetEntry = NULL;
|
||||
StringInfo resname = NULL;
|
||||
Const *constValue = NULL;
|
||||
|
||||
if (attributeForm->attisdropped)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
resname = makeStringInfo();
|
||||
constValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod,
|
||||
attributeForm->attcollation);
|
||||
|
||||
appendStringInfo(resname, "%s", attributeForm->attname.data);
|
||||
|
||||
targetEntry = makeNode(TargetEntry);
|
||||
targetEntry->expr = (Expr *) constValue;
|
||||
targetEntry->resno = columnIndex;
|
||||
targetEntry->resname = resname->data;
|
||||
|
||||
targetList = lappend(targetList, targetEntry);
|
||||
}
|
||||
|
||||
heap_close(relation, NoLock);
|
||||
|
||||
joinTree = makeNode(FromExpr);
|
||||
joinTree->quals = makeBoolConst(false, false);
|
||||
|
||||
subquery = makeNode(Query);
|
||||
subquery->commandType = CMD_SELECT;
|
||||
subquery->querySource = QSRC_ORIGINAL;
|
||||
subquery->canSetTag = true;
|
||||
subquery->targetList = targetList;
|
||||
subquery->jointree = joinTree;
|
||||
|
||||
rte->rtekind = RTE_SUBQUERY;
|
||||
rte->subquery = subquery;
|
||||
rte->alias = copyObject(rte->eref);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateRelationNames walks over the query tree and appends shard ids to
|
||||
* relations. It uses unique identity value to establish connection between a
|
||||
* shard and the range table entry. If the range table id is not given a
|
||||
* identity, than the relation is not referenced from the query, no connection
|
||||
* could be found between a shard and this relation. Therefore relation is replaced
|
||||
* by set of NULL values so that the query would work at worker without any problems.
|
||||
*
|
||||
*/
|
||||
static bool
|
||||
UpdateRelationNames(Node *node, RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
RangeTblEntry *newRte = NULL;
|
||||
uint64 shardId = INVALID_SHARD_ID;
|
||||
Oid relationId = InvalidOid;
|
||||
Oid schemaId = InvalidOid;
|
||||
char *relationName = NULL;
|
||||
char *schemaName = NULL;
|
||||
ListCell *relationRestrictionCell = NULL;
|
||||
RelationRestriction *relationRestriction = NULL;
|
||||
List *shardIntervalList = NIL;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
bool replaceRteWithNullValues = false;
|
||||
|
||||
if (node == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* want to look at all RTEs, even in subqueries, CTEs and such */
|
||||
if (IsA(node, Query))
|
||||
{
|
||||
return query_tree_walker((Query *) node, UpdateRelationNames, restrictionContext,
|
||||
QTW_EXAMINE_RTES);
|
||||
}
|
||||
|
||||
if (!IsA(node, RangeTblEntry))
|
||||
{
|
||||
return expression_tree_walker(node, UpdateRelationNames, restrictionContext);
|
||||
}
|
||||
|
||||
|
||||
newRte = (RangeTblEntry *) node;
|
||||
|
||||
if (newRte->rtekind != RTE_RELATION)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Search for the restrictions associated with the RTE. There better be
|
||||
* some, otherwise this query wouldn't be elegible as a router query.
|
||||
*
|
||||
* FIXME: We should probably use a hashtable here, to do efficient
|
||||
* lookup.
|
||||
*/
|
||||
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
|
||||
{
|
||||
relationRestriction =
|
||||
(RelationRestriction *) lfirst(relationRestrictionCell);
|
||||
|
||||
if (GetRTEIdentity(relationRestriction->rte) == GetRTEIdentity(newRte))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
relationRestriction = NULL;
|
||||
}
|
||||
|
||||
replaceRteWithNullValues = (relationRestriction == NULL) ||
|
||||
relationRestriction->prunedShardIntervalList == NIL;
|
||||
|
||||
if (replaceRteWithNullValues)
|
||||
{
|
||||
ConvertRteToSubqueryWithEmptyResult(newRte);
|
||||
return false;
|
||||
}
|
||||
|
||||
Assert(relationRestriction != NULL);
|
||||
|
||||
shardIntervalList = relationRestriction->prunedShardIntervalList;
|
||||
|
||||
Assert(list_length(shardIntervalList) == 1);
|
||||
shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
||||
|
||||
shardId = shardInterval->shardId;
|
||||
relationId = shardInterval->relationId;
|
||||
relationName = get_rel_name(relationId);
|
||||
AppendShardIdToName(&relationName, shardId);
|
||||
|
||||
schemaId = get_rel_namespace(relationId);
|
||||
schemaName = get_namespace_name(schemaId);
|
||||
|
||||
ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RouterQueryJob creates a Job for the specified query to execute the
|
||||
* provided single shard select task.
|
||||
*/
|
||||
static Job *
|
||||
RouterQueryJob(Query *query, Task *task)
|
||||
RouterQueryJob(Query *query, Task *task, List *placementList)
|
||||
{
|
||||
Job *job = NULL;
|
||||
List *taskList = NIL;
|
||||
|
@ -1007,7 +1403,8 @@ RouterQueryJob(Query *query, Task *task)
|
|||
|
||||
/*
|
||||
* We send modify task to the first replica, otherwise we choose the target shard
|
||||
* according to task assignment policy.
|
||||
* according to task assignment policy. Placement list for select queries are
|
||||
* provided as function parameter.
|
||||
*/
|
||||
if (taskType == MODIFY_TASK)
|
||||
{
|
||||
|
@ -1015,7 +1412,10 @@ RouterQueryJob(Query *query, Task *task)
|
|||
}
|
||||
else
|
||||
{
|
||||
taskList = AssignAnchorShardTaskList(list_make1(task));
|
||||
Assert(placementList != NIL);
|
||||
|
||||
task->taskPlacementList = placementList;
|
||||
taskList = list_make1(task);
|
||||
}
|
||||
|
||||
job = CitusMakeNode(Job);
|
||||
|
@ -1036,22 +1436,11 @@ RouterQueryJob(Query *query, Task *task)
|
|||
* partition column. This feature is enabled if task executor is set to real-time
|
||||
*/
|
||||
bool
|
||||
MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
|
||||
MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
|
||||
RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
uint32 rangeTableId = 1;
|
||||
List *rangeTableList = NIL;
|
||||
RangeTblEntry *rangeTableEntry = NULL;
|
||||
Oid distributedTableId = InvalidOid;
|
||||
Var *partitionColumn = NULL;
|
||||
char partitionMethod = '\0';
|
||||
Node *quals = NULL;
|
||||
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
|
||||
FromExpr *joinTree = query->jointree;
|
||||
List *varClauseList = NIL;
|
||||
ListCell *varClauseCell = NULL;
|
||||
bool partitionColumnMatchExpression = false;
|
||||
int partitionColumnReferenceCount = 0;
|
||||
int shardCount = 0;
|
||||
CmdType commandType = query->commandType;
|
||||
ListCell *relationRestrictionContextCell = NULL;
|
||||
|
||||
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
|
||||
commandType == CMD_DELETE)
|
||||
|
@ -1059,6 +1448,7 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
|
|||
return true;
|
||||
}
|
||||
|
||||
/* FIXME: I tend to think it's time to remove this */
|
||||
if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME)
|
||||
{
|
||||
return false;
|
||||
|
@ -1066,171 +1456,28 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
|
|||
|
||||
Assert(commandType == CMD_SELECT);
|
||||
|
||||
/*
|
||||
* Reject subqueries which are in SELECT or WHERE clause.
|
||||
* Queries which are recursive, with CommonTableExpr, with locking (hasForUpdate),
|
||||
* or with window functions are also rejected here.
|
||||
* Queries which have subqueries, or tablesamples in FROM clauses are rejected later
|
||||
* during RangeTblEntry checks.
|
||||
*/
|
||||
if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate ||
|
||||
query->hasRecursive)
|
||||
if (query->hasForUpdate)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (query->groupingSets)
|
||||
foreach(relationRestrictionContextCell, restrictionContext->relationRestrictionList)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* only hash partitioned tables are supported */
|
||||
distributedTableId = ExtractFirstDistributedTableId(query);
|
||||
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||
partitionMethod = PartitionMethod(distributedTableId);
|
||||
|
||||
if (partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* extract range table entries */
|
||||
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
|
||||
|
||||
/* query can have only one range table of type RTE_RELATION */
|
||||
if (list_length(rangeTableList) != 1)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
|
||||
if (rangeTableEntry->rtekind != RTE_RELATION)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (rangeTableEntry->tablesample)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (joinTree == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
quals = joinTree->quals;
|
||||
if (quals == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* convert list of expressions into expression tree */
|
||||
if (quals != NULL && IsA(quals, List))
|
||||
{
|
||||
quals = (Node *) make_ands_explicit((List *) quals);
|
||||
}
|
||||
|
||||
/*
|
||||
* Partition column must be used in a simple equality match check and it must be
|
||||
* place at top level conjustion operator.
|
||||
*/
|
||||
partitionColumnMatchExpression =
|
||||
ColumnMatchExpressionAtTopLevelConjunction(quals, partitionColumn);
|
||||
|
||||
if (!partitionColumnMatchExpression)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* make sure partition column is used only once in the query */
|
||||
varClauseList = pull_var_clause_default(quals);
|
||||
foreach(varClauseCell, varClauseList)
|
||||
{
|
||||
Var *column = (Var *) lfirst(varClauseCell);
|
||||
if (equal(column, partitionColumn))
|
||||
RelationRestriction *relationRestriction =
|
||||
(RelationRestriction *) lfirst(relationRestrictionContextCell);
|
||||
RangeTblEntry *rte = relationRestriction->rte;
|
||||
if (rte->rtekind == RTE_RELATION)
|
||||
{
|
||||
partitionColumnReferenceCount++;
|
||||
}
|
||||
}
|
||||
/* only hash partitioned tables are supported */
|
||||
Oid distributedTableId = rte->relid;
|
||||
char partitionMethod = PartitionMethod(distributedTableId);
|
||||
|
||||
if (partitionColumnReferenceCount != 1)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* We need to make sure there is at least one shard for this hash partitioned
|
||||
* query to be router plannable. We can not prepare a router plan if there
|
||||
* are no shards.
|
||||
*/
|
||||
shardCount = ShardIntervalCount(distributedTableId);
|
||||
if (shardCount == 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnMatchExpressionAtTopLevelConjunction returns true if the query contains an exact
|
||||
* match (equal) expression on the provided column. The function returns true only
|
||||
* if the match expression has an AND relation with the rest of the expression tree.
|
||||
*/
|
||||
static bool
|
||||
ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
|
||||
{
|
||||
if (node == NULL)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (IsA(node, OpExpr))
|
||||
{
|
||||
OpExpr *opExpr = (OpExpr *) node;
|
||||
bool simpleExpression = SimpleOpExpression((Expr *) opExpr);
|
||||
bool columnInExpr = false;
|
||||
bool usingEqualityOperator = false;
|
||||
|
||||
if (!simpleExpression)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
columnInExpr = OpExpressionContainsColumn(opExpr, column);
|
||||
if (!columnInExpr)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
usingEqualityOperator = OperatorImplementsEquality(opExpr->opno);
|
||||
|
||||
return usingEqualityOperator;
|
||||
}
|
||||
else if (IsA(node, BoolExpr))
|
||||
{
|
||||
BoolExpr *boolExpr = (BoolExpr *) node;
|
||||
List *argumentList = boolExpr->args;
|
||||
ListCell *argumentCell = NULL;
|
||||
|
||||
if (boolExpr->boolop != AND_EXPR)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach(argumentCell, argumentList)
|
||||
{
|
||||
Node *argumentNode = (Node *) lfirst(argumentCell);
|
||||
bool columnMatch =
|
||||
ColumnMatchExpressionAtTopLevelConjunction(argumentNode, column);
|
||||
if (columnMatch)
|
||||
if (partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include "distributed/worker_protocol.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "optimizer/planner.h"
|
||||
#include "optimizer/paths.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/guc_tables.h"
|
||||
|
||||
|
@ -142,6 +143,9 @@ _PG_init(void)
|
|||
/* register utility hook */
|
||||
ProcessUtility_hook = multi_ProcessUtility;
|
||||
|
||||
/* register for planner hook */
|
||||
set_rel_pathlist_hook = multi_relation_restriction_hook;
|
||||
|
||||
/* organize that task tracker is started once server is up */
|
||||
TaskTrackerRegister();
|
||||
|
||||
|
|
|
@ -181,6 +181,8 @@ extern bool SubqueryPushdown;
|
|||
/* Function declarations for building logical plans */
|
||||
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree);
|
||||
extern bool NeedsDistributedPlanning(Query *queryTree);
|
||||
extern int GetRTEIdentity(RangeTblEntry *rte);
|
||||
extern void IdentifyRTE(RangeTblEntry *rte, int identifier);
|
||||
extern MultiNode * ParentNode(MultiNode *multiNode);
|
||||
extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
|
||||
extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode);
|
||||
|
|
|
@ -13,14 +13,41 @@
|
|||
#include "nodes/plannodes.h"
|
||||
#include "nodes/relation.h"
|
||||
|
||||
|
||||
typedef struct RelationRestrictionContext
|
||||
{
|
||||
bool hasDistributedRelation;
|
||||
bool hasLocalRelation;
|
||||
List *relationRestrictionList;
|
||||
} RelationRestrictionContext;
|
||||
|
||||
typedef struct RelationRestriction
|
||||
{
|
||||
Index index;
|
||||
Oid relationId;
|
||||
bool distributedRelation;
|
||||
RangeTblEntry *rte;
|
||||
RelOptInfo *relOptInfo;
|
||||
PlannerInfo *plannerInfo;
|
||||
List *prunedShardIntervalList;
|
||||
} RelationRestriction;
|
||||
|
||||
|
||||
extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
|
||||
ParamListInfo boundParams);
|
||||
|
||||
extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
|
||||
struct MultiPlan;
|
||||
extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query);
|
||||
extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query,
|
||||
RelationRestrictionContext *
|
||||
restrictionContext);
|
||||
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
|
||||
extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
|
||||
struct MultiPlan *multiPlan);
|
||||
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||
Index index, RangeTblEntry *rte);
|
||||
extern RelationRestrictionContext * CreateAndPushRestrictionContext(void);
|
||||
extern RelationRestrictionContext * CurrentRestrictionContext(void);
|
||||
extern void PopRestrictionContext(void);
|
||||
|
||||
#endif /* MULTI_PLANNER_H */
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#include "distributed/multi_logical_planner.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
|
||||
|
@ -29,7 +30,8 @@
|
|||
|
||||
|
||||
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
|
||||
MultiExecutorType taskExecutorType);
|
||||
MultiExecutorType taskExecutorType,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
|
||||
|
||||
#endif /* MULTI_ROUTER_PLANNER_H */
|
||||
|
|
|
@ -48,10 +48,10 @@ SELECT count(*) FROM orders_hash_partitioned;
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
|
@ -59,10 +59,10 @@ DEBUG: Plan is router executable
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
|
@ -70,10 +70,10 @@ DEBUG: Plan is router executable
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
|
@ -81,10 +81,10 @@ DEBUG: Plan is router executable
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
|
@ -93,10 +93,10 @@ DEBUG: Plan is router executable
|
|||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
|
@ -104,10 +104,10 @@ DEBUG: Plan is router executable
|
|||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
|
@ -198,6 +198,8 @@ SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
|
|||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR o_orderkey = 2;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
count
|
||||
-------
|
||||
|
@ -214,6 +216,8 @@ SELECT count(*) FROM orders_hash_partitioned
|
|||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
|
@ -232,6 +236,8 @@ SELECT count(*) FROM
|
|||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
|
@ -242,6 +248,8 @@ DEBUG: predicate pruning for shardId 630003
|
|||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = ANY ('{1,2,3}');
|
||||
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
|
||||
HINT: Consider rewriting the expression with OR/AND clauses.
|
||||
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
|
||||
HINT: Consider rewriting the expression with OR/AND clauses.
|
||||
count
|
||||
-------
|
||||
|
@ -285,6 +293,8 @@ SELECT count(*) FROM orders_hash_partitioned
|
|||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
|
@ -319,9 +329,11 @@ SELECT count(*)
|
|||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
|
|
|
@ -0,0 +1,329 @@
|
|||
--
|
||||
-- MULTI_HASH_PRUNING
|
||||
--
|
||||
-- Tests for shard and join pruning logic on hash partitioned tables.
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 630000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 630000;
|
||||
-- Create a table partitioned on integer column and update partition type to
|
||||
-- hash. Then stage data to this table and update shard min max values with
|
||||
-- hashed ones. Hash value of 1, 2, 3 and 4 are consecutively -1905060026,
|
||||
-- 1134484726, -28094569 and -1011077333.
|
||||
CREATE TABLE orders_hash_partitioned (
|
||||
o_orderkey integer,
|
||||
o_custkey integer,
|
||||
o_orderstatus char(1),
|
||||
o_totalprice decimal(15,2),
|
||||
o_orderdate date,
|
||||
o_orderpriority char(15),
|
||||
o_clerk char(15),
|
||||
o_shippriority integer,
|
||||
o_comment varchar(79) );
|
||||
SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG2;
|
||||
-- Check that we can prune shards for simple cases, boolean expressions and
|
||||
-- immutable functions.
|
||||
-- Since router plans are not triggered for task-tracker executor type,
|
||||
-- we need to run the tests that triggers router planning seperately for
|
||||
-- both executors. Otherwise, check-full fails on the task-tracker.
|
||||
-- Later, we need to switch back to the actual task executor
|
||||
-- to contuinue with correct executor type for check-full.
|
||||
SELECT quote_literal(current_setting('citus.task_executor_type')) AS actual_task_executor
|
||||
\gset
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
SELECT count(*) FROM orders_hash_partitioned;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM orders_hash_partitioned;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
|
||||
DEBUG: predicate pruning for shardId 630000
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO :actual_task_executor;
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR o_orderkey = 2;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR o_clerk = 'aaa';
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR o_orderkey is NULL;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM
|
||||
(SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Check that we don't support pruning for ANY (array expression) and give
|
||||
-- a notice message when used with the partition column
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = ANY ('{1,2,3}');
|
||||
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
|
||||
HINT: Consider rewriting the expression with OR/AND clauses.
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Check that we don't show the message if the operator is not
|
||||
-- equality operator
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey < ALL ('{1,2,3}');
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Check that we don't give a spurious hint message when non-partition
|
||||
-- columns are used with ANY/IN/ALL
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = 1 OR o_totalprice IN (2, 5);
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Check that we cannot prune for mutable functions.
|
||||
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = random() OR o_orderkey = 1;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM orders_hash_partitioned
|
||||
WHERE o_orderkey = random() AND o_orderkey = 1;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- Check that we can do join pruning.
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
|
||||
WHERE orders1.o_orderkey = orders2.o_orderkey;
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
|
||||
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
|
||||
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
|
||||
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
|
||||
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
|
||||
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
|
||||
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
|
||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
|
||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
|
||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)
|
||||
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
|
||||
WHERE orders1.o_orderkey = orders2.o_orderkey
|
||||
AND orders1.o_orderkey = 1
|
||||
AND orders2.o_orderkey is NULL;
|
||||
DEBUG: predicate pruning for shardId 630001
|
||||
DEBUG: predicate pruning for shardId 630002
|
||||
DEBUG: predicate pruning for shardId 630003
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
|
||||
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
|
@ -90,7 +90,7 @@ SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash');
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('customer_hash', 1, 1);
|
||||
SELECT master_create_worker_shards('customer_hash', 2, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -25,13 +25,6 @@ SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'ha
|
|||
|
||||
(1 row)
|
||||
|
||||
-- test when a table is distributed but no shards created yet
|
||||
SELECT count(*) from articles;
|
||||
count
|
||||
-------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('articles', 2, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
@ -97,13 +90,6 @@ INSERT INTO articles VALUES (49, 9, 'anyone', 2681);
|
|||
INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
|
||||
-- insert a single row for the test
|
||||
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
|
||||
-- first, test zero-shard SELECT, which should return an empty row
|
||||
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
|
||||
count
|
||||
-------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- zero-shard modifications should fail
|
||||
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
|
||||
ERROR: distributed modifications must target exactly one shard
|
||||
|
@ -170,18 +156,20 @@ SELECT title, author_id FROM articles
|
|||
alkylic | 8
|
||||
(10 rows)
|
||||
|
||||
-- add in some grouping expressions, still on same shard
|
||||
-- add in some grouping expressions.
|
||||
-- it is supported if it is on the same shard, but not supported if it
|
||||
-- involves multiple shards.
|
||||
-- having queries unsupported in Citus
|
||||
SELECT author_id, sum(word_count) AS corpus_size FROM articles
|
||||
WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10
|
||||
WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
|
||||
GROUP BY author_id
|
||||
HAVING sum(word_count) > 40000
|
||||
ORDER BY sum(word_count) DESC;
|
||||
ERROR: cannot perform distributed planning on this query
|
||||
DETAIL: Having qual is currently unsupported
|
||||
-- UNION/INTERSECT queries are unsupported
|
||||
-- UNION/INTERSECT queries are unsupported if on multiple shards
|
||||
SELECT * FROM articles WHERE author_id = 10 UNION
|
||||
SELECT * FROM articles WHERE author_id = 1;
|
||||
SELECT * FROM articles WHERE author_id = 2;
|
||||
ERROR: cannot perform distributed planning on this query
|
||||
DETAIL: Union, Intersect, or Except are currently unsupported
|
||||
-- queries using CTEs are unsupported
|
||||
|
@ -324,8 +312,8 @@ SET citus.task_executor_type TO 'real-time';
|
|||
SELECT *
|
||||
FROM articles
|
||||
WHERE author_id = 1;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
id | author_id | title | word_count
|
||||
----+-----------+--------------+------------
|
||||
|
@ -341,6 +329,7 @@ SELECT *
|
|||
FROM articles
|
||||
WHERE author_id = 1 OR author_id = 17;
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
id | author_id | title | word_count
|
||||
----+-----------+--------------+------------
|
||||
|
@ -368,8 +357,8 @@ SELECT *
|
|||
SELECT id as article_id, word_count * id as random_value
|
||||
FROM articles
|
||||
WHERE author_id = 1;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
article_id | random_value
|
||||
------------+--------------
|
||||
|
@ -386,9 +375,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
|
|||
FROM articles a, articles b
|
||||
WHERE a.author_id = 10 and a.author_id = b.author_id
|
||||
LIMIT 3;
|
||||
DEBUG: push down of limit count: 3
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
first_author | second_word_count
|
||||
--------------+-------------------
|
||||
|
@ -403,8 +392,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
|
|||
FROM articles a, articles_single_shard b
|
||||
WHERE a.author_id = 10 and a.author_id = b.author_id
|
||||
LIMIT 3;
|
||||
DEBUG: push down of limit count: 3
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
first_author | second_word_count
|
||||
--------------+-------------------
|
||||
10 | 19519
|
||||
|
@ -417,8 +407,8 @@ SELECT *
|
|||
FROM articles
|
||||
WHERE author_id = 1
|
||||
LIMIT 2;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
id | author_id | title | word_count
|
||||
----+-----------+----------+------------
|
||||
|
@ -433,8 +423,8 @@ SELECT id
|
|||
FROM articles
|
||||
WHERE author_id = 1
|
||||
GROUP BY id;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
id
|
||||
----
|
||||
|
@ -447,14 +437,15 @@ DEBUG: Plan is router executable
|
|||
|
||||
-- copying from a single shard table does not require the master query
|
||||
COPY articles_single_shard TO stdout;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
50 10 anjanette 19519
|
||||
-- error out for queries with aggregates
|
||||
SELECT avg(word_count)
|
||||
FROM articles
|
||||
WHERE author_id = 2;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 850000
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
avg
|
||||
--------------------
|
||||
|
@ -467,8 +458,8 @@ SELECT max(word_count) as max, min(word_count) as min,
|
|||
sum(word_count) as sum, count(word_count) as cnt
|
||||
FROM articles
|
||||
WHERE author_id = 2;
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: predicate pruning for shardId 850000
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
max | min | sum | cnt
|
||||
-------+------+-------+-----
|
||||
|
@ -480,6 +471,7 @@ SELECT *
|
|||
FROM articles a, articles b
|
||||
WHERE a.id = b.id AND a.author_id = 1;
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: predicate pruning for shardId 850001
|
||||
DEBUG: join prunable for task partitionId 0 and 1
|
||||
DEBUG: join prunable for task partitionId 0 and 2
|
||||
DEBUG: join prunable for task partitionId 0 and 3
|
||||
|
|
|
@ -60,7 +60,7 @@ SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
|
|||
-- sharded table.
|
||||
|
||||
SELECT count(distinct p_mfgr) FROM part;
|
||||
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr;
|
||||
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr;
|
||||
|
||||
-- We don't support count(distinct) queries if table is append partitioned and
|
||||
-- has multiple shards
|
||||
|
|
|
@ -105,12 +105,13 @@ WHERE
|
|||
|
||||
|
||||
-- This query is an INNER JOIN in disguise since there cannot be NULL results
|
||||
-- Added extra filter to make query not router plannable
|
||||
SELECT
|
||||
min(l_custkey), max(l_custkey)
|
||||
FROM
|
||||
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
|
||||
WHERE
|
||||
r_custkey = 5;
|
||||
r_custkey = 5 or r_custkey > 15;
|
||||
|
||||
|
||||
-- Apply a filter before the join
|
||||
|
@ -204,12 +205,13 @@ WHERE
|
|||
|
||||
|
||||
-- This query is an INNER JOIN in disguise since there cannot be NULL results (21)
|
||||
-- Added extra filter to make query not router plannable
|
||||
SELECT
|
||||
min(l_custkey), max(l_custkey)
|
||||
FROM
|
||||
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
|
||||
WHERE
|
||||
r_custkey = 21;
|
||||
r_custkey = 21 or r_custkey < 10;
|
||||
|
||||
|
||||
-- Apply a filter before the join
|
||||
|
|
|
@ -85,14 +85,14 @@ SELECT count(distinct p_mfgr) FROM part;
|
|||
5
|
||||
(1 row)
|
||||
|
||||
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr;
|
||||
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr;
|
||||
p_mfgr | count
|
||||
---------------------------+-------
|
||||
Manufacturer#1 | 193
|
||||
Manufacturer#3 | 228
|
||||
Manufacturer#5 | 185
|
||||
Manufacturer#2 | 190
|
||||
Manufacturer#3 | 228
|
||||
Manufacturer#4 | 204
|
||||
Manufacturer#5 | 185
|
||||
(5 rows)
|
||||
|
||||
-- We don't support count(distinct) queries if table is append partitioned and
|
||||
|
|
|
@ -134,12 +134,13 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
|
|||
(1 row)
|
||||
|
||||
-- This query is an INNER JOIN in disguise since there cannot be NULL results
|
||||
-- Added extra filter to make query not router plannable
|
||||
SELECT
|
||||
min(l_custkey), max(l_custkey)
|
||||
FROM
|
||||
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
|
||||
WHERE
|
||||
r_custkey = 5;
|
||||
r_custkey = 5 or r_custkey > 15;
|
||||
LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_right" ]
|
||||
min | max
|
||||
-----+-----
|
||||
|
@ -273,12 +274,13 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
|
|||
(1 row)
|
||||
|
||||
-- This query is an INNER JOIN in disguise since there cannot be NULL results (21)
|
||||
-- Added extra filter to make query not router plannable
|
||||
SELECT
|
||||
min(l_custkey), max(l_custkey)
|
||||
FROM
|
||||
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
|
||||
WHERE
|
||||
r_custkey = 21;
|
||||
r_custkey = 21 or r_custkey < 10;
|
||||
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]
|
||||
min | max
|
||||
-----+-----
|
||||
|
|
|
@ -63,7 +63,7 @@ CREATE TABLE customer_hash (
|
|||
c_mktsegment char(10) not null,
|
||||
c_comment varchar(117) not null);
|
||||
SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash');
|
||||
SELECT master_create_worker_shards('customer_hash', 1, 1);
|
||||
SELECT master_create_worker_shards('customer_hash', 2, 1);
|
||||
|
||||
-- The following query checks that we can correctly handle self-joins
|
||||
|
||||
|
|
|
@ -94,9 +94,6 @@ SET client_min_messages TO 'DEBUG2';
|
|||
-- insert a single row for the test
|
||||
INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519);
|
||||
|
||||
-- first, test zero-shard SELECT, which should return an empty row
|
||||
SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2;
|
||||
|
||||
-- single-shard tests
|
||||
|
||||
-- test simple select for a single row
|
||||
|
@ -141,32 +138,130 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
|
|||
HAVING sum(word_count) > 1000
|
||||
ORDER BY sum(word_count) DESC;
|
||||
|
||||
|
||||
-- UNION/INTERSECT queries are unsupported
|
||||
-- this is rejected by router planner and handled by multi_logical_planner
|
||||
SELECT * FROM articles_hash WHERE author_id = 10 UNION
|
||||
SELECT * FROM articles_hash WHERE author_id = 1;
|
||||
|
||||
-- query is a single shard query but can't do shard pruning,
|
||||
-- not router-plannable due to <= and IN
|
||||
SELECT * FROM articles_hash WHERE author_id <= 1;
|
||||
SELECT * FROM articles_hash WHERE author_id IN (1, 3);
|
||||
|
||||
-- queries using CTEs are unsupported
|
||||
-- queries with CTEs are supported
|
||||
WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1)
|
||||
SELECT * FROM first_author;
|
||||
|
||||
-- queries with CTEs are supported even if CTE is not referenced inside query
|
||||
WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1)
|
||||
SELECT title FROM articles_hash WHERE author_id = 1;
|
||||
|
||||
-- queries which involve functions in FROM clause are unsupported.
|
||||
-- two CTE joins are supported if they go to the same worker
|
||||
WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1),
|
||||
id_title AS (SELECT id, title from articles_hash WHERE author_id = 1)
|
||||
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
|
||||
|
||||
WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1),
|
||||
id_title AS (SELECT id, title from articles_hash WHERE author_id = 3)
|
||||
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
|
||||
|
||||
-- CTE joins are not supported if table shards are at different workers
|
||||
WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1),
|
||||
id_title AS (SELECT id, title from articles_hash WHERE author_id = 2)
|
||||
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
|
||||
|
||||
-- recursive CTEs are supported when filtered on partition column
|
||||
CREATE TABLE company_employees (company_id int, employee_id int, manager_id int);
|
||||
SELECT master_create_distributed_table('company_employees', 'company_id', 'hash');
|
||||
SELECT master_create_worker_shards('company_employees', 4, 1);
|
||||
|
||||
INSERT INTO company_employees values(1, 1, 0);
|
||||
INSERT INTO company_employees values(1, 2, 1);
|
||||
INSERT INTO company_employees values(1, 3, 1);
|
||||
INSERT INTO company_employees values(1, 4, 2);
|
||||
INSERT INTO company_employees values(1, 5, 4);
|
||||
|
||||
INSERT INTO company_employees values(3, 1, 0);
|
||||
INSERT INTO company_employees values(3, 15, 1);
|
||||
INSERT INTO company_employees values(3, 3, 1);
|
||||
|
||||
-- find employees at top 2 level within company hierarchy
|
||||
WITH RECURSIVE hierarchy as (
|
||||
SELECT *, 1 AS level
|
||||
FROM company_employees
|
||||
WHERE company_id = 1 and manager_id = 0
|
||||
UNION
|
||||
SELECT ce.*, (h.level+1)
|
||||
FROM hierarchy h JOIN company_employees ce
|
||||
ON (h.employee_id = ce.manager_id AND
|
||||
h.company_id = ce.company_id AND
|
||||
ce.company_id = 1))
|
||||
SELECT * FROM hierarchy WHERE LEVEL <= 2;
|
||||
|
||||
-- query becomes not router plannble and gets rejected
|
||||
-- if filter on company is dropped
|
||||
WITH RECURSIVE hierarchy as (
|
||||
SELECT *, 1 AS level
|
||||
FROM company_employees
|
||||
WHERE company_id = 1 and manager_id = 0
|
||||
UNION
|
||||
SELECT ce.*, (h.level+1)
|
||||
FROM hierarchy h JOIN company_employees ce
|
||||
ON (h.employee_id = ce.manager_id AND
|
||||
h.company_id = ce.company_id))
|
||||
SELECT * FROM hierarchy WHERE LEVEL <= 2;
|
||||
|
||||
-- logically wrong query, query involves different shards
|
||||
-- from the same table, but still router plannable due to
|
||||
-- shard being placed on the same worker.
|
||||
WITH RECURSIVE hierarchy as (
|
||||
SELECT *, 1 AS level
|
||||
FROM company_employees
|
||||
WHERE company_id = 3 and manager_id = 0
|
||||
UNION
|
||||
SELECT ce.*, (h.level+1)
|
||||
FROM hierarchy h JOIN company_employees ce
|
||||
ON (h.employee_id = ce.manager_id AND
|
||||
h.company_id = ce.company_id AND
|
||||
ce.company_id = 2))
|
||||
SELECT * FROM hierarchy WHERE LEVEL <= 2;
|
||||
|
||||
-- grouping sets are supported on single shard
|
||||
SELECT
|
||||
id, substring(title, 2, 1) AS subtitle, count(*)
|
||||
FROM articles_hash
|
||||
WHERE author_id = 1 or author_id = 3
|
||||
GROUP BY GROUPING SETS ((id),(subtitle));
|
||||
|
||||
-- grouping sets are not supported on multiple shards
|
||||
SELECT
|
||||
id, substring(title, 2, 1) AS subtitle, count(*)
|
||||
FROM articles_hash
|
||||
WHERE author_id = 1 or author_id = 2
|
||||
GROUP BY GROUPING SETS ((id),(subtitle));
|
||||
|
||||
-- queries which involve functions in FROM clause are supported if it goes to a single worker.
|
||||
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1;
|
||||
|
||||
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 3;
|
||||
|
||||
-- they are not supported if multiple workers are involved
|
||||
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2;
|
||||
|
||||
-- subqueries are not supported in WHERE clause in Citus
|
||||
SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a');
|
||||
|
||||
SELECT * FROM articles_hash WHERE author_id IN (SELECT author_id FROM articles_hash WHERE author_id = 1 or author_id = 3);
|
||||
|
||||
SELECT * FROM articles_hash WHERE author_id = (SELECT 1);
|
||||
|
||||
|
||||
-- subqueries are supported in FROM clause but they are not router plannable
|
||||
SELECT articles_hash.id,test.word_count
|
||||
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id
|
||||
ORDER BY articles_hash.id;
|
||||
|
||||
|
||||
SELECT articles_hash.id,test.word_count
|
||||
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test
|
||||
WHERE test.id = articles_hash.id and articles_hash.author_id = 1
|
||||
ORDER BY articles_hash.id;
|
||||
|
||||
-- subqueries are not supported in SELECT clause
|
||||
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1)
|
||||
AS special_price FROM articles_hash a;
|
||||
|
@ -176,8 +271,7 @@ SELECT *
|
|||
FROM articles_hash
|
||||
WHERE author_id = 1;
|
||||
|
||||
-- below query hits a single shard, but it is not router plannable
|
||||
-- still router executable
|
||||
-- below query hits a single shard, router plannable
|
||||
SELECT *
|
||||
FROM articles_hash
|
||||
WHERE author_id = 1 OR author_id = 17;
|
||||
|
@ -194,18 +288,26 @@ SELECT id as article_id, word_count * id as random_value
|
|||
WHERE author_id = 1;
|
||||
|
||||
-- we can push down co-located joins to a single worker
|
||||
-- this is not router plannable but router executable
|
||||
-- handled by real-time executor
|
||||
SELECT a.author_id as first_author, b.word_count as second_word_count
|
||||
FROM articles_hash a, articles_hash b
|
||||
WHERE a.author_id = 10 and a.author_id = b.author_id
|
||||
LIMIT 3;
|
||||
|
||||
-- following join is neither router plannable, nor router executable
|
||||
-- following join is router plannable since the same worker
|
||||
-- has both shards
|
||||
SELECT a.author_id as first_author, b.word_count as second_word_count
|
||||
FROM articles_hash a, articles_single_shard_hash b
|
||||
WHERE a.author_id = 10 and a.author_id = b.author_id
|
||||
LIMIT 3;
|
||||
|
||||
-- following join is not router plannable since there are no
|
||||
-- workers containing both shards, added a CTE to make this fail
|
||||
-- at logical planner
|
||||
WITH single_shard as (SELECT * FROM articles_single_shard_hash)
|
||||
SELECT a.author_id as first_author, b.word_count as second_word_count
|
||||
FROM articles_hash a, single_shard b
|
||||
WHERE a.author_id = 2 and a.author_id = b.author_id
|
||||
LIMIT 3;
|
||||
|
||||
-- single shard select with limit is router plannable
|
||||
SELECT *
|
||||
|
@ -257,7 +359,45 @@ SELECT max(word_count)
|
|||
WHERE author_id = 1
|
||||
GROUP BY author_id;
|
||||
|
||||
|
||||
-- router plannable union queries are supported
|
||||
(SELECT * FROM articles_hash WHERE author_id = 1)
|
||||
UNION
|
||||
(SELECT * FROM articles_hash WHERE author_id = 3);
|
||||
|
||||
SELECT * FROM (
|
||||
(SELECT * FROM articles_hash WHERE author_id = 1)
|
||||
UNION
|
||||
(SELECT * FROM articles_hash WHERE author_id = 3)) uu;
|
||||
|
||||
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1)
|
||||
UNION
|
||||
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3);
|
||||
|
||||
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1)
|
||||
INTERSECT
|
||||
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3);
|
||||
|
||||
(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 1)
|
||||
EXCEPT
|
||||
(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 3);
|
||||
|
||||
-- union queries are not supported if not router plannable
|
||||
-- there is an inconsistency on shard pruning between
|
||||
-- ubuntu/mac disabling log messages for this queries only
|
||||
|
||||
SET client_min_messages to 'NOTICE';
|
||||
|
||||
(SELECT * FROM articles_hash WHERE author_id = 1)
|
||||
UNION
|
||||
(SELECT * FROM articles_hash WHERE author_id = 2);
|
||||
|
||||
|
||||
SELECT * FROM (
|
||||
(SELECT * FROM articles_hash WHERE author_id = 1)
|
||||
UNION
|
||||
(SELECT * FROM articles_hash WHERE author_id = 2)) uu;
|
||||
|
||||
-- error out for queries with repartition jobs
|
||||
SELECT *
|
||||
FROM articles_hash a, articles_hash b
|
||||
|
@ -275,7 +415,7 @@ SET citus.task_executor_type TO 'real-time';
|
|||
SET client_min_messages to 'DEBUG2';
|
||||
|
||||
-- this is definitely single shard
|
||||
-- but not router plannable
|
||||
-- and router plannable
|
||||
SELECT *
|
||||
FROM articles_hash
|
||||
WHERE author_id = 1 and author_id >= 1;
|
||||
|
@ -387,11 +527,9 @@ SELECT id, MIN(id) over (order by word_count)
|
|||
FROM articles_hash
|
||||
WHERE author_id = 1 or author_id = 2;
|
||||
|
||||
|
||||
-- but they are not supported for not router plannable queries
|
||||
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
|
||||
FROM articles_hash
|
||||
WHERE author_id = 5 or author_id = 1;
|
||||
WHERE author_id = 5 or author_id = 2;
|
||||
|
||||
-- complex query hitting a single shard
|
||||
SELECT
|
||||
|
@ -510,6 +648,19 @@ $$ LANGUAGE plpgsql;
|
|||
|
||||
SELECT * FROM author_articles_id_word_count();
|
||||
|
||||
-- materialized views can be created for router plannable queries
|
||||
CREATE MATERIALIZED VIEW mv_articles_hash AS
|
||||
SELECT * FROM articles_hash WHERE author_id = 1;
|
||||
|
||||
SELECT * FROM mv_articles_hash;
|
||||
|
||||
CREATE MATERIALIZED VIEW mv_articles_hash_error AS
|
||||
SELECT * FROM articles_hash WHERE author_id in (1,2);
|
||||
|
||||
-- materialized views with (NO DATA) is still not supported
|
||||
CREATE MATERIALIZED VIEW mv_articles_hash AS
|
||||
SELECT * FROM articles_hash WHERE author_id = 1 WITH NO DATA;
|
||||
|
||||
-- router planner/executor is disabled for task-tracker executor
|
||||
-- following query is router plannable, but router planner is disabled
|
||||
SET citus.task_executor_type to 'task-tracker';
|
||||
|
@ -530,6 +681,8 @@ SET client_min_messages to 'NOTICE';
|
|||
DROP FUNCTION author_articles_max_id();
|
||||
DROP FUNCTION author_articles_id_word_count();
|
||||
|
||||
DROP MATERIALIZED VIEW mv_articles_hash;
|
||||
DROP TABLE articles_hash;
|
||||
DROP TABLE articles_single_shard_hash;
|
||||
DROP TABLE authors_hash;
|
||||
DROP TABLE company_employees;
|
||||
|
|
|
@ -23,10 +23,6 @@ CREATE TABLE articles_single_shard (LIKE articles);
|
|||
SELECT master_create_distributed_table('articles', 'author_id', 'hash');
|
||||
SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'hash');
|
||||
|
||||
|
||||
-- test when a table is distributed but no shards created yet
|
||||
SELECT count(*) from articles;
|
||||
|
||||
SELECT master_create_worker_shards('articles', 2, 1);
|
||||
SELECT master_create_worker_shards('articles_single_shard', 1, 1);
|
||||
|
||||
|
@ -85,9 +81,6 @@ INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
|
|||
-- insert a single row for the test
|
||||
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
|
||||
|
||||
-- first, test zero-shard SELECT, which should return an empty row
|
||||
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
|
||||
|
||||
-- zero-shard modifications should fail
|
||||
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
|
||||
DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
|
||||
|
@ -116,17 +109,19 @@ SELECT title, author_id FROM articles
|
|||
WHERE author_id = 7 OR author_id = 8
|
||||
ORDER BY author_id ASC, id;
|
||||
|
||||
-- add in some grouping expressions, still on same shard
|
||||
-- add in some grouping expressions.
|
||||
-- it is supported if it is on the same shard, but not supported if it
|
||||
-- involves multiple shards.
|
||||
-- having queries unsupported in Citus
|
||||
SELECT author_id, sum(word_count) AS corpus_size FROM articles
|
||||
WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10
|
||||
WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
|
||||
GROUP BY author_id
|
||||
HAVING sum(word_count) > 40000
|
||||
ORDER BY sum(word_count) DESC;
|
||||
|
||||
-- UNION/INTERSECT queries are unsupported
|
||||
-- UNION/INTERSECT queries are unsupported if on multiple shards
|
||||
SELECT * FROM articles WHERE author_id = 10 UNION
|
||||
SELECT * FROM articles WHERE author_id = 1;
|
||||
SELECT * FROM articles WHERE author_id = 2;
|
||||
|
||||
-- queries using CTEs are unsupported
|
||||
WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 )
|
||||
|
|
Loading…
Reference in New Issue