Merge pull request #647 from citusdata/501_use_relation_restrictions

Expand router planner coverage for single worker queries
pull/686/head
Murat Tuncer 2016-07-27 23:49:24 +03:00 committed by GitHub
commit 1ee3d091da
20 changed files with 1759 additions and 389 deletions

View File

@ -107,6 +107,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
instr_time planStart; instr_time planStart;
instr_time planDuration; instr_time planDuration;
Query *originalQuery = NULL; Query *originalQuery = NULL;
RelationRestrictionContext *restrictionContext = NULL;
/* if local query, run the standard explain and return */ /* if local query, run the standard explain and return */
bool localQuery = !NeedsDistributedPlanning(query); bool localQuery = !NeedsDistributedPlanning(query);
@ -137,22 +138,35 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
/* measure the full planning time to display in EXPLAIN ANALYZE */ /* measure the full planning time to display in EXPLAIN ANALYZE */
INSTR_TIME_SET_CURRENT(planStart); INSTR_TIME_SET_CURRENT(planStart);
/* call standard planner to modify the query structure before multi planning */ restrictionContext = CreateAndPushRestrictionContext();
initialPlan = standard_planner(query, 0, params);
commandType = initialPlan->commandType; PG_TRY();
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{ {
if (es->analyze) /* call standard planner to modify the query structure before multi planning */
{ initialPlan = standard_planner(query, 0, params);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
"distributed tables is not supported.")));
}
}
multiPlan = CreatePhysicalPlan(originalQuery, query); commandType = initialPlan->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
if (es->analyze)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
"distributed tables is not supported.")));
}
}
multiPlan = CreatePhysicalPlan(originalQuery, query, restrictionContext);
}
PG_CATCH();
{
PopRestrictionContext();
PG_RE_THROW();
}
PG_END_TRY();
PopRestrictionContext();
INSTR_TIME_SET_CURRENT(planDuration); INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart); INSTR_TIME_SUBTRACT(planDuration, planStart);

View File

@ -1410,8 +1410,41 @@ FindNodesOfType(MultiNode *node, int type)
/* /*
* NeedsDistributedPlanning checks if the passed in query is a Select query * IdentifyRTE assigns an identifier to an RTE, for tracking purposes.
* running on partitioned relations. If it is, we start distributed planning. *
* To be able to track RTEs through postgres' query planning, which copies,
* duplicates, and modifies them, we sometimes need to figure out whether two
* RTEs are copies of the same original RTE. For that we, hackishly, use a
* field normally unused in RTE_RELATION RTEs.
*
* The assigned identifier had better be unique within a plantree.
*/
void
IdentifyRTE(RangeTblEntry *rte, int identifier)
{
	List *identityList = NIL;

	/* only relation RTEs are tagged, and an RTE must not be tagged twice */
	Assert(rte->rtekind == RTE_RELATION);
	Assert(rte->values_lists == NIL);

	/* the identifier is stored as a one-element integer list in values_lists */
	identityList = list_make1_int(identifier);
	rte->values_lists = identityList;
}
/*
 * GetRTEIdentity reads back the identifier that IdentifyRTE stored in the
 * values_lists field of a relation RTE.
 */
int
GetRTEIdentity(RangeTblEntry *rte)
{
	List *identityList = NIL;

	Assert(rte->rtekind == RTE_RELATION);

	/* the identity is expected to be a single-element integer list */
	identityList = rte->values_lists;
	Assert(IsA(identityList, IntList));
	Assert(list_length(identityList) == 1);

	return linitial_int(identityList);
}
/*
* NeedsDistributedPlanning checks if the passed in query is a query running
* on a distributed table. If it is, we start distributed planning.
*
* For distributed relations it also assigns identifiers to the relevant RTEs.
*/ */
bool bool
NeedsDistributedPlanning(Query *queryTree) NeedsDistributedPlanning(Query *queryTree)
@ -1421,6 +1454,7 @@ NeedsDistributedPlanning(Query *queryTree)
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
bool hasLocalRelation = false; bool hasLocalRelation = false;
bool hasDistributedRelation = false; bool hasDistributedRelation = false;
int rteIdentifier = 1;
if (commandType != CMD_SELECT && commandType != CMD_INSERT && if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
commandType != CMD_UPDATE && commandType != CMD_DELETE) commandType != CMD_UPDATE && commandType != CMD_DELETE)
@ -1441,6 +1475,17 @@ NeedsDistributedPlanning(Query *queryTree)
if (IsDistributedTable(relationId)) if (IsDistributedTable(relationId))
{ {
hasDistributedRelation = true; hasDistributedRelation = true;
/*
* To be able to track individual RTEs through postgres' query
* planning, we need to be able to figure out whether an RTE is
* actually a copy of another, rather than a different one. We
* simply number the RTEs starting from 1.
*/
if (rangeTableEntry->rtekind == RTE_RELATION)
{
IdentifyRTE(rangeTableEntry, rteIdentifier++);
}
} }
else else
{ {

View File

@ -31,6 +31,10 @@
#include "utils/memutils.h" #include "utils/memutils.h"
static List *relationRestrictionContextList = NIL;
/* local function forward declarations */ /* local function forward declarations */
static void CheckNodeIsDumpable(Node *node); static void CheckNodeIsDumpable(Node *node);
@ -46,6 +50,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
PlannedStmt *result = NULL; PlannedStmt *result = NULL;
bool needsDistributedPlanning = NeedsDistributedPlanning(parse); bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
Query *originalQuery = NULL; Query *originalQuery = NULL;
RelationRestrictionContext *restrictionContext = NULL;
/* /*
* standard_planner scribbles on it's input, but for deparsing we need the * standard_planner scribbles on it's input, but for deparsing we need the
@ -56,19 +61,36 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
originalQuery = copyObject(parse); originalQuery = copyObject(parse);
} }
/* /* create a restriction context and push it onto the front of the context list */
* First call into standard planner. This is required because the Citus restrictionContext = CreateAndPushRestrictionContext();
* planner relies on parse tree transformations made by postgres' planner.
*/
result = standard_planner(parse, cursorOptions, boundParams);
if (needsDistributedPlanning) PG_TRY();
{ {
MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse); /*
* First call into standard planner. This is required because the Citus
* planner relies on parse tree transformations made by postgres' planner.
*/
/* store required data into the planned statement */ result = standard_planner(parse, cursorOptions, boundParams);
result = MultiQueryContainerNode(result, physicalPlan);
if (needsDistributedPlanning)
{
MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse,
restrictionContext);
/* store required data into the planned statement */
result = MultiQueryContainerNode(result, physicalPlan);
}
} }
PG_CATCH();
{
PopRestrictionContext();
PG_RE_THROW();
}
PG_END_TRY();
/* remove the context from the context list */
PopRestrictionContext();
return result; return result;
} }
@ -82,10 +104,11 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* physical plan process needed to produce distributed query plans. * physical plan process needed to produce distributed query plans.
*/ */
MultiPlan * MultiPlan *
CreatePhysicalPlan(Query *originalQuery, Query *query) CreatePhysicalPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{ {
MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query, MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query,
TaskExecutorType); TaskExecutorType, restrictionContext);
if (physicalPlan == NULL) if (physicalPlan == NULL)
{ {
/* Create and optimize logical plan */ /* Create and optimize logical plan */
@ -296,3 +319,90 @@ CheckNodeIsDumpable(Node *node)
pfree(out); pfree(out);
#endif #endif
} }
/*
 * multi_relation_restriction_hook is a hook called by the postgresql standard
 * planner to hand us planning information about a single relation. For every
 * plain relation RTE we append a RelationRestriction entry to the current
 * restriction context and note whether the relation is distributed or local.
 * RTEs of any other kind are ignored.
 */
void
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index,
								RangeTblEntry *rte)
{
	RelationRestrictionContext *currentContext = NULL;
	RelationRestriction *restriction = NULL;
	bool isDistributed = false;

	/* only plain relations are tracked */
	if (rte->rtekind != RTE_RELATION)
	{
		return;
	}

	isDistributed = IsDistributedTable(rte->relid);

	currentContext = CurrentRestrictionContext();
	Assert(currentContext != NULL);

	/* remember everything the planner told us about this relation */
	restriction = palloc0(sizeof(RelationRestriction));
	restriction->plannerInfo = root;
	restriction->relOptInfo = relOptInfo;
	restriction->index = index;
	restriction->rte = rte;
	restriction->relationId = rte->relid;
	restriction->distributedRelation = isDistributed;
	restriction->prunedShardIntervalList = NIL;

	/* a relation is counted as local exactly when it is not distributed */
	currentContext->hasDistributedRelation |= isDistributed;
	currentContext->hasLocalRelation |= !isDistributed;

	currentContext->relationRestrictionList =
		lappend(currentContext->relationRestrictionList, restriction);
}
/*
* CreateAndPushRestrictionContext creates a new restriction context, inserts it to the
* beginning of the context list, and returns the newly created context.
*/
RelationRestrictionContext *
CreateAndPushRestrictionContext(void)
{
RelationRestrictionContext *restrictionContext =
palloc0(sizeof(RelationRestrictionContext));
relationRestrictionContextList = lcons(restrictionContext,
relationRestrictionContextList);
return restrictionContext;
}
/*
 * CurrentRestrictionContext returns the restriction context at the head of
 * the context list. The list is expected to be non-empty.
 */
RelationRestrictionContext *
CurrentRestrictionContext(void)
{
	Assert(relationRestrictionContextList != NIL);

	return (RelationRestrictionContext *) linitial(relationRestrictionContextList);
}
/*
* PopRestrictionContext removes the most recently added restriction context from
* context list. The function assumes the list is not empty.
*/
void
PopRestrictionContext(void)
{
relationRestrictionContextList = list_delete_first(relationRestrictionContextList);
}

View File

@ -18,7 +18,9 @@
#include "access/stratnum.h" #include "access/stratnum.h"
#include "access/xact.h" #include "access/xact.h"
#include "distributed/citus_clauses.h" #include "distributed/citus_clauses.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
@ -40,6 +42,7 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "nodes/primnodes.h" #include "nodes/primnodes.h"
#include "optimizer/clauses.h" #include "optimizer/clauses.h"
#include "optimizer/restrictinfo.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
#include "storage/lock.h" #include "storage/lock.h"
#include "utils/elog.h" #include "utils/elog.h"
@ -65,17 +68,25 @@ static bool MasterIrreducibleExpression(Node *expression, bool *varArgument,
static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state);
static char MostPermissiveVolatileFlag(char left, char right); static char MostPermissiveVolatileFlag(char left, char right);
static Task * RouterModifyTask(Query *originalQuery, Query *query); static Task * RouterModifyTask(Query *originalQuery, Query *query);
static ShardInterval * TargetShardInterval(Query *query); static ShardInterval * TargetShardIntervalForModify(Query *query);
static List * QueryRestrictList(Query *query); static List * QueryRestrictList(Query *query);
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
static ShardInterval * FastShardPruning(Oid distributedTableId, static ShardInterval * FastShardPruning(Oid distributedTableId,
Const *partionColumnValue); Const *partionColumnValue);
static Oid ExtractFirstDistributedTableId(Query *query); static Oid ExtractFirstDistributedTableId(Query *query);
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *originalQuery, Query *query); static Task * RouterSelectTask(Query *originalQuery, Query *query,
static Job * RouterQueryJob(Query *query, Task *task); RelationRestrictionContext *restrictionContext,
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType); List **placementList);
static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column); static List * TargetShardIntervalsForSelect(Query *query,
RelationRestrictionContext *restrictionContext);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
static bool UpdateRelationNames(Node *node,
RelationRestrictionContext *restrictionContext);
static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext);
/* /*
@ -87,21 +98,23 @@ static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column);
*/ */
MultiPlan * MultiPlan *
MultiRouterPlanCreate(Query *originalQuery, Query *query, MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType) MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext)
{ {
Task *task = NULL; Task *task = NULL;
Job *job = NULL; Job *job = NULL;
MultiPlan *multiPlan = NULL; MultiPlan *multiPlan = NULL;
CmdType commandType = query->commandType; CmdType commandType = query->commandType;
bool modifyTask = false; bool modifyTask = false;
List *placementList = NIL;
bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType); bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType,
restrictionContext);
if (!routerPlannable) if (!routerPlannable)
{ {
return NULL; return NULL;
} }
ereport(DEBUG2, (errmsg("Creating router plan")));
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE) commandType == CMD_DELETE)
@ -118,11 +131,17 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query,
{ {
Assert(commandType == CMD_SELECT); Assert(commandType == CMD_SELECT);
task = RouterSelectTask(originalQuery, query); task = RouterSelectTask(originalQuery, query, restrictionContext, &placementList);
} }
if (task == NULL)
{
return NULL;
}
job = RouterQueryJob(originalQuery, task); ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList);
multiPlan = CitusMakeNode(MultiPlan); multiPlan = CitusMakeNode(MultiPlan);
multiPlan->workerJob = job; multiPlan->workerJob = job;
@ -669,7 +688,7 @@ MostPermissiveVolatileFlag(char left, char right)
static Task * static Task *
RouterModifyTask(Query *originalQuery, Query *query) RouterModifyTask(Query *originalQuery, Query *query)
{ {
ShardInterval *shardInterval = TargetShardInterval(query); ShardInterval *shardInterval = TargetShardIntervalForModify(query);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL; Task *modifyTask = NULL;
@ -713,25 +732,23 @@ RouterModifyTask(Query *originalQuery, Query *query)
/* /*
* TargetShardInterval determines the single shard targeted by a provided command. * TargetShardIntervalForModify determines the single shard targeted by a provided
* If no matching shards exist, or if the modification targets more than one one * modify command. If no matching shards exist, or if the modification targets more
* shard, this function raises an error depending on the command type. * than one shard, this function raises an error depending on the command type.
*/ */
static ShardInterval * static ShardInterval *
TargetShardInterval(Query *query) TargetShardIntervalForModify(Query *query)
{ {
CmdType commandType = query->commandType;
bool selectTask = (commandType == CMD_SELECT);
List *prunedShardList = NIL; List *prunedShardList = NIL;
int prunedShardCount = 0; int prunedShardCount = 0;
int shardCount = 0; int shardCount = 0;
Oid distributedTableId = ExtractFirstDistributedTableId(query); Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
char partitionMethod = cacheEntry->partitionMethod; char partitionMethod = cacheEntry->partitionMethod;
bool fastShardPruningPossible = false; bool fastShardPruningPossible = false;
Assert(query->commandType != CMD_SELECT);
/* error out if no shards exist for the table */ /* error out if no shards exist for the table */
shardCount = cacheEntry->shardIntervalArrayLength; shardCount = cacheEntry->shardIntervalArrayLength;
if (shardCount == 0) if (shardCount == 0)
@ -774,18 +791,9 @@ TargetShardInterval(Query *query)
prunedShardCount = list_length(prunedShardList); prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1) if (prunedShardCount != 1)
{ {
if (selectTask) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
{ errmsg("distributed modifications must target exactly one "
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), "shard")));
errmsg("router executor queries must target exactly one "
"shard")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributed modifications must target exactly one "
"shard")));
}
} }
return (ShardInterval *) linitial(prunedShardList); return (ShardInterval *) linitial(prunedShardList);
@ -963,22 +971,88 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
/* RouterSelectTask builds a Task to represent a single shard select query */ /* RouterSelectTask builds a Task to represent a single shard select query */
static Task * static Task *
RouterSelectTask(Query *originalQuery, Query *query) RouterSelectTask(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext,
List **placementList)
{ {
Task *task = NULL; Task *task = NULL;
ShardInterval *shardInterval = TargetShardInterval(query); List *prunedRelationShardList = TargetShardIntervalsForSelect(query,
restrictionContext);
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
bool upsertQuery = false; bool upsertQuery = false;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
ListCell *prunedRelationShardListCell = NULL;
List *workerList = NIL;
bool shardsPresent = false;
*placementList = NIL;
if (prunedRelationShardList == NULL)
{
return NULL;
}
Assert(shardInterval != NULL);
Assert(commandType == CMD_SELECT); Assert(commandType == CMD_SELECT);
shardId = shardInterval->shardId; foreach(prunedRelationShardListCell, prunedRelationShardList)
{
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
ShardInterval *shardInterval = NULL;
deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString); /* no shard is present or all shards are pruned out case will be handled later */
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); if (prunedShardList == NIL)
{
continue;
}
shardsPresent = true;
/* all relations are now pruned down to 0 or 1 shards */
Assert(list_length(prunedShardList) <= 1);
/* anchor shard id */
if (shardId == INVALID_SHARD_ID)
{
shardInterval = (ShardInterval *) linitial(prunedShardList);
shardId = shardInterval->shardId;
}
}
/*
* Determine the worker that has all shard placements if a shard placement found.
* If no shard placement exists, we will still run the query but the result will
* be empty. We create a dummy shard placement for the first active worker.
*/
if (shardsPresent)
{
workerList = WorkersContainingAllShards(prunedRelationShardList);
}
else
{
List *workerNodeList = WorkerNodeList();
if (workerNodeList != NIL)
{
WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList);
ShardPlacement *dummyPlacement =
(ShardPlacement *) CitusMakeNode(ShardPlacement);
dummyPlacement->nodeName = workerNode->workerName;
dummyPlacement->nodePort = workerNode->workerPort;
workerList = lappend(workerList, dummyPlacement);
}
}
if (workerList == NIL)
{
ereport(DEBUG2, (errmsg("Found no worker with all shard placements")));
return NULL;
}
UpdateRelationNames((Node *) originalQuery, restrictionContext);
pg_get_query_def(originalQuery, queryString);
task = CitusMakeNode(Task); task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID; task->jobId = INVALID_JOB_ID;
@ -990,16 +1064,338 @@ RouterSelectTask(Query *originalQuery, Query *query)
task->upsertQuery = upsertQuery; task->upsertQuery = upsertQuery;
task->requiresMasterEvaluation = false; task->requiresMasterEvaluation = false;
*placementList = workerList;
return task; return task;
} }
/*
 * TargetShardIntervalsForSelect prunes the shards of every relation recorded
 * in the restriction context, using the base restriction clauses the planner
 * collected for that relation. The result is a list with one entry per
 * relation; each entry is that relation's pruned shard list, which may be NIL.
 * The pruned list is also stored in the relation's restriction entry for
 * later use.
 *
 * As soon as any relation is left with more than one shard the function gives
 * up and returns NULL.
 */
static List *
TargetShardIntervalsForSelect(Query *query,
							  RelationRestrictionContext *restrictionContext)
{
	List *shardListPerRelation = NIL;
	ListCell *cell = NULL;

	Assert(query->commandType == CMD_SELECT);
	Assert(restrictionContext != NULL);

	foreach(cell, restrictionContext->relationRestrictionList)
	{
		RelationRestriction *restriction = (RelationRestriction *) lfirst(cell);
		Oid relationId = restriction->relationId;
		Index rangeTableIndex = restriction->index;
		DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
		int shardCount = cacheEntry->shardIntervalArrayLength;
		List *restrictClauseList =
			extract_actual_clauses(restriction->relOptInfo->baserestrictinfo, false);
		List *remainingShardList = NIL;

		/* reset first so that an early bail out leaves no stale result behind */
		restriction->prunedShardIntervalList = NIL;

		if (shardCount > 0)
		{
			List *allShardList = NIL;
			int shardIndex = 0;

			/* turn the cached shard interval array into a list for pruning */
			for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
			{
				allShardList = lappend(allShardList,
									   cacheEntry->sortedShardIntervalArray[shardIndex]);
			}

			remainingShardList = PruneShardList(relationId, rangeTableIndex,
												restrictClauseList, allShardList);

			/*
			 * A relation that still spans several shards rules out router
			 * planning. Zero remaining shards is acceptable here; callers
			 * deal with that case.
			 */
			if (list_length(remainingShardList) > 1)
			{
				return NULL;
			}
		}

		restriction->prunedShardIntervalList = remainingShardList;
		shardListPerRelation = lappend(shardListPerRelation, remainingShardList);
	}

	return shardListPerRelation;
}
/*
 * WorkersContainingAllShards computes the shard placements that are common to
 * every shard in the given per-relation shard lists. Empty per-relation lists
 * are skipped. The placements of the first non-empty entry seed the result,
 * which is then intersected with the placements of each following shard.
 * Returns NIL when no placement is common to all shards, or when every entry
 * was empty; callers should make sure at least one shard exists beforehand.
 */
static List *
WorkersContainingAllShards(List *prunedShardIntervalsList)
{
	List *commonPlacementList = NIL;
	bool seenShard = false;
	ListCell *shardListCell = NULL;

	foreach(shardListCell, prunedShardIntervalsList)
	{
		List *shardIntervalList = (List *) lfirst(shardListCell);
		ShardInterval *shardInterval = NULL;
		List *shardPlacementList = NIL;

		/* relations with no remaining shard do not constrain the result */
		if (shardIntervalList == NIL)
		{
			continue;
		}

		Assert(list_length(shardIntervalList) == 1);
		shardInterval = (ShardInterval *) linitial(shardIntervalList);

		/* placements of this shard as reported by FinalizedShardPlacementList */
		shardPlacementList = FinalizedShardPlacementList(shardInterval->shardId);

		if (seenShard)
		{
			/* narrow the candidates down to those that also hold this shard */
			commonPlacementList = IntersectPlacementList(commonPlacementList,
														 shardPlacementList);
		}
		else
		{
			commonPlacementList = shardPlacementList;
			seenShard = true;
		}

		/*
		 * Once the candidate list is empty no single worker can hold all
		 * shards referenced by the query, so there is no point in going on.
		 */
		if (commonPlacementList == NIL)
		{
			return NIL;
		}
	}

	return commonPlacementList;
}
/*
 * IntersectPlacementList returns the placements whose nodeName:nodePort pair
 * appears in both input lists. For every placement in lhsPlacementList the
 * rhsPlacementList is scanned, and the first rhs placement on the same node is
 * appended to the result. Node names are compared with strncmp over at most
 * WORKER_LENGTH characters.
 *
 * Scanning stops at the first match for each lhs placement, so an rhs list
 * that names the same node more than once cannot add duplicate entries to the
 * result.
 *
 * This is a plain nested loop over both lists; the original authors considered
 * that sufficient because they do not expect very high replication factors.
 */
static List *
IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList)
{
	ListCell *lhsPlacementCell = NULL;
	List *placementList = NIL;

	foreach(lhsPlacementCell, lhsPlacementList)
	{
		ShardPlacement *lhsPlacement = (ShardPlacement *) lfirst(lhsPlacementCell);
		ListCell *rhsPlacementCell = NULL;

		foreach(rhsPlacementCell, rhsPlacementList)
		{
			ShardPlacement *rhsPlacement = (ShardPlacement *) lfirst(rhsPlacementCell);

			if (rhsPlacement->nodePort == lhsPlacement->nodePort &&
				strncmp(rhsPlacement->nodeName, lhsPlacement->nodeName,
						WORKER_LENGTH) == 0)
			{
				placementList = lappend(placementList, rhsPlacement);

				/* one match per lhs placement is enough */
				break;
			}
		}
	}

	return placementList;
}
/*
 * ConvertRteToSubqueryWithEmptyResult rewrites the given relation RTE in place
 * into a subquery RTE. The subquery selects one NULL constant per non-dropped
 * column of the relation, named after that column, and its join tree carries a
 * constant false qualifier. The RTE's alias is set to a copy of its eref.
 */
static void
ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
{
	Relation relation = heap_open(rte->relid, NoLock);
	TupleDesc tupleDescriptor = RelationGetDescr(relation);
	int attributeCount = tupleDescriptor->natts;
	int attributeIndex = 0;
	List *nullTargetList = NIL;
	FromExpr *falseJoinTree = NULL;
	Query *emptySubquery = NULL;

	for (attributeIndex = 0; attributeIndex < attributeCount; attributeIndex++)
	{
		FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[attributeIndex];
		StringInfo columnName = NULL;
		Const *nullValue = NULL;
		TargetEntry *targetEntry = NULL;

		/* dropped columns get no target entry */
		if (attributeForm->attisdropped)
		{
			continue;
		}

		columnName = makeStringInfo();
		appendStringInfo(columnName, "%s", attributeForm->attname.data);

		/* a NULL of the column's own type, typmod and collation */
		nullValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod,
								  attributeForm->attcollation);

		/*
		 * NOTE(review): resno is set to the 0-based attribute index here;
		 * confirm that nothing downstream relies on a 1-based, gap-free
		 * numbering of the target entries.
		 */
		targetEntry = makeNode(TargetEntry);
		targetEntry->expr = (Expr *) nullValue;
		targetEntry->resno = attributeIndex;
		targetEntry->resname = columnName->data;

		nullTargetList = lappend(nullTargetList, targetEntry);
	}

	heap_close(relation, NoLock);

	/* a constant false qualifier on the join tree */
	falseJoinTree = makeNode(FromExpr);
	falseJoinTree->quals = makeBoolConst(false, false);

	emptySubquery = makeNode(Query);
	emptySubquery->commandType = CMD_SELECT;
	emptySubquery->querySource = QSRC_ORIGINAL;
	emptySubquery->canSetTag = true;
	emptySubquery->targetList = nullTargetList;
	emptySubquery->jointree = falseJoinTree;

	/* finally switch the RTE itself over to the subquery */
	rte->rtekind = RTE_SUBQUERY;
	rte->subquery = emptySubquery;
	rte->alias = copyObject(rte->eref);
}
/*
 * UpdateRelationNames is a tree walker that visits every range table entry of
 * a query, including those in nested Query nodes, and rewrites relation RTEs
 * so that they refer to shards.
 *
 * For each relation RTE it looks up the relation restriction whose RTE carries
 * the same identity (see GetRTEIdentity). If no such restriction exists, or if
 * the matching restriction has no shard left after pruning, the RTE is
 * replaced by a subquery that produces no rows. Otherwise the RTE is marked
 * through ModifyRangeTblExtraData as CITUS_RTE_SHARD, with the relation's
 * schema name and the relation name extended by the shard id.
 *
 * The function returns false for every RTE it handles itself; for other nodes
 * it returns whatever the generic walkers return.
 */
static bool
UpdateRelationNames(Node *node, RelationRestrictionContext *restrictionContext)
{
	RangeTblEntry *rangeTableEntry = NULL;
	RelationRestriction *matchingRestriction = NULL;
	ListCell *restrictionCell = NULL;
	List *prunedShardList = NIL;
	ShardInterval *shardInterval = NULL;
	Oid relationId = InvalidOid;
	char *shardRelationName = NULL;
	char *schemaName = NULL;

	if (node == NULL)
	{
		return false;
	}

	/* descend into queries with RTE examination on, so subqueries and CTEs are covered */
	if (IsA(node, Query))
	{
		return query_tree_walker((Query *) node, UpdateRelationNames, restrictionContext,
								 QTW_EXAMINE_RTES);
	}

	if (!IsA(node, RangeTblEntry))
	{
		return expression_tree_walker(node, UpdateRelationNames, restrictionContext);
	}

	/* from here on we are looking at a range table entry */
	rangeTableEntry = (RangeTblEntry *) node;
	if (rangeTableEntry->rtekind != RTE_RELATION)
	{
		return false;
	}

	/*
	 * Find the restriction recorded for this RTE by comparing identities.
	 *
	 * FIXME: We should probably use a hashtable here, to do efficient
	 * lookup.
	 */
	foreach(restrictionCell, restrictionContext->relationRestrictionList)
	{
		RelationRestriction *candidate =
			(RelationRestriction *) lfirst(restrictionCell);

		if (GetRTEIdentity(candidate->rte) == GetRTEIdentity(rangeTableEntry))
		{
			matchingRestriction = candidate;
			break;
		}
	}

	/* no restriction, or no shard left: make the RTE yield an empty result */
	if (matchingRestriction == NULL ||
		matchingRestriction->prunedShardIntervalList == NIL)
	{
		ConvertRteToSubqueryWithEmptyResult(rangeTableEntry);
		return false;
	}

	Assert(matchingRestriction != NULL);

	prunedShardList = matchingRestriction->prunedShardIntervalList;
	Assert(list_length(prunedShardList) == 1);
	shardInterval = (ShardInterval *) linitial(prunedShardList);
	relationId = shardInterval->relationId;

	/* build the shard's relation name: relation name extended by the shard id */
	shardRelationName = get_rel_name(relationId);
	AppendShardIdToName(&shardRelationName, shardInterval->shardId);

	schemaName = get_namespace_name(get_rel_namespace(relationId));

	ModifyRangeTblExtraData(rangeTableEntry, CITUS_RTE_SHARD, schemaName,
							shardRelationName, NIL);

	return false;
}
/* /*
* RouterQueryJob creates a Job for the specified query to execute the * RouterQueryJob creates a Job for the specified query to execute the
* provided single shard select task. * provided single shard select task.
*/ */
static Job * static Job *
RouterQueryJob(Query *query, Task *task) RouterQueryJob(Query *query, Task *task, List *placementList)
{ {
Job *job = NULL; Job *job = NULL;
List *taskList = NIL; List *taskList = NIL;
@ -1007,7 +1403,8 @@ RouterQueryJob(Query *query, Task *task)
/* /*
* We send modify task to the first replica, otherwise we choose the target shard * We send modify task to the first replica, otherwise we choose the target shard
* according to task assignment policy. * according to task assignment policy. Placement list for select queries are
* provided as function parameter.
*/ */
if (taskType == MODIFY_TASK) if (taskType == MODIFY_TASK)
{ {
@ -1015,7 +1412,10 @@ RouterQueryJob(Query *query, Task *task)
} }
else else
{ {
taskList = AssignAnchorShardTaskList(list_make1(task)); Assert(placementList != NIL);
task->taskPlacementList = placementList;
taskList = list_make1(task);
} }
job = CitusMakeNode(Job); job = CitusMakeNode(Job);
@ -1036,22 +1436,11 @@ RouterQueryJob(Query *query, Task *task)
* partition column. This feature is enabled if task executor is set to real-time * partition column. This feature is enabled if task executor is set to real-time
*/ */
bool bool
MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType) MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext)
{ {
uint32 rangeTableId = 1; CmdType commandType = query->commandType;
List *rangeTableList = NIL; ListCell *relationRestrictionContextCell = NULL;
RangeTblEntry *rangeTableEntry = NULL;
Oid distributedTableId = InvalidOid;
Var *partitionColumn = NULL;
char partitionMethod = '\0';
Node *quals = NULL;
CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType;
FromExpr *joinTree = query->jointree;
List *varClauseList = NIL;
ListCell *varClauseCell = NULL;
bool partitionColumnMatchExpression = false;
int partitionColumnReferenceCount = 0;
int shardCount = 0;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE) commandType == CMD_DELETE)
@ -1059,6 +1448,7 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
return true; return true;
} }
/* FIXME: I tend to think it's time to remove this */
if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME) if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME)
{ {
return false; return false;
@ -1066,171 +1456,28 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType)
Assert(commandType == CMD_SELECT); Assert(commandType == CMD_SELECT);
/* if (query->hasForUpdate)
* Reject subqueries which are in SELECT or WHERE clause.
* Queries which are recursive, with CommonTableExpr, with locking (hasForUpdate),
* or with window functions are also rejected here.
* Queries which have subqueries, or tablesamples in FROM clauses are rejected later
* during RangeTblEntry checks.
*/
if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate ||
query->hasRecursive)
{ {
return false; return false;
} }
if (query->groupingSets) foreach(relationRestrictionContextCell, restrictionContext->relationRestrictionList)
{ {
return false; RelationRestriction *relationRestriction =
} (RelationRestriction *) lfirst(relationRestrictionContextCell);
RangeTblEntry *rte = relationRestriction->rte;
/* only hash partitioned tables are supported */ if (rte->rtekind == RTE_RELATION)
distributedTableId = ExtractFirstDistributedTableId(query);
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
partitionMethod = PartitionMethod(distributedTableId);
if (partitionMethod != DISTRIBUTE_BY_HASH)
{
return false;
}
/* extract range table entries */
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
/* query can have only one range table of type RTE_RELATION */
if (list_length(rangeTableList) != 1)
{
return false;
}
rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
if (rangeTableEntry->rtekind != RTE_RELATION)
{
return false;
}
if (rangeTableEntry->tablesample)
{
return false;
}
if (joinTree == NULL)
{
return false;
}
quals = joinTree->quals;
if (quals == NULL)
{
return false;
}
/* convert list of expressions into expression tree */
if (quals != NULL && IsA(quals, List))
{
quals = (Node *) make_ands_explicit((List *) quals);
}
/*
* Partition column must be used in a simple equality match check and it must be
* place at top level conjustion operator.
*/
partitionColumnMatchExpression =
ColumnMatchExpressionAtTopLevelConjunction(quals, partitionColumn);
if (!partitionColumnMatchExpression)
{
return false;
}
/* make sure partition column is used only once in the query */
varClauseList = pull_var_clause_default(quals);
foreach(varClauseCell, varClauseList)
{
Var *column = (Var *) lfirst(varClauseCell);
if (equal(column, partitionColumn))
{ {
partitionColumnReferenceCount++; /* only hash partitioned tables are supported */
} Oid distributedTableId = rte->relid;
} char partitionMethod = PartitionMethod(distributedTableId);
if (partitionColumnReferenceCount != 1) if (partitionMethod != DISTRIBUTE_BY_HASH)
{
return false;
}
/*
* We need to make sure there is at least one shard for this hash partitioned
* query to be router plannable. We can not prepare a router plan if there
* are no shards.
*/
shardCount = ShardIntervalCount(distributedTableId);
if (shardCount == 0)
{
return false;
}
return true;
}
/*
* ColumnMatchExpressionAtTopLevelConjunction returns true if the query contains an exact
* match (equal) expression on the provided column. The function returns true only
* if the match expression has an AND relation with the rest of the expression tree.
*/
static bool
ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column)
{
if (node == NULL)
{
return false;
}
if (IsA(node, OpExpr))
{
OpExpr *opExpr = (OpExpr *) node;
bool simpleExpression = SimpleOpExpression((Expr *) opExpr);
bool columnInExpr = false;
bool usingEqualityOperator = false;
if (!simpleExpression)
{
return false;
}
columnInExpr = OpExpressionContainsColumn(opExpr, column);
if (!columnInExpr)
{
return false;
}
usingEqualityOperator = OperatorImplementsEquality(opExpr->opno);
return usingEqualityOperator;
}
else if (IsA(node, BoolExpr))
{
BoolExpr *boolExpr = (BoolExpr *) node;
List *argumentList = boolExpr->args;
ListCell *argumentCell = NULL;
if (boolExpr->boolop != AND_EXPR)
{
return false;
}
foreach(argumentCell, argumentList)
{
Node *argumentNode = (Node *) lfirst(argumentCell);
bool columnMatch =
ColumnMatchExpressionAtTopLevelConjunction(argumentNode, column);
if (columnMatch)
{ {
return true; return false;
} }
} }
} }
return false; return true;
} }

View File

@ -35,6 +35,7 @@
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
#include "optimizer/planner.h" #include "optimizer/planner.h"
#include "optimizer/paths.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/guc_tables.h" #include "utils/guc_tables.h"
@ -142,6 +143,9 @@ _PG_init(void)
/* register utility hook */ /* register utility hook */
ProcessUtility_hook = multi_ProcessUtility; ProcessUtility_hook = multi_ProcessUtility;
/* register for planner hook */
set_rel_pathlist_hook = multi_relation_restriction_hook;
/* organize that task tracker is started once server is up */ /* organize that task tracker is started once server is up */
TaskTrackerRegister(); TaskTrackerRegister();

View File

@ -181,6 +181,8 @@ extern bool SubqueryPushdown;
/* Function declarations for building logical plans */ /* Function declarations for building logical plans */
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree); extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree);
extern bool NeedsDistributedPlanning(Query *queryTree); extern bool NeedsDistributedPlanning(Query *queryTree);
extern int GetRTEIdentity(RangeTblEntry *rte);
extern void IdentifyRTE(RangeTblEntry *rte, int identifier);
extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode);

View File

@ -13,14 +13,41 @@
#include "nodes/plannodes.h" #include "nodes/plannodes.h"
#include "nodes/relation.h" #include "nodes/relation.h"
/*
 * RelationRestrictionContext groups the per-relation restriction entries
 * that belong to one planning pass. A pointer to it is returned by
 * CreateAndPushRestrictionContext() and CurrentRestrictionContext(), and it
 * is handed to CreatePhysicalPlan(), MultiRouterPlanCreate() and
 * MultiRouterPlannableQuery() so the router planner can inspect the
 * relations referenced by the query.
 */
typedef struct RelationRestrictionContext
{
/* NOTE(review): presumably set once any distributed relation is recorded - confirm in the hook */
bool hasDistributedRelation;
/* NOTE(review): presumably set once any non-distributed relation is recorded - confirm in the hook */
bool hasLocalRelation;
/* list whose elements are read as RelationRestriction pointers by the router planner */
List *relationRestrictionList;
} RelationRestrictionContext;
/*
 * RelationRestriction describes a single relation entry stored in
 * RelationRestrictionContext->relationRestrictionList.
 *
 * NOTE(review): the index, rte, relOptInfo and plannerInfo fields mirror the
 * parameters of multi_relation_restriction_hook(root, relOptInfo, index, rte),
 * so they are presumably captured there - confirm against the hook body.
 */
typedef struct RelationRestriction
{
/* NOTE(review): presumably the range table index passed to the hook - confirm */
Index index;
/* NOTE(review): presumably the OID of the relation the entry refers to - confirm */
Oid relationId;
/* NOTE(review): presumably true when relationId is a distributed table - confirm */
bool distributedRelation;
/* range table entry; the router planner reads its rtekind/relid and matches it via GetRTEIdentity() */
RangeTblEntry *rte;
RelOptInfo *relOptInfo;
PlannerInfo *plannerInfo;
/*
 * Shard intervals left for this relation. The router planner treats NIL as
 * "replace the RTE with an empty-result subquery" and otherwise asserts that
 * the list holds exactly one ShardInterval.
 */
List *prunedShardIntervalList;
} RelationRestriction;
extern PlannedStmt * multi_planner(Query *parse, int cursorOptions, extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
ParamListInfo boundParams); ParamListInfo boundParams);
extern bool HasCitusToplevelNode(PlannedStmt *planStatement); extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan; struct MultiPlan;
extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query); extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *
restrictionContext);
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement); extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan); struct MultiPlan *multiPlan);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte);
extern RelationRestrictionContext * CreateAndPushRestrictionContext(void);
extern RelationRestrictionContext * CurrentRestrictionContext(void);
extern void PopRestrictionContext(void);
#endif /* MULTI_PLANNER_H */ #endif /* MULTI_PLANNER_H */

View File

@ -16,6 +16,7 @@
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -29,7 +30,8 @@
extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query,
MultiExecutorType taskExecutorType); MultiExecutorType taskExecutorType,
RelationRestrictionContext *restrictionContext);
extern void ErrorIfModifyQueryNotSupported(Query *queryTree); extern void ErrorIfModifyQueryNotSupported(Query *queryTree);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -48,10 +48,10 @@ SELECT count(*) FROM orders_hash_partitioned;
(1 row) (1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1; SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
count count
------- -------
@ -59,10 +59,10 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 630000 DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
count count
------- -------
@ -70,10 +70,10 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 630000 DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
count count
------- -------
@ -81,10 +81,10 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 630000 DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
count count
------- -------
@ -93,10 +93,10 @@ DEBUG: Plan is router executable
SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa'; WHERE o_orderkey = 1 AND o_clerk = 'aaa';
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
count count
------- -------
@ -104,10 +104,10 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1); SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
count count
------- -------
@ -198,6 +198,8 @@ SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey = 2; WHERE o_orderkey = 1 OR o_orderkey = 2;
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
count count
------- -------
@ -214,6 +216,8 @@ SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa'); WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
count count
------- -------
@ -232,6 +236,8 @@ SELECT count(*) FROM
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count count
------- -------
0 0
@ -242,6 +248,8 @@ DEBUG: predicate pruning for shardId 630003
SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = ANY ('{1,2,3}'); WHERE o_orderkey = ANY ('{1,2,3}');
NOTICE: cannot use shard pruning with ANY/ALL (array expression) NOTICE: cannot use shard pruning with ANY/ALL (array expression)
HINT: Consider rewriting the expression with OR/AND clauses.
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
HINT: Consider rewriting the expression with OR/AND clauses. HINT: Consider rewriting the expression with OR/AND clauses.
count count
------- -------
@ -285,6 +293,8 @@ SELECT count(*) FROM orders_hash_partitioned
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count count
------- -------
0 0
@ -319,9 +329,11 @@ SELECT count(*)
DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003 DEBUG: predicate pruning for shardId 630003
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] DEBUG: predicate pruning for shardId 630001
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] DEBUG: predicate pruning for shardId 630002
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count count
------- -------
0 0

View File

@ -0,0 +1,329 @@
--
-- MULTI_HASH_PRUNING
--
-- Tests for shard and join pruning logic on hash partitioned tables.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 630000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 630000;
-- Create a table partitioned on integer column and update partition type to
-- hash. Then stage data to this table and update shard min max values with
-- hashed ones. Hash value of 1, 2, 3 and 4 are consecutively -1905060026,
-- 1134484726, -28094569 and -1011077333.
CREATE TABLE orders_hash_partitioned (
o_orderkey integer,
o_custkey integer,
o_orderstatus char(1),
o_totalprice decimal(15,2),
o_orderdate date,
o_orderpriority char(15),
o_clerk char(15),
o_shippriority integer,
o_comment varchar(79) );
SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
SET client_min_messages TO DEBUG2;
-- Check that we can prune shards for simple cases, boolean expressions and
-- immutable functions.
-- Since router plans are not triggered for task-tracker executor type,
-- we need to run the tests that triggers router planning seperately for
-- both executors. Otherwise, check-full fails on the task-tracker.
-- Later, we need to switch back to the actual task executor
-- to contuinue with correct executor type for check-full.
SELECT quote_literal(current_setting('citus.task_executor_type')) AS actual_task_executor
\gset
SET citus.task_executor_type TO 'real-time';
SELECT count(*) FROM orders_hash_partitioned;
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: Creating router plan
DEBUG: Plan is router executable
count
-------
0
(1 row)
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM orders_hash_partitioned;
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
DEBUG: predicate pruning for shardId 630000
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
SET citus.task_executor_type TO :actual_task_executor;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey = 2;
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_clerk = 'aaa';
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey is NULL;
count
-------
0
(1 row)
SELECT count(*) FROM
(SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys;
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
-- Check that we don't support pruning for ANY (array expression) and give
-- a notice message when used with the partition column
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = ANY ('{1,2,3}');
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
HINT: Consider rewriting the expression with OR/AND clauses.
count
-------
0
(1 row)
-- Check that we don't show the message if the operator is not
-- equality operator
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey < ALL ('{1,2,3}');
count
-------
0
(1 row)
-- Check that we don't give a spurious hint message when non-partition
-- columns are used with ANY/IN/ALL
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_totalprice IN (2, 5);
count
-------
0
(1 row)
-- Check that we cannot prune for mutable functions.
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() OR o_orderkey = 1;
count
-------
0
(1 row)
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() AND o_orderkey = 1;
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
count
-------
0
(1 row)
-- Check that we can do join pruning.
SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey;
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
count
-------
0
(1 row)
SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey
AND orders1.o_orderkey = 1
AND orders2.o_orderkey is NULL;
DEBUG: predicate pruning for shardId 630001
DEBUG: predicate pruning for shardId 630002
DEBUG: predicate pruning for shardId 630003
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
count
-------
0
(1 row)

View File

@ -90,7 +90,7 @@ SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash');
(1 row) (1 row)
SELECT master_create_worker_shards('customer_hash', 1, 1); SELECT master_create_worker_shards('customer_hash', 2, 1);
master_create_worker_shards master_create_worker_shards
----------------------------- -----------------------------

File diff suppressed because it is too large Load Diff

View File

@ -25,13 +25,6 @@ SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'ha
(1 row) (1 row)
-- test when a table is distributed but no shards created yet
SELECT count(*) from articles;
count
-------
(1 row)
SELECT master_create_worker_shards('articles', 2, 1); SELECT master_create_worker_shards('articles', 2, 1);
master_create_worker_shards master_create_worker_shards
----------------------------- -----------------------------
@ -97,13 +90,6 @@ INSERT INTO articles VALUES (49, 9, 'anyone', 2681);
INSERT INTO articles VALUES (50, 10, 'anjanette', 19519); INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
-- insert a single row for the test -- insert a single row for the test
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
-- first, test zero-shard SELECT, which should return an empty row
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
count
-------
(1 row)
-- zero-shard modifications should fail -- zero-shard modifications should fail
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
ERROR: distributed modifications must target exactly one shard ERROR: distributed modifications must target exactly one shard
@ -170,18 +156,20 @@ SELECT title, author_id FROM articles
alkylic | 8 alkylic | 8
(10 rows) (10 rows)
-- add in some grouping expressions, still on same shard -- add in some grouping expressions.
-- it is supported if it is on the same shard, but not supported if it
-- involves multiple shards.
-- having queries unsupported in Citus -- having queries unsupported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles SELECT author_id, sum(word_count) AS corpus_size FROM articles
WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10 WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
GROUP BY author_id GROUP BY author_id
HAVING sum(word_count) > 40000 HAVING sum(word_count) > 40000
ORDER BY sum(word_count) DESC; ORDER BY sum(word_count) DESC;
ERROR: cannot perform distributed planning on this query ERROR: cannot perform distributed planning on this query
DETAIL: Having qual is currently unsupported DETAIL: Having qual is currently unsupported
-- UNION/INTERSECT queries are unsupported -- UNION/INTERSECT queries are unsupported if on multiple shards
SELECT * FROM articles WHERE author_id = 10 UNION SELECT * FROM articles WHERE author_id = 10 UNION
SELECT * FROM articles WHERE author_id = 1; SELECT * FROM articles WHERE author_id = 2;
ERROR: cannot perform distributed planning on this query ERROR: cannot perform distributed planning on this query
DETAIL: Union, Intersect, or Except are currently unsupported DETAIL: Union, Intersect, or Except are currently unsupported
-- queries using CTEs are unsupported -- queries using CTEs are unsupported
@ -324,8 +312,8 @@ SET citus.task_executor_type TO 'real-time';
SELECT * SELECT *
FROM articles FROM articles
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
@ -341,6 +329,7 @@ SELECT *
FROM articles FROM articles
WHERE author_id = 1 OR author_id = 17; WHERE author_id = 1 OR author_id = 17;
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+--------------+------------ ----+-----------+--------------+------------
@ -368,8 +357,8 @@ SELECT *
SELECT id as article_id, word_count * id as random_value SELECT id as article_id, word_count * id as random_value
FROM articles FROM articles
WHERE author_id = 1; WHERE author_id = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
article_id | random_value article_id | random_value
------------+-------------- ------------+--------------
@ -386,9 +375,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles a, articles b FROM articles a, articles b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
first_author | second_word_count first_author | second_word_count
--------------+------------------- --------------+-------------------
@ -403,8 +392,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles a, articles_single_shard b FROM articles a, articles_single_shard b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
DEBUG: push down of limit count: 3
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable
first_author | second_word_count first_author | second_word_count
--------------+------------------- --------------+-------------------
10 | 19519 10 | 19519
@ -417,8 +407,8 @@ SELECT *
FROM articles FROM articles
WHERE author_id = 1 WHERE author_id = 1
LIMIT 2; LIMIT 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | author_id | title | word_count id | author_id | title | word_count
----+-----------+----------+------------ ----+-----------+----------+------------
@ -433,8 +423,8 @@ SELECT id
FROM articles FROM articles
WHERE author_id = 1 WHERE author_id = 1
GROUP BY id; GROUP BY id;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id id
---- ----
@ -447,14 +437,15 @@ DEBUG: Plan is router executable
-- copying from a single shard table does not require the master query -- copying from a single shard table does not require the master query
COPY articles_single_shard TO stdout; COPY articles_single_shard TO stdout;
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
50 10 anjanette 19519 50 10 anjanette 19519
-- error out for queries with aggregates -- error out for queries with aggregates
SELECT avg(word_count) SELECT avg(word_count)
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 850000 DEBUG: predicate pruning for shardId 850000
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
avg avg
-------------------- --------------------
@ -467,8 +458,8 @@ SELECT max(word_count) as max, min(word_count) as min,
sum(word_count) as sum, count(word_count) as cnt sum(word_count) as sum, count(word_count) as cnt
FROM articles FROM articles
WHERE author_id = 2; WHERE author_id = 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 850000 DEBUG: predicate pruning for shardId 850000
DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
max | min | sum | cnt max | min | sum | cnt
-------+------+-------+----- -------+------+-------+-----
@ -480,6 +471,7 @@ SELECT *
FROM articles a, articles b FROM articles a, articles b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
DEBUG: predicate pruning for shardId 850001 DEBUG: predicate pruning for shardId 850001
DEBUG: predicate pruning for shardId 850001
DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3 DEBUG: join prunable for task partitionId 0 and 3

View File

@ -60,7 +60,7 @@ SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range;
-- sharded table. -- sharded table.
SELECT count(distinct p_mfgr) FROM part; SELECT count(distinct p_mfgr) FROM part;
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr; SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr;
-- We don't support count(distinct) queries if table is append partitioned and -- We don't support count(distinct) queries if table is append partitioned and
-- has multiple shards -- has multiple shards

View File

@ -105,12 +105,13 @@ WHERE
-- This query is an INNER JOIN in disguise since there cannot be NULL results -- This query is an INNER JOIN in disguise since there cannot be NULL results
-- Added extra filter to make query not router plannable
SELECT SELECT
min(l_custkey), max(l_custkey) min(l_custkey), max(l_custkey)
FROM FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
WHERE WHERE
r_custkey = 5; r_custkey = 5 or r_custkey > 15;
-- Apply a filter before the join -- Apply a filter before the join
@ -204,12 +205,13 @@ WHERE
-- This query is an INNER JOIN in disguise since there cannot be NULL results (21) -- This query is an INNER JOIN in disguise since there cannot be NULL results (21)
-- Added extra filter to make query not router plannable
SELECT SELECT
min(l_custkey), max(l_custkey) min(l_custkey), max(l_custkey)
FROM FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
WHERE WHERE
r_custkey = 21; r_custkey = 21 or r_custkey < 10;
-- Apply a filter before the join -- Apply a filter before the join

View File

@ -85,14 +85,14 @@ SELECT count(distinct p_mfgr) FROM part;
5 5
(1 row) (1 row)
SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr; SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr;
p_mfgr | count p_mfgr | count
---------------------------+------- ---------------------------+-------
Manufacturer#1 | 193 Manufacturer#1 | 193
Manufacturer#3 | 228
Manufacturer#5 | 185
Manufacturer#2 | 190 Manufacturer#2 | 190
Manufacturer#3 | 228
Manufacturer#4 | 204 Manufacturer#4 | 204
Manufacturer#5 | 185
(5 rows) (5 rows)
-- We don't support count(distinct) queries if table is append partitioned and -- We don't support count(distinct) queries if table is append partitioned and

View File

@ -134,12 +134,13 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
(1 row) (1 row)
-- This query is an INNER JOIN in disguise since there cannot be NULL results -- This query is an INNER JOIN in disguise since there cannot be NULL results
-- Added extra filter to make query not router plannable
SELECT SELECT
min(l_custkey), max(l_custkey) min(l_custkey), max(l_custkey)
FROM FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
WHERE WHERE
r_custkey = 5; r_custkey = 5 or r_custkey > 15;
LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_right" ] LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_right" ]
min | max min | max
-----+----- -----+-----
@ -273,12 +274,13 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
(1 row) (1 row)
-- This query is an INNER JOIN in disguise since there cannot be NULL results (21) -- This query is an INNER JOIN in disguise since there cannot be NULL results (21)
-- Added extra filter to make query not router plannable
SELECT SELECT
min(l_custkey), max(l_custkey) min(l_custkey), max(l_custkey)
FROM FROM
multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey)
WHERE WHERE
r_custkey = 21; r_custkey = 21 or r_custkey < 10;
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ] LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]
min | max min | max
-----+----- -----+-----

View File

@ -63,7 +63,7 @@ CREATE TABLE customer_hash (
c_mktsegment char(10) not null, c_mktsegment char(10) not null,
c_comment varchar(117) not null); c_comment varchar(117) not null);
SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash'); SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash');
SELECT master_create_worker_shards('customer_hash', 1, 1); SELECT master_create_worker_shards('customer_hash', 2, 1);
-- The following query checks that we can correctly handle self-joins -- The following query checks that we can correctly handle self-joins

View File

@ -94,9 +94,6 @@ SET client_min_messages TO 'DEBUG2';
-- insert a single row for the test -- insert a single row for the test
INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519);
-- first, test zero-shard SELECT, which should return an empty row
SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2;
-- single-shard tests -- single-shard tests
-- test simple select for a single row -- test simple select for a single row
@ -141,32 +138,130 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash
HAVING sum(word_count) > 1000 HAVING sum(word_count) > 1000
ORDER BY sum(word_count) DESC; ORDER BY sum(word_count) DESC;
-- UNION/INTERSECT queries are unsupported
-- this is rejected by router planner and handled by multi_logical_planner
SELECT * FROM articles_hash WHERE author_id = 10 UNION
SELECT * FROM articles_hash WHERE author_id = 1;
-- query is a single shard query but can't do shard pruning, -- query is a single shard query but can't do shard pruning,
-- not router-plannable due to <= and IN -- not router-plannable due to <= and IN
SELECT * FROM articles_hash WHERE author_id <= 1; SELECT * FROM articles_hash WHERE author_id <= 1;
SELECT * FROM articles_hash WHERE author_id IN (1, 3); SELECT * FROM articles_hash WHERE author_id IN (1, 3);
-- queries using CTEs are unsupported -- queries with CTEs are supported
WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1)
SELECT * FROM first_author;
-- queries with CTEs are supported even if CTE is not referenced inside query
WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1)
SELECT title FROM articles_hash WHERE author_id = 1; SELECT title FROM articles_hash WHERE author_id = 1;
-- queries which involve functions in FROM clause are unsupported. -- two CTE joins are supported if they go to the same worker
WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1),
id_title AS (SELECT id, title from articles_hash WHERE author_id = 1)
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1),
id_title AS (SELECT id, title from articles_hash WHERE author_id = 3)
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
-- CTE joins are not supported if table shards are at different workers
WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1),
id_title AS (SELECT id, title from articles_hash WHERE author_id = 2)
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
-- recursive CTEs are supported when filtered on partition column
CREATE TABLE company_employees (company_id int, employee_id int, manager_id int);
SELECT master_create_distributed_table('company_employees', 'company_id', 'hash');
SELECT master_create_worker_shards('company_employees', 4, 1);
INSERT INTO company_employees values(1, 1, 0);
INSERT INTO company_employees values(1, 2, 1);
INSERT INTO company_employees values(1, 3, 1);
INSERT INTO company_employees values(1, 4, 2);
INSERT INTO company_employees values(1, 5, 4);
INSERT INTO company_employees values(3, 1, 0);
INSERT INTO company_employees values(3, 15, 1);
INSERT INTO company_employees values(3, 3, 1);
-- find employees at the top 2 levels within the company hierarchy
WITH RECURSIVE hierarchy as (
SELECT *, 1 AS level
FROM company_employees
WHERE company_id = 1 and manager_id = 0
UNION
SELECT ce.*, (h.level+1)
FROM hierarchy h JOIN company_employees ce
ON (h.employee_id = ce.manager_id AND
h.company_id = ce.company_id AND
ce.company_id = 1))
SELECT * FROM hierarchy WHERE LEVEL <= 2;
-- query is no longer router plannable and gets rejected
-- if the filter on company_id is dropped
WITH RECURSIVE hierarchy as (
SELECT *, 1 AS level
FROM company_employees
WHERE company_id = 1 and manager_id = 0
UNION
SELECT ce.*, (h.level+1)
FROM hierarchy h JOIN company_employees ce
ON (h.employee_id = ce.manager_id AND
h.company_id = ce.company_id))
SELECT * FROM hierarchy WHERE LEVEL <= 2;
-- logically wrong query: it involves different shards
-- of the same table, but it is still router plannable because
-- the shards are placed on the same worker.
WITH RECURSIVE hierarchy as (
SELECT *, 1 AS level
FROM company_employees
WHERE company_id = 3 and manager_id = 0
UNION
SELECT ce.*, (h.level+1)
FROM hierarchy h JOIN company_employees ce
ON (h.employee_id = ce.manager_id AND
h.company_id = ce.company_id AND
ce.company_id = 2))
SELECT * FROM hierarchy WHERE LEVEL <= 2;
-- grouping sets are supported on single shard
SELECT
id, substring(title, 2, 1) AS subtitle, count(*)
FROM articles_hash
WHERE author_id = 1 or author_id = 3
GROUP BY GROUPING SETS ((id),(subtitle));
-- grouping sets are not supported on multiple shards
SELECT
id, substring(title, 2, 1) AS subtitle, count(*)
FROM articles_hash
WHERE author_id = 1 or author_id = 2
GROUP BY GROUPING SETS ((id),(subtitle));
-- queries which involve functions in FROM clause are supported if they go to a single worker.
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1; SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1;
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 3;
-- they are not supported if multiple workers are involved
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2;
-- subqueries are not supported in WHERE clause in Citus -- subqueries are not supported in WHERE clause in Citus
SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a'); SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a');
SELECT * FROM articles_hash WHERE author_id IN (SELECT author_id FROM articles_hash WHERE author_id = 1 or author_id = 3);
SELECT * FROM articles_hash WHERE author_id = (SELECT 1);
-- subqueries are supported in FROM clause but they are not router plannable -- subqueries are supported in FROM clause but they are not router plannable
SELECT articles_hash.id,test.word_count SELECT articles_hash.id,test.word_count
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id
ORDER BY articles_hash.id; ORDER BY articles_hash.id;
SELECT articles_hash.id,test.word_count
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test
WHERE test.id = articles_hash.id and articles_hash.author_id = 1
ORDER BY articles_hash.id;
-- subqueries are not supported in SELECT clause -- subqueries are not supported in SELECT clause
SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1) SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1)
AS special_price FROM articles_hash a; AS special_price FROM articles_hash a;
@ -176,8 +271,7 @@ SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1; WHERE author_id = 1;
-- below query hits a single shard, but it is not router plannable -- below query hits a single shard, router plannable
-- still router executable
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 OR author_id = 17; WHERE author_id = 1 OR author_id = 17;
@ -194,18 +288,26 @@ SELECT id as article_id, word_count * id as random_value
WHERE author_id = 1; WHERE author_id = 1;
-- we can push down co-located joins to a single worker -- we can push down co-located joins to a single worker
-- this is not router plannable but router executable
-- handled by real-time executor
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
-- following join is neither router plannable, nor router executable -- following join is router plannable since the same worker
-- has both shards
SELECT a.author_id as first_author, b.word_count as second_word_count SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, articles_single_shard_hash b FROM articles_hash a, articles_single_shard_hash b
WHERE a.author_id = 10 and a.author_id = b.author_id WHERE a.author_id = 10 and a.author_id = b.author_id
LIMIT 3; LIMIT 3;
-- following join is not router plannable since there are no
-- workers containing both shards, added a CTE to make this fail
-- at logical planner
WITH single_shard as (SELECT * FROM articles_single_shard_hash)
SELECT a.author_id as first_author, b.word_count as second_word_count
FROM articles_hash a, single_shard b
WHERE a.author_id = 2 and a.author_id = b.author_id
LIMIT 3;
-- single shard select with limit is router plannable -- single shard select with limit is router plannable
SELECT * SELECT *
@ -257,7 +359,45 @@ SELECT max(word_count)
WHERE author_id = 1 WHERE author_id = 1
GROUP BY author_id; GROUP BY author_id;
-- router plannable union queries are supported
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 3);
SELECT * FROM (
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 3)) uu;
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1)
UNION
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3);
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1)
INTERSECT
(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3);
(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 1)
EXCEPT
(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 3);
-- union queries are not supported if not router plannable
-- there is an inconsistency in shard pruning between
-- ubuntu/mac, so log messages are disabled for these queries only
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 2);
SELECT * FROM (
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 2)) uu;
-- error out for queries with repartition jobs -- error out for queries with repartition jobs
SELECT * SELECT *
FROM articles_hash a, articles_hash b FROM articles_hash a, articles_hash b
@ -275,7 +415,7 @@ SET citus.task_executor_type TO 'real-time';
SET client_min_messages to 'DEBUG2'; SET client_min_messages to 'DEBUG2';
-- this is definitely single shard -- this is definitely single shard
-- but not router plannable -- and router plannable
SELECT * SELECT *
FROM articles_hash FROM articles_hash
WHERE author_id = 1 and author_id >= 1; WHERE author_id = 1 and author_id >= 1;
@ -387,11 +527,9 @@ SELECT id, MIN(id) over (order by word_count)
FROM articles_hash FROM articles_hash
WHERE author_id = 1 or author_id = 2; WHERE author_id = 1 or author_id = 2;
-- but they are not supported for not router plannable queries
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash FROM articles_hash
WHERE author_id = 5 or author_id = 1; WHERE author_id = 5 or author_id = 2;
-- complex query hitting a single shard -- complex query hitting a single shard
SELECT SELECT
@ -510,6 +648,19 @@ $$ LANGUAGE plpgsql;
SELECT * FROM author_articles_id_word_count(); SELECT * FROM author_articles_id_word_count();
-- materialized views can be created for router plannable queries
CREATE MATERIALIZED VIEW mv_articles_hash AS
SELECT * FROM articles_hash WHERE author_id = 1;
SELECT * FROM mv_articles_hash;
CREATE MATERIALIZED VIEW mv_articles_hash_error AS
SELECT * FROM articles_hash WHERE author_id in (1,2);
-- materialized views WITH NO DATA are still not supported
CREATE MATERIALIZED VIEW mv_articles_hash AS
SELECT * FROM articles_hash WHERE author_id = 1 WITH NO DATA;
-- router planner/executor is disabled for task-tracker executor -- router planner/executor is disabled for task-tracker executor
-- following query is router plannable, but router planner is disabled -- following query is router plannable, but router planner is disabled
SET citus.task_executor_type to 'task-tracker'; SET citus.task_executor_type to 'task-tracker';
@ -530,6 +681,8 @@ SET client_min_messages to 'NOTICE';
DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count(); DROP FUNCTION author_articles_id_word_count();
DROP MATERIALIZED VIEW mv_articles_hash;
DROP TABLE articles_hash; DROP TABLE articles_hash;
DROP TABLE articles_single_shard_hash; DROP TABLE articles_single_shard_hash;
DROP TABLE authors_hash; DROP TABLE authors_hash;
DROP TABLE company_employees;

View File

@ -23,10 +23,6 @@ CREATE TABLE articles_single_shard (LIKE articles);
SELECT master_create_distributed_table('articles', 'author_id', 'hash'); SELECT master_create_distributed_table('articles', 'author_id', 'hash');
SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'hash'); SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'hash');
-- test when a table is distributed but no shards created yet
SELECT count(*) from articles;
SELECT master_create_worker_shards('articles', 2, 1); SELECT master_create_worker_shards('articles', 2, 1);
SELECT master_create_worker_shards('articles_single_shard', 1, 1); SELECT master_create_worker_shards('articles_single_shard', 1, 1);
@ -85,9 +81,6 @@ INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
-- insert a single row for the test -- insert a single row for the test
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
-- first, test zero-shard SELECT, which should return an empty row
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
-- zero-shard modifications should fail -- zero-shard modifications should fail
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
DELETE FROM articles WHERE author_id = 1 AND author_id = 2; DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
@ -116,17 +109,19 @@ SELECT title, author_id FROM articles
WHERE author_id = 7 OR author_id = 8 WHERE author_id = 7 OR author_id = 8
ORDER BY author_id ASC, id; ORDER BY author_id ASC, id;
-- add in some grouping expressions, still on same shard -- add in some grouping expressions.
-- it is supported if it is on the same shard, but not supported if it
-- involves multiple shards.
-- having queries unsupported in Citus -- having queries unsupported in Citus
SELECT author_id, sum(word_count) AS corpus_size FROM articles SELECT author_id, sum(word_count) AS corpus_size FROM articles
WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10 WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10
GROUP BY author_id GROUP BY author_id
HAVING sum(word_count) > 40000 HAVING sum(word_count) > 40000
ORDER BY sum(word_count) DESC; ORDER BY sum(word_count) DESC;
-- UNION/INTERSECT queries are unsupported -- UNION/INTERSECT queries are unsupported if on multiple shards
SELECT * FROM articles WHERE author_id = 10 UNION SELECT * FROM articles WHERE author_id = 10 UNION
SELECT * FROM articles WHERE author_id = 1; SELECT * FROM articles WHERE author_id = 2;
-- queries using CTEs are unsupported -- queries using CTEs are unsupported
WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 ) WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 )