diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 19dee0437..ff92fb32d 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -107,6 +107,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, instr_time planStart; instr_time planDuration; Query *originalQuery = NULL; + RelationRestrictionContext *restrictionContext = NULL; /* if local query, run the standard explain and return */ bool localQuery = !NeedsDistributedPlanning(query); @@ -137,22 +138,35 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, /* measure the full planning time to display in EXPLAIN ANALYZE */ INSTR_TIME_SET_CURRENT(planStart); - /* call standard planner to modify the query structure before multi planning */ - initialPlan = standard_planner(query, 0, params); + restrictionContext = CreateAndPushRestrictionContext(); - commandType = initialPlan->commandType; - if (commandType == CMD_INSERT || commandType == CMD_UPDATE || - commandType == CMD_DELETE) + PG_TRY(); { - if (es->analyze) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on " - "distributed tables is not supported."))); - } - } + /* call standard planner to modify the query structure before multi planning */ + initialPlan = standard_planner(query, 0, params); - multiPlan = CreatePhysicalPlan(originalQuery, query); + commandType = initialPlan->commandType; + if (commandType == CMD_INSERT || commandType == CMD_UPDATE || + commandType == CMD_DELETE) + { + if (es->analyze) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on " + "distributed tables is not supported."))); + } + } + + multiPlan = CreatePhysicalPlan(originalQuery, query, restrictionContext); + } + PG_CATCH(); + { + PopRestrictionContext(); + PG_RE_THROW(); + } + PG_END_TRY(); + + PopRestrictionContext(); INSTR_TIME_SET_CURRENT(planDuration); INSTR_TIME_SUBTRACT(planDuration, planStart); diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 469331f46..fa4384a02 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -1410,8 +1410,41 @@ FindNodesOfType(MultiNode *node, int type) /* - * NeedsDistributedPlanning checks if the passed in query is a Select query - * running on partitioned relations. If it is, we start distributed planning. + * IdentifyRTE assigns an identifier to an RTE, for tracking purposes. + * + * To be able to track RTEs through postgres' query planning, which copies and + * duplicate, and modifies them, we sometimes need to figure out whether two + * RTEs are copies of the same original RTE. For that we, hackishly, use a + * field normally unused in RTE_RELATION RTEs. + * + * The assigned identifier better be unique within a plantree. + */ +void +IdentifyRTE(RangeTblEntry *rte, int identifier) +{ + Assert(rte->rtekind == RTE_RELATION); + Assert(rte->values_lists == NIL); + rte->values_lists = list_make1_int(identifier); +} + + +/* GetRTEIdentity returns the identity assigned with IdentifyRTE. 
*/ +int +GetRTEIdentity(RangeTblEntry *rte) +{ + Assert(rte->rtekind == RTE_RELATION); + Assert(IsA(rte->values_lists, IntList)); + Assert(list_length(rte->values_lists) == 1); + + return linitial_int(rte->values_lists); +} + + +/* + * NeedsDistributedPlanning checks if the passed in query is a query running + * on a distributed table. If it is, we start distributed planning. + * + * For distributed relations it also assigns identifiers to the relevant RTEs. */ bool NeedsDistributedPlanning(Query *queryTree) @@ -1421,6 +1454,7 @@ NeedsDistributedPlanning(Query *queryTree) ListCell *rangeTableCell = NULL; bool hasLocalRelation = false; bool hasDistributedRelation = false; + int rteIdentifier = 1; if (commandType != CMD_SELECT && commandType != CMD_INSERT && commandType != CMD_UPDATE && commandType != CMD_DELETE) @@ -1441,6 +1475,17 @@ NeedsDistributedPlanning(Query *queryTree) if (IsDistributedTable(relationId)) { hasDistributedRelation = true; + + /* + * To be able to track individual RTEs through postgres' query + * planning, we need to be able to figure out whether an RTE is + * actually a copy of another, rather than a different one. We + * simply number the RTEs starting from 1. + */ + if (rangeTableEntry->rtekind == RTE_RELATION) + { + IdentifyRTE(rangeTableEntry, rteIdentifier++); + } } else { diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index d79c584aa..8cb91cd28 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -31,6 +31,10 @@ #include "utils/memutils.h" + +static List *relationRestrictionContextList = NIL; + + /* local function forward declarations */ static void CheckNodeIsDumpable(Node *node); @@ -46,6 +50,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) PlannedStmt *result = NULL; bool needsDistributedPlanning = NeedsDistributedPlanning(parse); Query *originalQuery = NULL; + RelationRestrictionContext *restrictionContext = NULL; /* * standard_planner scribbles on it's input, but for deparsing we need the @@ -56,19 +61,36 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) originalQuery = copyObject(parse); } - /* - * First call into standard planner. This is required because the Citus - * planner relies on parse tree transformations made by postgres' planner. - */ - result = standard_planner(parse, cursorOptions, boundParams); + /* create a restriction context and put it at the end if context list */ + restrictionContext = CreateAndPushRestrictionContext(); - if (needsDistributedPlanning) + PG_TRY(); { - MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse); + /* + * First call into standard planner. This is required because the Citus + * planner relies on parse tree transformations made by postgres' planner. 
+ */ - /* store required data into the planned statement */ - result = MultiQueryContainerNode(result, physicalPlan); + result = standard_planner(parse, cursorOptions, boundParams); + + if (needsDistributedPlanning) + { + MultiPlan *physicalPlan = CreatePhysicalPlan(originalQuery, parse, + restrictionContext); + + /* store required data into the planned statement */ + result = MultiQueryContainerNode(result, physicalPlan); + } } + PG_CATCH(); + { + PopRestrictionContext(); + PG_RE_THROW(); + } + PG_END_TRY(); + + /* remove the context from the context list */ + PopRestrictionContext(); return result; } @@ -82,10 +104,11 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * physical plan process needed to produce distributed query plans. */ MultiPlan * -CreatePhysicalPlan(Query *originalQuery, Query *query) +CreatePhysicalPlan(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext) { MultiPlan *physicalPlan = MultiRouterPlanCreate(originalQuery, query, - TaskExecutorType); + TaskExecutorType, restrictionContext); if (physicalPlan == NULL) { /* Create and optimize logical plan */ @@ -296,3 +319,90 @@ CheckNodeIsDumpable(Node *node) pfree(out); #endif } + + +/* + * multi_relation_restriction_hook is a hook called by postgresql standard planner + * to notify us about various planning information regarding a relation. We use + * it to retrieve restrictions on relations. + */ +void +multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, + RangeTblEntry *rte) +{ + RelationRestrictionContext *restrictionContext = NULL; + RelationRestriction *relationRestriction = NULL; + bool distributedTable = false; + bool localTable = false; + + if (rte->rtekind != RTE_RELATION) + { + return; + } + + distributedTable = IsDistributedTable(rte->relid); + localTable = !distributedTable; + + restrictionContext = CurrentRestrictionContext(); + Assert(restrictionContext != NULL); + + relationRestriction = palloc0(sizeof(RelationRestriction)); + relationRestriction->index = index; + relationRestriction->relationId = rte->relid; + relationRestriction->rte = rte; + relationRestriction->relOptInfo = relOptInfo; + relationRestriction->distributedRelation = distributedTable; + relationRestriction->plannerInfo = root; + relationRestriction->prunedShardIntervalList = NIL; + + restrictionContext->hasDistributedRelation |= distributedTable; + restrictionContext->hasLocalRelation |= localTable; + + restrictionContext->relationRestrictionList = + lappend(restrictionContext->relationRestrictionList, relationRestriction); +} + + +/* + * CreateAndPushRestrictionContext creates a new restriction context, inserts it to the + * beginning of the context list, and returns the newly created context. + */ +RelationRestrictionContext * +CreateAndPushRestrictionContext(void) +{ + RelationRestrictionContext *restrictionContext = + palloc0(sizeof(RelationRestrictionContext)); + relationRestrictionContextList = lcons(restrictionContext, + relationRestrictionContextList); + + return restrictionContext; +} + + +/* + * CurrentRestrictionContext returns the the last restriction context from the + * list. 
+ */ +RelationRestrictionContext * +CurrentRestrictionContext(void) +{ + RelationRestrictionContext *restrictionContext = NULL; + + Assert(relationRestrictionContextList != NIL); + + restrictionContext = + (RelationRestrictionContext *) linitial(relationRestrictionContextList); + + return restrictionContext; +} + + +/* + * PopRestrictionContext removes the most recently added restriction context from + * context list. The function assumes the list is not empty. + */ +void +PopRestrictionContext(void) +{ + relationRestrictionContextList = list_delete_first(relationRestrictionContextList); +} diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 256541c68..1c54e58bb 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -18,7 +18,9 @@ #include "access/stratnum.h" #include "access/xact.h" #include "distributed/citus_clauses.h" +#include "catalog/pg_type.h" #include "distributed/citus_nodes.h" +#include "distributed/citus_nodefuncs.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" @@ -40,6 +42,7 @@ #include "nodes/pg_list.h" #include "nodes/primnodes.h" #include "optimizer/clauses.h" +#include "optimizer/restrictinfo.h" #include "parser/parsetree.h" #include "storage/lock.h" #include "utils/elog.h" @@ -65,17 +68,25 @@ static bool MasterIrreducibleExpression(Node *expression, bool *varArgument, static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *state); static char MostPermissiveVolatileFlag(char left, char right); static Task * RouterModifyTask(Query *originalQuery, Query *query); -static ShardInterval * TargetShardInterval(Query *query); +static ShardInterval * TargetShardIntervalForModify(Query *query); static List * QueryRestrictList(Query *query); static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); static ShardInterval * FastShardPruning(Oid distributedTableId, Const *partionColumnValue); static Oid ExtractFirstDistributedTableId(Query *query); static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); -static Task * RouterSelectTask(Query *originalQuery, Query *query); -static Job * RouterQueryJob(Query *query, Task *task); -static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType); -static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column); +static Task * RouterSelectTask(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext, + List **placementList); +static List * TargetShardIntervalsForSelect(Query *query, + RelationRestrictionContext *restrictionContext); +static List * WorkersContainingAllShards(List *prunedShardIntervalsList); +static List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); +static bool UpdateRelationNames(Node *node, + RelationRestrictionContext *restrictionContext); +static Job * RouterQueryJob(Query *query, Task *task, List *placementList); +static bool MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, + RelationRestrictionContext *restrictionContext); /* @@ -87,21 +98,23 @@ static bool ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column); */ MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, - MultiExecutorType taskExecutorType) + MultiExecutorType taskExecutorType, + 
RelationRestrictionContext *restrictionContext) { Task *task = NULL; Job *job = NULL; MultiPlan *multiPlan = NULL; CmdType commandType = query->commandType; bool modifyTask = false; + List *placementList = NIL; - bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType); + bool routerPlannable = MultiRouterPlannableQuery(query, taskExecutorType, + restrictionContext); if (!routerPlannable) { return NULL; } - ereport(DEBUG2, (errmsg("Creating router plan"))); if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) @@ -118,11 +131,17 @@ MultiRouterPlanCreate(Query *originalQuery, Query *query, { Assert(commandType == CMD_SELECT); - task = RouterSelectTask(originalQuery, query); + task = RouterSelectTask(originalQuery, query, restrictionContext, &placementList); } + if (task == NULL) + { + return NULL; + } - job = RouterQueryJob(originalQuery, task); + ereport(DEBUG2, (errmsg("Creating router plan"))); + + job = RouterQueryJob(originalQuery, task, placementList); multiPlan = CitusMakeNode(MultiPlan); multiPlan->workerJob = job; @@ -669,7 +688,7 @@ MostPermissiveVolatileFlag(char left, char right) static Task * RouterModifyTask(Query *originalQuery, Query *query) { - ShardInterval *shardInterval = TargetShardInterval(query); + ShardInterval *shardInterval = TargetShardIntervalForModify(query); uint64 shardId = shardInterval->shardId; StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; @@ -713,25 +732,23 @@ RouterModifyTask(Query *originalQuery, Query *query) /* - * TargetShardInterval determines the single shard targeted by a provided command. - * If no matching shards exist, or if the modification targets more than one one - * shard, this function raises an error depending on the command type. + * TargetShardIntervalForModify determines the single shard targeted by a provided + * modify command. If no matching shards exist, or if the modification targets more + * than one shard, this function raises an error depending on the command type. 
*/ static ShardInterval * -TargetShardInterval(Query *query) +TargetShardIntervalForModify(Query *query) { - CmdType commandType = query->commandType; - bool selectTask = (commandType == CMD_SELECT); List *prunedShardList = NIL; int prunedShardCount = 0; - - int shardCount = 0; Oid distributedTableId = ExtractFirstDistributedTableId(query); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); char partitionMethod = cacheEntry->partitionMethod; bool fastShardPruningPossible = false; + Assert(query->commandType != CMD_SELECT); + /* error out if no shards exist for the table */ shardCount = cacheEntry->shardIntervalArrayLength; if (shardCount == 0) @@ -774,18 +791,9 @@ TargetShardInterval(Query *query) prunedShardCount = list_length(prunedShardList); if (prunedShardCount != 1) { - if (selectTask) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("router executor queries must target exactly one " - "shard"))); - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("distributed modifications must target exactly one " - "shard"))); - } + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("distributed modifications must target exactly one " + "shard"))); } return (ShardInterval *) linitial(prunedShardList); @@ -963,22 +971,88 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn) /* RouterSelectTask builds a Task to represent a single shard select query */ static Task * -RouterSelectTask(Query *originalQuery, Query *query) +RouterSelectTask(Query *originalQuery, Query *query, + RelationRestrictionContext *restrictionContext, + List **placementList) { Task *task = NULL; - ShardInterval *shardInterval = TargetShardInterval(query); + List *prunedRelationShardList = TargetShardIntervalsForSelect(query, + restrictionContext); StringInfo queryString = makeStringInfo(); uint64 shardId = INVALID_SHARD_ID; bool upsertQuery = false; CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; + ListCell *prunedRelationShardListCell = NULL; + List *workerList = NIL; + bool shardsPresent = false; + + *placementList = NIL; + + if (prunedRelationShardList == NULL) + { + return NULL; + } - Assert(shardInterval != NULL); Assert(commandType == CMD_SELECT); - shardId = shardInterval->shardId; + foreach(prunedRelationShardListCell, prunedRelationShardList) + { + List *prunedShardList = (List *) lfirst(prunedRelationShardListCell); + ShardInterval *shardInterval = NULL; - deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString); - ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + /* no shard is present or all shards are pruned out case will be handled later */ + if (prunedShardList == NIL) + { + continue; + } + + shardsPresent = true; + + /* all relations are now pruned down to 0 or 1 shards */ + Assert(list_length(prunedShardList) <= 1); + + /* anchor shard id */ + if (shardId == INVALID_SHARD_ID) + { + shardInterval = (ShardInterval *) linitial(prunedShardList); + shardId = shardInterval->shardId; + } + } + + /* + * Determine the worker that has all shard placements if a shard placement found. + * If no shard placement exists, we will still run the query but the result will + * be empty. We create a dummy shard placement for the first active worker. 
+ */ + if (shardsPresent) + { + workerList = WorkersContainingAllShards(prunedRelationShardList); + } + else + { + List *workerNodeList = WorkerNodeList(); + if (workerNodeList != NIL) + { + WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList); + ShardPlacement *dummyPlacement = + (ShardPlacement *) CitusMakeNode(ShardPlacement); + dummyPlacement->nodeName = workerNode->workerName; + dummyPlacement->nodePort = workerNode->workerPort; + + workerList = lappend(workerList, dummyPlacement); + } + } + + if (workerList == NIL) + { + ereport(DEBUG2, (errmsg("Found no worker with all shard placements"))); + + return NULL; + } + + UpdateRelationNames((Node *) originalQuery, restrictionContext); + + pg_get_query_def(originalQuery, queryString); task = CitusMakeNode(Task); task->jobId = INVALID_JOB_ID; @@ -990,16 +1064,338 @@ RouterSelectTask(Query *originalQuery, Query *query) task->upsertQuery = upsertQuery; task->requiresMasterEvaluation = false; + *placementList = workerList; + return task; } +/* + * TargetShardIntervalsForSelect performs shard pruning for all referenced relations + * in the query and returns list of shards per relation. Shard pruning is done based + * on provided restriction context per relation. The function bails out and returns NULL + * if any of the relations pruned down to more than one active shard. It also records + * pruned shard intervals in relation restriction context to be used later on. + */ +static List * +TargetShardIntervalsForSelect(Query *query, + RelationRestrictionContext *restrictionContext) +{ + List *prunedRelationShardList = NIL; + ListCell *restrictionCell = NULL; + + Assert(query->commandType == CMD_SELECT); + Assert(restrictionContext != NULL); + + foreach(restrictionCell, restrictionContext->relationRestrictionList) + { + RelationRestriction *relationRestriction = + (RelationRestriction *) lfirst(restrictionCell); + Oid relationId = relationRestriction->relationId; + Index tableId = relationRestriction->index; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); + int shardCount = cacheEntry->shardIntervalArrayLength; + List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo; + List *restrictClauseList = extract_actual_clauses(baseRestrictionList, false); + List *shardIntervalList = NIL; + List *prunedShardList = NIL; + int shardIndex = 0; + + relationRestriction->prunedShardIntervalList = NIL; + + for (shardIndex = 0; shardIndex < shardCount; shardIndex++) + { + ShardInterval *shardInterval = + cacheEntry->sortedShardIntervalArray[shardIndex]; + shardIntervalList = lappend(shardIntervalList, shardInterval); + } + + if (shardCount > 0) + { + prunedShardList = PruneShardList(relationId, tableId, + restrictClauseList, + shardIntervalList); + + /* + * Quick bail out. The query can not be router plannable if one + * relation has more than one shard left after pruning. Having no + * shard left is okay at this point. It will be handled at a later + * stage. + */ + if (list_length(prunedShardList) > 1) + { + return NULL; + } + } + + relationRestriction->prunedShardIntervalList = prunedShardList; + prunedRelationShardList = lappend(prunedRelationShardList, prunedShardList); + } + + return prunedRelationShardList; +} + + +/* + * WorkersContainingAllShards returns list of shard placements that contain all + * shard intervals provided to the function. It returns NIL if no placement exists. 
+ * The caller should check if there are any shard intervals exist for placement + * check prior to calling this function. + */ +static List * +WorkersContainingAllShards(List *prunedShardIntervalsList) +{ + ListCell *prunedShardIntervalCell = NULL; + bool firstShard = true; + List *currentPlacementList = NIL; + + foreach(prunedShardIntervalCell, prunedShardIntervalsList) + { + List *shardIntervalList = (List *) lfirst(prunedShardIntervalCell); + ShardInterval *shardInterval = NULL; + uint64 shardId = INVALID_SHARD_ID; + List *newPlacementList = NIL; + + if (shardIntervalList == NIL) + { + continue; + } + + Assert(list_length(shardIntervalList) == 1); + + shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + /* retrieve all active shard placements for this shard */ + newPlacementList = FinalizedShardPlacementList(shardId); + + if (firstShard) + { + firstShard = false; + currentPlacementList = newPlacementList; + } + else + { + /* keep placements that still exists for this shard */ + currentPlacementList = IntersectPlacementList(currentPlacementList, + newPlacementList); + } + + /* + * Bail out if placement list becomes empty. This means there is no worker + * containing all shards referecend by the query, hence we can not forward + * this query directly to any worker. + */ + if (currentPlacementList == NIL) + { + break; + } + } + + return currentPlacementList; +} + + +/* + * IntersectPlacementList performs placement pruning based on matching on + * nodeName:nodePort fields of shard placement data. We start pruning from all + * placements of the first relation's shard. Then for each relation's shard, we + * compute intersection of the new shards placement with existing placement list. + * This operation could have been done using other methods, but since we do not + * expect very high replication factor, iterating over a list and making string + * comparisons should be sufficient. + */ +static List * +IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList) +{ + ListCell *lhsPlacementCell = NULL; + List *placementList = NIL; + + /* Keep existing placement in the list if it is also present in new placement list */ + foreach(lhsPlacementCell, lhsPlacementList) + { + ShardPlacement *lhsPlacement = (ShardPlacement *) lfirst(lhsPlacementCell); + ListCell *rhsPlacementCell = NULL; + foreach(rhsPlacementCell, rhsPlacementList) + { + ShardPlacement *rhsPlacement = (ShardPlacement *) lfirst(rhsPlacementCell); + if (rhsPlacement->nodePort == lhsPlacement->nodePort && + strncmp(rhsPlacement->nodeName, lhsPlacement->nodeName, + WORKER_LENGTH) == 0) + { + placementList = lappend(placementList, rhsPlacement); + } + } + } + + return placementList; +} + + +/* + * ConvertRteToSubqueryWithEmptyResult converts given relation RTE into + * subquery RTE that returns no results. 
+ */ +static void +ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte) +{ + Relation relation = heap_open(rte->relid, NoLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + int columnCount = tupleDescriptor->natts; + int columnIndex = 0; + Query *subquery = NULL; + List *targetList = NIL; + FromExpr *joinTree = NULL; + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex]; + TargetEntry *targetEntry = NULL; + StringInfo resname = NULL; + Const *constValue = NULL; + + if (attributeForm->attisdropped) + { + continue; + } + + resname = makeStringInfo(); + constValue = makeNullConst(attributeForm->atttypid, attributeForm->atttypmod, + attributeForm->attcollation); + + appendStringInfo(resname, "%s", attributeForm->attname.data); + + targetEntry = makeNode(TargetEntry); + targetEntry->expr = (Expr *) constValue; + targetEntry->resno = columnIndex; + targetEntry->resname = resname->data; + + targetList = lappend(targetList, targetEntry); + } + + heap_close(relation, NoLock); + + joinTree = makeNode(FromExpr); + joinTree->quals = makeBoolConst(false, false); + + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_ORIGINAL; + subquery->canSetTag = true; + subquery->targetList = targetList; + subquery->jointree = joinTree; + + rte->rtekind = RTE_SUBQUERY; + rte->subquery = subquery; + rte->alias = copyObject(rte->eref); +} + + +/* + * UpdateRelationNames walks over the query tree and appends shard ids to + * relations. It uses unique identity value to establish connection between a + * shard and the range table entry. If the range table id is not given a + * identity, than the relation is not referenced from the query, no connection + * could be found between a shard and this relation. Therefore relation is replaced + * by set of NULL values so that the query would work at worker without any problems. + * + */ +static bool +UpdateRelationNames(Node *node, RelationRestrictionContext *restrictionContext) +{ + RangeTblEntry *newRte = NULL; + uint64 shardId = INVALID_SHARD_ID; + Oid relationId = InvalidOid; + Oid schemaId = InvalidOid; + char *relationName = NULL; + char *schemaName = NULL; + ListCell *relationRestrictionCell = NULL; + RelationRestriction *relationRestriction = NULL; + List *shardIntervalList = NIL; + ShardInterval *shardInterval = NULL; + bool replaceRteWithNullValues = false; + + if (node == NULL) + { + return false; + } + + /* want to look at all RTEs, even in subqueries, CTEs and such */ + if (IsA(node, Query)) + { + return query_tree_walker((Query *) node, UpdateRelationNames, restrictionContext, + QTW_EXAMINE_RTES); + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, UpdateRelationNames, restrictionContext); + } + + + newRte = (RangeTblEntry *) node; + + if (newRte->rtekind != RTE_RELATION) + { + return false; + } + + /* + * Search for the restrictions associated with the RTE. There better be + * some, otherwise this query wouldn't be elegible as a router query. + * + * FIXME: We should probably use a hashtable here, to do efficient + * lookup. 
+ */ + foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) + { + relationRestriction = + (RelationRestriction *) lfirst(relationRestrictionCell); + + if (GetRTEIdentity(relationRestriction->rte) == GetRTEIdentity(newRte)) + { + break; + } + + relationRestriction = NULL; + } + + replaceRteWithNullValues = (relationRestriction == NULL) || + relationRestriction->prunedShardIntervalList == NIL; + + if (replaceRteWithNullValues) + { + ConvertRteToSubqueryWithEmptyResult(newRte); + return false; + } + + Assert(relationRestriction != NULL); + + shardIntervalList = relationRestriction->prunedShardIntervalList; + + Assert(list_length(shardIntervalList) == 1); + shardInterval = (ShardInterval *) linitial(shardIntervalList); + + shardId = shardInterval->shardId; + relationId = shardInterval->relationId; + relationName = get_rel_name(relationId); + AppendShardIdToName(&relationName, shardId); + + schemaId = get_rel_namespace(relationId); + schemaName = get_namespace_name(schemaId); + + ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL); + + return false; +} + + /* * RouterQueryJob creates a Job for the specified query to execute the * provided single shard select task. */ static Job * -RouterQueryJob(Query *query, Task *task) +RouterQueryJob(Query *query, Task *task, List *placementList) { Job *job = NULL; List *taskList = NIL; @@ -1007,7 +1403,8 @@ RouterQueryJob(Query *query, Task *task) /* * We send modify task to the first replica, otherwise we choose the target shard - * according to task assignment policy. + * according to task assignment policy. Placement list for select queries are + * provided as function parameter. */ if (taskType == MODIFY_TASK) { @@ -1015,7 +1412,10 @@ RouterQueryJob(Query *query, Task *task) } else { - taskList = AssignAnchorShardTaskList(list_make1(task)); + Assert(placementList != NIL); + + task->taskPlacementList = placementList; + taskList = list_make1(task); } job = CitusMakeNode(Job); @@ -1036,22 +1436,11 @@ RouterQueryJob(Query *query, Task *task) * partition column. This feature is enabled if task executor is set to real-time */ bool -MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType) +MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType, + RelationRestrictionContext *restrictionContext) { - uint32 rangeTableId = 1; - List *rangeTableList = NIL; - RangeTblEntry *rangeTableEntry = NULL; - Oid distributedTableId = InvalidOid; - Var *partitionColumn = NULL; - char partitionMethod = '\0'; - Node *quals = NULL; - CmdType commandType PG_USED_FOR_ASSERTS_ONLY = query->commandType; - FromExpr *joinTree = query->jointree; - List *varClauseList = NIL; - ListCell *varClauseCell = NULL; - bool partitionColumnMatchExpression = false; - int partitionColumnReferenceCount = 0; - int shardCount = 0; + CmdType commandType = query->commandType; + ListCell *relationRestrictionContextCell = NULL; if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) @@ -1059,6 +1448,7 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType) return true; } + /* FIXME: I tend to think it's time to remove this */ if (taskExecutorType != MULTI_EXECUTOR_REAL_TIME) { return false; @@ -1066,171 +1456,28 @@ MultiRouterPlannableQuery(Query *query, MultiExecutorType taskExecutorType) Assert(commandType == CMD_SELECT); - /* - * Reject subqueries which are in SELECT or WHERE clause. 
- * Queries which are recursive, with CommonTableExpr, with locking (hasForUpdate), - * or with window functions are also rejected here. - * Queries which have subqueries, or tablesamples in FROM clauses are rejected later - * during RangeTblEntry checks. - */ - if (query->hasSubLinks == true || query->cteList != NIL || query->hasForUpdate || - query->hasRecursive) + if (query->hasForUpdate) { return false; } - if (query->groupingSets) + foreach(relationRestrictionContextCell, restrictionContext->relationRestrictionList) { - return false; - } - - /* only hash partitioned tables are supported */ - distributedTableId = ExtractFirstDistributedTableId(query); - partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - partitionMethod = PartitionMethod(distributedTableId); - - if (partitionMethod != DISTRIBUTE_BY_HASH) - { - return false; - } - - /* extract range table entries */ - ExtractRangeTableEntryWalker((Node *) query, &rangeTableList); - - /* query can have only one range table of type RTE_RELATION */ - if (list_length(rangeTableList) != 1) - { - return false; - } - - rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList); - if (rangeTableEntry->rtekind != RTE_RELATION) - { - return false; - } - - if (rangeTableEntry->tablesample) - { - return false; - } - - if (joinTree == NULL) - { - return false; - } - - quals = joinTree->quals; - if (quals == NULL) - { - return false; - } - - /* convert list of expressions into expression tree */ - if (quals != NULL && IsA(quals, List)) - { - quals = (Node *) make_ands_explicit((List *) quals); - } - - /* - * Partition column must be used in a simple equality match check and it must be - * place at top level conjustion operator. - */ - partitionColumnMatchExpression = - ColumnMatchExpressionAtTopLevelConjunction(quals, partitionColumn); - - if (!partitionColumnMatchExpression) - { - return false; - } - - /* make sure partition column is used only once in the query */ - varClauseList = pull_var_clause_default(quals); - foreach(varClauseCell, varClauseList) - { - Var *column = (Var *) lfirst(varClauseCell); - if (equal(column, partitionColumn)) + RelationRestriction *relationRestriction = + (RelationRestriction *) lfirst(relationRestrictionContextCell); + RangeTblEntry *rte = relationRestriction->rte; + if (rte->rtekind == RTE_RELATION) { - partitionColumnReferenceCount++; - } - } + /* only hash partitioned tables are supported */ + Oid distributedTableId = rte->relid; + char partitionMethod = PartitionMethod(distributedTableId); - if (partitionColumnReferenceCount != 1) - { - return false; - } - - /* - * We need to make sure there is at least one shard for this hash partitioned - * query to be router plannable. We can not prepare a router plan if there - * are no shards. - */ - shardCount = ShardIntervalCount(distributedTableId); - if (shardCount == 0) - { - return false; - } - - return true; -} - - -/* - * ColumnMatchExpressionAtTopLevelConjunction returns true if the query contains an exact - * match (equal) expression on the provided column. The function returns true only - * if the match expression has an AND relation with the rest of the expression tree. 
- */ -static bool -ColumnMatchExpressionAtTopLevelConjunction(Node *node, Var *column) -{ - if (node == NULL) - { - return false; - } - - if (IsA(node, OpExpr)) - { - OpExpr *opExpr = (OpExpr *) node; - bool simpleExpression = SimpleOpExpression((Expr *) opExpr); - bool columnInExpr = false; - bool usingEqualityOperator = false; - - if (!simpleExpression) - { - return false; - } - - columnInExpr = OpExpressionContainsColumn(opExpr, column); - if (!columnInExpr) - { - return false; - } - - usingEqualityOperator = OperatorImplementsEquality(opExpr->opno); - - return usingEqualityOperator; - } - else if (IsA(node, BoolExpr)) - { - BoolExpr *boolExpr = (BoolExpr *) node; - List *argumentList = boolExpr->args; - ListCell *argumentCell = NULL; - - if (boolExpr->boolop != AND_EXPR) - { - return false; - } - - foreach(argumentCell, argumentList) - { - Node *argumentNode = (Node *) lfirst(argumentCell); - bool columnMatch = - ColumnMatchExpressionAtTopLevelConjunction(argumentNode, column); - if (columnMatch) + if (partitionMethod != DISTRIBUTE_BY_HASH) { - return true; + return false; } } } - return false; + return true; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 5eb231099..bc8abd0b7 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -35,6 +35,7 @@ #include "distributed/worker_protocol.h" #include "postmaster/postmaster.h" #include "optimizer/planner.h" +#include "optimizer/paths.h" #include "utils/guc.h" #include "utils/guc_tables.h" @@ -142,6 +143,9 @@ _PG_init(void) /* register utility hook */ ProcessUtility_hook = multi_ProcessUtility; + /* register for planner hook */ + set_rel_pathlist_hook = multi_relation_restriction_hook; + /* organize that task tracker is started once server is up */ TaskTrackerRegister(); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index ce9a8d092..839dd740b 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -181,6 +181,8 @@ extern bool SubqueryPushdown; /* Function declarations for building logical plans */ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *queryTree); extern bool NeedsDistributedPlanning(Query *queryTree); +extern int GetRTEIdentity(RangeTblEntry *rte); +extern void IdentifyRTE(RangeTblEntry *rte, int identifier); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index 2a141abe3..8dac47e00 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -13,14 +13,41 @@ #include "nodes/plannodes.h" #include "nodes/relation.h" + +typedef struct RelationRestrictionContext +{ + bool hasDistributedRelation; + bool hasLocalRelation; + List *relationRestrictionList; +} RelationRestrictionContext; + +typedef struct RelationRestriction +{ + Index index; + Oid relationId; + bool distributedRelation; + RangeTblEntry *rte; + RelOptInfo *relOptInfo; + PlannerInfo *plannerInfo; + List *prunedShardIntervalList; +} RelationRestriction; + + extern PlannedStmt * multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern bool HasCitusToplevelNode(PlannedStmt *planStatement); struct MultiPlan; -extern struct MultiPlan * 
CreatePhysicalPlan(Query *originalQuery, Query *query); +extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query, + RelationRestrictionContext * + restrictionContext); extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement); extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, struct MultiPlan *multiPlan); +extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, + Index index, RangeTblEntry *rte); +extern RelationRestrictionContext * CreateAndPushRestrictionContext(void); +extern RelationRestrictionContext * CurrentRestrictionContext(void); +extern void PopRestrictionContext(void); #endif /* MULTI_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 36210561a..c77f13482 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -16,6 +16,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "distributed/multi_server_executor.h" #include "nodes/parsenodes.h" @@ -29,7 +30,8 @@ extern MultiPlan * MultiRouterPlanCreate(Query *originalQuery, Query *query, - MultiExecutorType taskExecutorType); + MultiExecutorType taskExecutorType, + RelationRestrictionContext *restrictionContext); extern void ErrorIfModifyQueryNotSupported(Query *queryTree); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out index 0eb4da5ae..a6f5a0688 100644 --- a/src/test/regress/expected/multi_hash_pruning.out +++ b/src/test/regress/expected/multi_hash_pruning.out @@ -48,10 +48,10 @@ SELECT count(*) FROM orders_hash_partitioned; (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan DEBUG: Plan is router executable count ------- @@ -59,10 +59,10 @@ DEBUG: Plan is router executable (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 630000 DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 +DEBUG: Creating router plan DEBUG: Plan is router executable count ------- @@ -70,10 +70,10 @@ DEBUG: Plan is router executable (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 630000 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan DEBUG: Plan is router executable count ------- @@ -81,10 +81,10 @@ DEBUG: Plan is router executable (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 630000 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan DEBUG: Plan is router executable count ------- @@ -93,10 +93,10 @@ DEBUG: Plan is router executable SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1 AND o_clerk = 'aaa'; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router 
plan DEBUG: Plan is router executable count ------- @@ -104,10 +104,10 @@ DEBUG: Plan is router executable (1 row) SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan DEBUG: Plan is router executable count ------- @@ -198,6 +198,8 @@ SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2; SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1 OR o_orderkey = 2; DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 count ------- @@ -214,6 +216,8 @@ SELECT count(*) FROM orders_hash_partitioned SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa'); DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 count ------- @@ -232,6 +236,8 @@ SELECT count(*) FROM DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable count ------- 0 @@ -242,6 +248,8 @@ DEBUG: predicate pruning for shardId 630003 SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = ANY ('{1,2,3}'); NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +NOTICE: cannot use shard pruning with ANY/ALL (array expression) HINT: Consider rewriting the expression with OR/AND clauses. count ------- @@ -285,6 +293,8 @@ SELECT count(*) FROM orders_hash_partitioned DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable count ------- 0 @@ -319,9 +329,11 @@ SELECT count(*) DEBUG: predicate pruning for shardId 630001 DEBUG: predicate pruning for shardId 630002 DEBUG: predicate pruning for shardId 630003 -DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] -DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] -DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable count ------- 0 diff --git a/src/test/regress/expected/multi_hash_pruning_0.out b/src/test/regress/expected/multi_hash_pruning_0.out new file mode 100644 index 000000000..35b3ea99b --- /dev/null +++ b/src/test/regress/expected/multi_hash_pruning_0.out @@ -0,0 +1,329 @@ +-- +-- MULTI_HASH_PRUNING +-- +-- Tests for shard and join pruning logic on hash partitioned tables. +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 630000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 630000; +-- Create a table partitioned on integer column and update partition type to +-- hash. Then stage data to this table and update shard min max values with +-- hashed ones. Hash value of 1, 2, 3 and 4 are consecutively -1905060026, +-- 1134484726, -28094569 and -1011077333. 
+CREATE TABLE orders_hash_partitioned ( + o_orderkey integer, + o_custkey integer, + o_orderstatus char(1), + o_totalprice decimal(15,2), + o_orderdate date, + o_orderpriority char(15), + o_clerk char(15), + o_shippriority integer, + o_comment varchar(79) ); +SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +SET client_min_messages TO DEBUG2; +-- Check that we can prune shards for simple cases, boolean expressions and +-- immutable functions. +-- Since router plans are not triggered for task-tracker executor type, +-- we need to run the tests that triggers router planning seperately for +-- both executors. Otherwise, check-full fails on the task-tracker. +-- Later, we need to switch back to the actual task executor +-- to contuinue with correct executor type for check-full. +SELECT quote_literal(current_setting('citus.task_executor_type')) AS actual_task_executor +\gset +SET citus.task_executor_type TO 'real-time'; +SELECT count(*) FROM orders_hash_partitioned; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; +DEBUG: predicate pruning for shardId 630000 +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; +DEBUG: predicate pruning for shardId 630000 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; +DEBUG: predicate pruning for shardId 630000 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 AND o_clerk = 'aaa'; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1); +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: Creating router plan +DEBUG: Plan is router executable + count +------- + 0 +(1 row) + +SET citus.task_executor_type TO 'task-tracker'; +SELECT count(*) FROM orders_hash_partitioned; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2; +DEBUG: predicate pruning for 
shardId 630000 +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3; +DEBUG: predicate pruning for shardId 630000 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4; +DEBUG: predicate pruning for shardId 630000 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 AND o_clerk = 'aaa'; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1); +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +SET citus.task_executor_type TO :actual_task_executor; +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 OR o_orderkey = 2; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 OR o_clerk = 'aaa'; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa'); +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 OR o_orderkey is NULL; + count +------- + 0 +(1 row) + +SELECT count(*) FROM + (SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +-- Check that we don't support pruning for ANY (array expression) and give +-- a notice message when used with the partition column +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = ANY ('{1,2,3}'); +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. + count +------- + 0 +(1 row) + +-- Check that we don't show the message if the operator is not +-- equality operator +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey < ALL ('{1,2,3}'); + count +------- + 0 +(1 row) + +-- Check that we don't give a spurious hint message when non-partition +-- columns are used with ANY/IN/ALL +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = 1 OR o_totalprice IN (2, 5); + count +------- + 0 +(1 row) + +-- Check that we cannot prune for mutable functions. 
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random(); + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = random() OR o_orderkey = 1; + count +------- + 0 +(1 row) + +SELECT count(*) FROM orders_hash_partitioned + WHERE o_orderkey = random() AND o_orderkey = 1; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 + count +------- + 0 +(1 row) + +-- Check that we can do join pruning. +SELECT count(*) + FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2 + WHERE orders1.o_orderkey = orders2.o_orderkey; +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] +DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823] +DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647] +DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1] +DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825] +DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] +DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] + count +------- + 0 +(1 row) + +SELECT count(*) + FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2 + WHERE orders1.o_orderkey = orders2.o_orderkey + AND orders1.o_orderkey = 1 + AND orders2.o_orderkey is NULL; +DEBUG: predicate pruning for shardId 630001 +DEBUG: predicate pruning for shardId 630002 +DEBUG: predicate pruning for shardId 630003 +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823] +DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647] + count +------- + 0 +(1 row) + diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index aaf2165e6..aeb69e202 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -90,7 +90,7 @@ SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash'); (1 row) -SELECT master_create_worker_shards('customer_hash', 1, 1); +SELECT master_create_worker_shards('customer_hash', 2, 1); master_create_worker_shards ----------------------------- diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 3001c23b7..21393dcc9 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -31,7 +31,7 @@ SELECT master_create_distributed_table('articles_single_shard_hash', 'author_id' SELECT count(*) from articles_hash; count ------- - + 0 (1 row) SELECT master_create_worker_shards('articles_hash', 2, 1); @@ -104,20 +104,11 @@ SET client_min_messages TO 'DEBUG2'; INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); DEBUG: Creating router plan DEBUG: Plan is router 
executable --- first, test zero-shard SELECT, which should return an empty row -SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; -DEBUG: predicate pruning for shardId 840000 -DEBUG: predicate pruning for shardId 840001 - count -------- - -(1 row) - -- single-shard tests -- test simple select for a single row SELECT * FROM articles_hash WHERE author_id = 10 AND id = 50; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+-----------+------------ @@ -126,8 +117,8 @@ DEBUG: Plan is router executable -- get all titles by a single author SELECT title FROM articles_hash WHERE author_id = 10; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable title ------------ @@ -142,8 +133,8 @@ DEBUG: Plan is router executable SELECT title, word_count FROM articles_hash WHERE author_id = 10 ORDER BY word_count DESC NULLS LAST; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable title | word_count ------------+------------ @@ -159,8 +150,8 @@ SELECT title, id FROM articles_hash WHERE author_id = 5 ORDER BY id LIMIT 2; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable title | id ---------+---- @@ -174,6 +165,8 @@ SELECT title, author_id FROM articles_hash WHERE author_id = 7 OR author_id = 8 ORDER BY author_id ASC, id; DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable title | author_id -------------+----------- aseptic | 7 @@ -192,6 +185,7 @@ DEBUG: predicate pruning for shardId 840001 SELECT title, author_id FROM articles_hash WHERE author_id = 7 OR author_id = 8; DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable title | author_id -------------+----------- @@ -214,28 +208,31 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash GROUP BY author_id HAVING sum(word_count) > 1000 ORDER BY sum(word_count) DESC; -ERROR: cannot perform distributed planning on this query -DETAIL: Having qual is currently unsupported +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + author_id | corpus_size +-----------+------------- + 10 | 59955 + 8 | 55410 + 7 | 36756 + 1 | 35894 +(4 rows) + -- however having clause is supported if it goes to a single shard SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash WHERE author_id = 1 GROUP BY author_id HAVING sum(word_count) > 1000 ORDER BY sum(word_count) DESC; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable author_id | corpus_size -----------+------------- 1 | 35894 (1 row) --- UNION/INTERSECT queries are unsupported --- this is rejected by router planner and handled by multi_logical_planner -SELECT * FROM articles_hash WHERE author_id = 10 UNION -SELECT * FROM articles_hash WHERE author_id = 1; -ERROR: cannot perform distributed planning on this query -DETAIL: Union, Intersect, or Except are currently unsupported -- query is a single shard query but can't do shard pruning, -- not router-plannable due to <= and IN SELECT * FROM articles_hash WHERE author_id <= 1; @@ -250,6 +247,8 @@ SELECT * FROM 
articles_hash WHERE author_id <= 1; SELECT * FROM articles_hash WHERE author_id IN (1, 3); NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +NOTICE: cannot use shard pruning with ANY/ALL (array expression) HINT: Consider rewriting the expression with OR/AND clauses. id | author_id | title | word_count ----+-----------+--------------+------------ @@ -265,18 +264,266 @@ HINT: Consider rewriting the expression with OR/AND clauses. 43 | 3 | affixal | 12723 (10 rows) --- queries using CTEs are unsupported +-- queries with CTEs are supported +WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) +SELECT * FROM first_author; +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id +---- + 1 + 11 + 21 + 31 + 41 +(5 rows) + +-- queries with CTEs are supported even if CTE is not referenced inside query WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) SELECT title FROM articles_hash WHERE author_id = 1; +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + title +-------------- + arsenous + alamo + arcading + athwartships + aznavour +(5 rows) + +-- two CTE joins are supported if they go to the same worker +WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), +id_title AS (SELECT id, title from articles_hash WHERE author_id = 1) +SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | id | title +----+-----------+----+-------------- + 1 | 1 | 1 | arsenous + 11 | 1 | 11 | alamo + 21 | 1 | 21 | arcading + 31 | 1 | 31 | athwartships + 41 | 1 | 41 | aznavour +(5 rows) + +WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), +id_title AS (SELECT id, title from articles_hash WHERE author_id = 3) +SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | id | title +----+-----------+----+------- +(0 rows) + +-- CTE joins are not supported if table shards are at different workers +WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), +id_title AS (SELECT id, title from articles_hash WHERE author_id = 2) +SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840000 +DEBUG: Found no worker with all shard placements ERROR: cannot perform distributed planning on this query -DETAIL: Common Table Expressions are currently unsupported --- queries which involve functions in FROM clause are unsupported. 
+DETAIL: Complex table expressions are currently unsupported +-- recursive CTEs are supported when filtered on partition column +CREATE TABLE company_employees (company_id int, employee_id int, manager_id int); +SELECT master_create_distributed_table('company_employees', 'company_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('company_employees', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +INSERT INTO company_employees values(1, 1, 0); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(1, 2, 1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(1, 3, 1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(1, 4, 2); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(1, 5, 4); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(3, 1, 0); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(3, 15, 1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +INSERT INTO company_employees values(3, 3, 1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +-- find employees at top 2 level within company hierarchy +WITH RECURSIVE hierarchy as ( + SELECT *, 1 AS level + FROM company_employees + WHERE company_id = 1 and manager_id = 0 + UNION + SELECT ce.*, (h.level+1) + FROM hierarchy h JOIN company_employees ce + ON (h.employee_id = ce.manager_id AND + h.company_id = ce.company_id AND + ce.company_id = 1)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; +DEBUG: predicate pruning for shardId 840004 +DEBUG: predicate pruning for shardId 840005 +DEBUG: predicate pruning for shardId 840006 +DEBUG: predicate pruning for shardId 840004 +DEBUG: predicate pruning for shardId 840005 +DEBUG: predicate pruning for shardId 840006 +DEBUG: Creating router plan +DEBUG: Plan is router executable + company_id | employee_id | manager_id | level +------------+-------------+------------+------- + 1 | 1 | 0 | 1 + 1 | 2 | 1 | 2 + 1 | 3 | 1 | 2 +(3 rows) + +-- query becomes not router plannble and gets rejected +-- if filter on company is dropped +WITH RECURSIVE hierarchy as ( + SELECT *, 1 AS level + FROM company_employees + WHERE company_id = 1 and manager_id = 0 + UNION + SELECT ce.*, (h.level+1) + FROM hierarchy h JOIN company_employees ce + ON (h.employee_id = ce.manager_id AND + h.company_id = ce.company_id)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; +DEBUG: predicate pruning for shardId 840004 +DEBUG: predicate pruning for shardId 840005 +DEBUG: predicate pruning for shardId 840006 +ERROR: cannot perform distributed planning on this query +DETAIL: Complex table expressions are currently unsupported +-- logically wrong query, query involves different shards +-- from the same table, but still router plannable due to +-- shard being placed on the same worker. 
+WITH RECURSIVE hierarchy as ( + SELECT *, 1 AS level + FROM company_employees + WHERE company_id = 3 and manager_id = 0 + UNION + SELECT ce.*, (h.level+1) + FROM hierarchy h JOIN company_employees ce + ON (h.employee_id = ce.manager_id AND + h.company_id = ce.company_id AND + ce.company_id = 2)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; +DEBUG: predicate pruning for shardId 840003 +DEBUG: predicate pruning for shardId 840005 +DEBUG: predicate pruning for shardId 840006 +DEBUG: predicate pruning for shardId 840003 +DEBUG: predicate pruning for shardId 840004 +DEBUG: predicate pruning for shardId 840005 +DEBUG: Creating router plan +DEBUG: Plan is router executable + company_id | employee_id | manager_id | level +------------+-------------+------------+------- + 3 | 1 | 0 | 1 +(1 row) + +-- grouping sets are supported on single shard +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 3 + GROUP BY GROUPING SETS ((id),(subtitle)); +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | subtitle | count +----+----------+------- + 1 | | 1 + 3 | | 1 + 11 | | 1 + 13 | | 1 + 21 | | 1 + 23 | | 1 + 31 | | 1 + 33 | | 1 + 41 | | 1 + 43 | | 1 + | b | 1 + | f | 1 + | l | 1 + | r | 2 + | s | 2 + | t | 1 + | u | 1 + | z | 1 +(18 rows) + +-- grouping sets are not supported on multiple shards +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 2 + GROUP BY GROUPING SETS ((id),(subtitle)); +ERROR: cannot perform distributed planning on this query +DETAIL: Grouping sets, cube, and rollup is currently unsupported +-- queries which involve functions in FROM clause are supported if it goes to a single worker. 
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1; +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count | position +----+-----------+--------------+------------+---------- + 1 | 1 | arsenous | 9572 | 3 + 11 | 1 | alamo | 1347 | 3 + 21 | 1 | arcading | 5890 | 3 + 31 | 1 | athwartships | 7271 | 3 + 41 | 1 | aznavour | 11814 | 3 +(5 rows) + +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 3; +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count | position +----+-----------+--------------+------------+---------- + 1 | 1 | arsenous | 9572 | 3 + 3 | 3 | asternal | 10480 | 3 + 11 | 1 | alamo | 1347 | 3 + 13 | 3 | aseyev | 2255 | 3 + 21 | 1 | arcading | 5890 | 3 + 23 | 3 | abhorring | 6799 | 3 + 31 | 1 | athwartships | 7271 | 3 + 33 | 3 | autochrome | 8180 | 3 + 41 | 1 | aznavour | 11814 | 3 + 43 | 3 | affixal | 12723 | 3 +(10 rows) + +-- they are not supported if multiple workers are involved +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2; ERROR: cannot perform distributed planning on this query DETAIL: Complex table expressions are currently unsupported -- subqueries are not supported in WHERE clause in Citus SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a'); ERROR: cannot plan queries that include both regular and partitioned relations +SELECT * FROM articles_hash WHERE author_id IN (SELECT author_id FROM articles_hash WHERE author_id = 1 or author_id = 3); +ERROR: cannot perform distributed planning on this query +DETAIL: Join types other than inner/outer joins are currently unsupported +SELECT * FROM articles_hash WHERE author_id = (SELECT 1); +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries other than in from-clause are currently unsupported -- subqueries are supported in FROM clause but they are not router plannable SELECT articles_hash.id,test.word_count FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id @@ -311,6 +558,42 @@ DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 ERROR: cannot use real time executor with repartition jobs HINT: Set citus.task_executor_type to "task-tracker". 
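As the HINT above notes, the repartition-requiring form of this FROM-clause subquery join can still be executed by switching Citus to the task-tracker executor; a minimal sketch, not part of the captured regression output, reusing the articles_hash table from these tests:

-- switch executors so repartition jobs are allowed
SET citus.task_executor_type TO 'task-tracker';

SELECT articles_hash.id, test.word_count
FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test
WHERE test.id = articles_hash.id
ORDER BY articles_hash.id;

-- restore the executor used by the rest of this test file
SET citus.task_executor_type TO 'real-time';
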
+SELECT articles_hash.id,test.word_count +FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test +WHERE test.id = articles_hash.id and articles_hash.author_id = 1 +ORDER BY articles_hash.id; +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 3 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 5 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 7 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 11 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 9 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 14 +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". -- subqueries are not supported in SELECT clause SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1) AS special_price FROM articles_hash a; @@ -320,8 +603,8 @@ DETAIL: Subqueries other than in from-clause are currently unsupported SELECT * FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -332,12 +615,12 @@ DEBUG: Plan is router executable 41 | 1 | aznavour | 11814 (5 rows) --- below query hits a single shard, but it is not router plannable --- still router executable +-- below query hits a single shard, router plannable SELECT * FROM articles_hash WHERE author_id = 1 OR author_id = 17; DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -366,8 +649,8 @@ SELECT * SELECT id as article_id, word_count * id as random_value FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable article_id | random_value ------------+-------------- @@ -379,15 +662,13 @@ DEBUG: Plan is router executable (5 rows) -- we can push down co-located joins to a single worker --- this is not router plannable but router executable --- handled by real-time executor SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_hash b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; -DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 840001 -DEBUG: join prunable for intervals 
[-2147483648,-1] and [0,2147483647] +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable first_author | second_word_count --------------+------------------- @@ -396,13 +677,15 @@ DEBUG: Plan is router executable 10 | 6363 (3 rows) --- following join is neither router plannable, nor router executable +-- following join is router plannable since the same worker +-- has both shards SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; -DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable first_author | second_word_count --------------+------------------- 10 | 19519 @@ -410,13 +693,26 @@ DEBUG: predicate pruning for shardId 840001 10 | 19519 (3 rows) + +-- following join is not router plannable since there are no +-- workers containing both shards, added a CTE to make this fail +-- at logical planner +WITH single_shard as (SELECT * FROM articles_single_shard_hash) +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, single_shard b + WHERE a.author_id = 2 and a.author_id = b.author_id + LIMIT 3; +DEBUG: predicate pruning for shardId 840000 +DEBUG: Found no worker with all shard placements +ERROR: cannot perform distributed planning on this query +DETAIL: Complex table expressions are currently unsupported -- single shard select with limit is router plannable SELECT * FROM articles_hash WHERE author_id = 1 LIMIT 3; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -431,8 +727,8 @@ SELECT * WHERE author_id = 1 LIMIT 2 OFFSET 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -447,8 +743,8 @@ SELECT * ORDER BY id desc LIMIT 2 OFFSET 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -462,8 +758,8 @@ SELECT id FROM articles_hash WHERE author_id = 1 GROUP BY id; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id ---- @@ -478,8 +774,8 @@ DEBUG: Plan is router executable SELECT distinct id FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id ---- @@ -494,8 +790,8 @@ DEBUG: Plan is router executable SELECT avg(word_count) FROM articles_hash WHERE author_id = 2; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840000 +DEBUG: Creating router plan DEBUG: Plan is router executable avg -------------------- @@ -507,8 +803,8 @@ SELECT max(word_count) as max, min(word_count) as min, sum(word_count) as sum, count(word_count) as cnt FROM articles_hash WHERE author_id = 2; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840000 +DEBUG: Creating router plan DEBUG: Plan is router executable max | min | sum | cnt -------+------+-------+----- @@ -520,15 +816,113 @@ SELECT 
max(word_count) FROM articles_hash WHERE author_id = 1 GROUP BY author_id; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable max ------- 11814 (1 row) + +-- router plannable union queries are supported +(SELECT * FROM articles_hash WHERE author_id = 1) +UNION +(SELECT * FROM articles_hash WHERE author_id = 3); +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+--------------+------------ + 3 | 3 | asternal | 10480 + 43 | 3 | affixal | 12723 + 23 | 3 | abhorring | 6799 + 13 | 3 | aseyev | 2255 + 11 | 1 | alamo | 1347 + 41 | 1 | aznavour | 11814 + 1 | 1 | arsenous | 9572 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 33 | 3 | autochrome | 8180 +(10 rows) + +SELECT * FROM ( + (SELECT * FROM articles_hash WHERE author_id = 1) + UNION + (SELECT * FROM articles_hash WHERE author_id = 3)) uu; +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + id | author_id | title | word_count +----+-----------+--------------+------------ + 3 | 3 | asternal | 10480 + 43 | 3 | affixal | 12723 + 23 | 3 | abhorring | 6799 + 13 | 3 | aseyev | 2255 + 11 | 1 | alamo | 1347 + 41 | 1 | aznavour | 11814 + 1 | 1 | arsenous | 9572 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 33 | 3 | autochrome | 8180 +(10 rows) + +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1) +UNION +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3); +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + left +------ + a +(1 row) + +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1) +INTERSECT +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3); +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + left +------ + a +(1 row) + +(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 1) +EXCEPT +(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 3); +DEBUG: predicate pruning for shardId 840001 +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable + left +------ + at + az + ar + al +(4 rows) + +-- union queries are not supported if not router plannable +-- there is an inconsistency on shard pruning between +-- ubuntu/mac disabling log messages for this queries only SET client_min_messages to 'NOTICE'; +(SELECT * FROM articles_hash WHERE author_id = 1) +UNION +(SELECT * FROM articles_hash WHERE author_id = 2); +ERROR: cannot perform distributed planning on this query +DETAIL: Union, Intersect, or Except are currently unsupported +SELECT * FROM ( + (SELECT * FROM articles_hash WHERE author_id = 1) + UNION + (SELECT * FROM articles_hash WHERE author_id = 2)) uu; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries without group by clause are not supported yet -- error out for queries with repartition jobs SELECT * FROM articles_hash a, articles_hash b @@ -563,11 +957,12 @@ SET citus.task_executor_type TO 'real-time'; -- Test various filtering options for router plannable check SET client_min_messages to 'DEBUG2'; -- this is definitely single shard --- 
but not router plannable +-- and router plannable SELECT * FROM articles_hash WHERE author_id = 1 and author_id >= 1; DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -596,8 +991,8 @@ SELECT * SELECT * FROM articles_hash WHERE author_id = 1 and (id = 1 or id = 41); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -609,8 +1004,8 @@ DEBUG: Plan is router executable SELECT * FROM articles_hash WHERE author_id = 1 and (id = random()::int * 0); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+-------+------------ @@ -647,8 +1042,8 @@ SELECT * SELECT * FROM articles_hash WHERE author_id = abs(-1); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -689,8 +1084,8 @@ SELECT * SELECT * FROM articles_hash WHERE author_id = 1 and (id = abs(id - 2)); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -714,8 +1109,8 @@ SELECT * SELECT * FROM articles_hash WHERE (author_id = 1) = true; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -730,8 +1125,8 @@ DEBUG: Plan is router executable SELECT * FROM articles_hash WHERE (author_id = 1) and id between 0 and 20; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -743,8 +1138,8 @@ DEBUG: Plan is router executable SELECT * FROM articles_hash WHERE (author_id = 1) and (id = 1 or id = 31) and title like '%s'; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -756,8 +1151,8 @@ DEBUG: Plan is router executable SELECT * FROM articles_hash WHERE (id = 1 or id = 31) and title like '%s' and (author_id = 1); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -769,8 +1164,8 @@ DEBUG: Plan is router executable SELECT * FROM articles_hash WHERE (title like '%s' or title like 'a%') and (author_id = 1); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -785,8 +1180,8 @@ DEBUG: Plan is router executable SELECT * FROM articles_hash WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000); -DEBUG: Creating router plan DEBUG: predicate 
pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -799,8 +1194,8 @@ DEBUG: Plan is router executable SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count FROM articles_hash WHERE author_id = 5; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable prev | title | word_count ----------+----------+------------ @@ -815,8 +1210,8 @@ SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count FROM articles_hash WHERE author_id = 5 ORDER BY word_count DESC; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable prev | title | word_count ----------+----------+------------ @@ -830,8 +1225,8 @@ DEBUG: Plan is router executable SELECT id, MIN(id) over (order by word_count) FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | min ----+----- @@ -845,8 +1240,8 @@ DEBUG: Plan is router executable SELECT id, word_count, AVG(word_count) over (order by word_count) FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count | avg ----+------------+----------------------- @@ -860,8 +1255,8 @@ DEBUG: Plan is router executable SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count) FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable word_count | rank ------------+------ @@ -878,11 +1273,9 @@ SELECT id, MIN(id) over (order by word_count) WHERE author_id = 1 or author_id = 2; ERROR: cannot perform distributed planning on this query DETAIL: Window functions are currently unsupported - --- but they are not supported for not router plannable queries SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count FROM articles_hash - WHERE author_id = 5 or author_id = 1; + WHERE author_id = 5 or author_id = 2; ERROR: cannot perform distributed planning on this query DETAIL: Window functions are currently unsupported -- complex query hitting a single shard @@ -899,8 +1292,8 @@ SELECT articles_hash WHERE author_id = 5; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable c --- @@ -941,8 +1334,8 @@ SELECT * FROM articles_hash WHERE author_id = 1 ORDER BY id; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -961,8 +1354,8 @@ DECLARE test_cursor CURSOR FOR FROM articles_hash WHERE author_id = 1 ORDER BY id; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable FETCH test_cursor; id | author_id | title | word_count @@ -983,8 +1376,8 @@ COPY ( FROM articles_hash WHERE author_id = 1 ORDER BY id) TO STDOUT; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable 1 1 arsenous 9572 11 1 alamo 1347 @@ -998,15 +1391,15 
@@ CREATE TEMP TABLE temp_articles_hash as FROM articles_hash WHERE author_id = 1 ORDER BY id; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable -- router plannable queries may include filter for aggragates SELECT count(*), count(*) FILTER (WHERE id < 3) FROM articles_hash WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable count | count -------+------- @@ -1028,8 +1421,8 @@ PREPARE author_1_articles as FROM articles_hash WHERE author_id = 1; EXECUTE author_1_articles; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -1046,8 +1439,8 @@ PREPARE author_articles(int) as FROM articles_hash WHERE author_id = $1; EXECUTE author_articles(1); -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -1070,7 +1463,7 @@ BEGIN END; $$ LANGUAGE plpgsql; SELECT author_articles_max_id(); -DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 840001 CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah WHERE author_id = 1" PL/pgSQL function author_articles_max_id() line 5 at SQL statement @@ -1078,6 +1471,14 @@ DEBUG: predicate pruning for shardId 840001 CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah WHERE author_id = 1" PL/pgSQL function author_articles_max_id() line 5 at SQL statement +DEBUG: predicate pruning for shardId 840001 +CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_max_id() line 5 at SQL statement +DEBUG: Creating router plan +CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_max_id() line 5 at SQL statement DEBUG: Plan is router executable CONTEXT: SQL statement "SELECT MAX(id) FROM articles_hash ah WHERE author_id = 1" @@ -1099,12 +1500,12 @@ BEGIN END; $$ LANGUAGE plpgsql; SELECT * FROM author_articles_id_word_count(); -DEBUG: Creating router plan +DEBUG: predicate pruning for shardId 840001 CONTEXT: SQL statement "SELECT ah.id, ah.word_count FROM articles_hash ah WHERE author_id = 1" PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY -DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan CONTEXT: SQL statement "SELECT ah.id, ah.word_count FROM articles_hash ah WHERE author_id = 1" @@ -1113,6 +1514,37 @@ DEBUG: Plan is router executable CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY ERROR: scan directions other than forward scans are unsupported CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +-- materialized views can be created for router plannable queries +CREATE MATERIALIZED VIEW mv_articles_hash AS + SELECT * FROM articles_hash WHERE author_id = 1; +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable +SELECT * FROM mv_articles_hash; + id | author_id | title | word_count +----+-----------+--------------+------------ + 1 | 1 | arsenous | 9572 + 11 | 1 | alamo | 1347 + 21 | 1 | arcading | 5890 + 31 | 1 | athwartships | 7271 + 41 | 1 | aznavour 
| 11814 +(5 rows) + +CREATE MATERIALIZED VIEW mv_articles_hash_error AS + SELECT * FROM articles_hash WHERE author_id in (1,2); +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +NOTICE: cannot use shard pruning with ANY/ALL (array expression) +HINT: Consider rewriting the expression with OR/AND clauses. +ERROR: cannot create temporary table within security-restricted operation +-- materialized views with (NO DATA) is still not supported +CREATE MATERIALIZED VIEW mv_articles_hash AS + SELECT * FROM articles_hash WHERE author_id = 1 WITH NO DATA; +DEBUG: predicate pruning for shardId 840001 +DEBUG: Creating router plan +DEBUG: Plan is router executable +ERROR: scan directions other than forward scans are unsupported + -- router planner/executor is disabled for task-tracker executor -- following query is router plannable, but router planner is disabled SET citus.task_executor_type to 'task-tracker'; @@ -1151,6 +1583,8 @@ DEBUG: predicate pruning for shardId 840001 SET client_min_messages to 'NOTICE'; DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_id_word_count(); +DROP MATERIALIZED VIEW mv_articles_hash; DROP TABLE articles_hash; DROP TABLE articles_single_shard_hash; DROP TABLE authors_hash; +DROP TABLE company_employees; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 011faa5c5..8ea598c34 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -25,13 +25,6 @@ SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'ha (1 row) --- test when a table is distributed but no shards created yet -SELECT count(*) from articles; - count -------- - -(1 row) - SELECT master_create_worker_shards('articles', 2, 1); master_create_worker_shards ----------------------------- @@ -97,13 +90,6 @@ INSERT INTO articles VALUES (49, 9, 'anyone', 2681); INSERT INTO articles VALUES (50, 10, 'anjanette', 19519); -- insert a single row for the test INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); --- first, test zero-shard SELECT, which should return an empty row -SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2; - count -------- - -(1 row) - -- zero-shard modifications should fail UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; ERROR: distributed modifications must target exactly one shard @@ -170,18 +156,20 @@ SELECT title, author_id FROM articles alkylic | 8 (10 rows) --- add in some grouping expressions, still on same shard +-- add in some grouping expressions. +-- it is supported if it is on the same shard, but not supported if it +-- involves multiple shards. 
-- having queries unsupported in Citus SELECT author_id, sum(word_count) AS corpus_size FROM articles - WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10 + WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10 GROUP BY author_id HAVING sum(word_count) > 40000 ORDER BY sum(word_count) DESC; ERROR: cannot perform distributed planning on this query DETAIL: Having qual is currently unsupported --- UNION/INTERSECT queries are unsupported +-- UNION/INTERSECT queries are unsupported if on multiple shards SELECT * FROM articles WHERE author_id = 10 UNION -SELECT * FROM articles WHERE author_id = 1; +SELECT * FROM articles WHERE author_id = 2; ERROR: cannot perform distributed planning on this query DETAIL: Union, Intersect, or Except are currently unsupported -- queries using CTEs are unsupported @@ -324,8 +312,8 @@ SET citus.task_executor_type TO 'real-time'; SELECT * FROM articles WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -341,6 +329,7 @@ SELECT * FROM articles WHERE author_id = 1 OR author_id = 17; DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+--------------+------------ @@ -368,8 +357,8 @@ SELECT * SELECT id as article_id, word_count * id as random_value FROM articles WHERE author_id = 1; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan DEBUG: Plan is router executable article_id | random_value ------------+-------------- @@ -386,9 +375,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles a, articles b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; -DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 850001 -DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] +DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan DEBUG: Plan is router executable first_author | second_word_count --------------+------------------- @@ -403,8 +392,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles a, articles_single_shard b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; -DEBUG: push down of limit count: 3 DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan +DEBUG: Plan is router executable first_author | second_word_count --------------+------------------- 10 | 19519 @@ -417,8 +407,8 @@ SELECT * FROM articles WHERE author_id = 1 LIMIT 2; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan DEBUG: Plan is router executable id | author_id | title | word_count ----+-----------+----------+------------ @@ -433,8 +423,8 @@ SELECT id FROM articles WHERE author_id = 1 GROUP BY id; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 850001 +DEBUG: Creating router plan DEBUG: Plan is router executable id ---- @@ -447,14 +437,15 @@ DEBUG: Plan is router executable -- copying from a single shard table does not require the master query COPY articles_single_shard TO stdout; +DEBUG: Creating router plan DEBUG: Plan is router executable 50 10 anjanette 19519 -- error out for queries with aggregates SELECT avg(word_count) FROM articles WHERE author_id = 2; -DEBUG: Creating router plan DEBUG: 
predicate pruning for shardId 850000 +DEBUG: Creating router plan DEBUG: Plan is router executable avg -------------------- @@ -467,8 +458,8 @@ SELECT max(word_count) as max, min(word_count) as min, sum(word_count) as sum, count(word_count) as cnt FROM articles WHERE author_id = 2; -DEBUG: Creating router plan DEBUG: predicate pruning for shardId 850000 +DEBUG: Creating router plan DEBUG: Plan is router executable max | min | sum | cnt -------+------+-------+----- @@ -480,6 +471,7 @@ SELECT * FROM articles a, articles b WHERE a.id = b.id AND a.author_id = 1; DEBUG: predicate pruning for shardId 850001 +DEBUG: predicate pruning for shardId 850001 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/input/multi_agg_distinct.source b/src/test/regress/input/multi_agg_distinct.source index 10695bd9d..30cf329a1 100644 --- a/src/test/regress/input/multi_agg_distinct.source +++ b/src/test/regress/input/multi_agg_distinct.source @@ -60,7 +60,7 @@ SELECT count(distinct (l_orderkey + 1)) FROM lineitem_range; -- sharded table. SELECT count(distinct p_mfgr) FROM part; -SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr; +SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr; -- We don't support count(distinct) queries if table is append partitioned and -- has multiple shards diff --git a/src/test/regress/input/multi_outer_join.source b/src/test/regress/input/multi_outer_join.source index 82560ed36..3f4c89bd2 100644 --- a/src/test/regress/input/multi_outer_join.source +++ b/src/test/regress/input/multi_outer_join.source @@ -105,12 +105,13 @@ WHERE -- This query is an INNER JOIN in disguise since there cannot be NULL results +-- Added extra filter to make query not router plannable SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE - r_custkey = 5; + r_custkey = 5 or r_custkey > 15; -- Apply a filter before the join @@ -204,12 +205,13 @@ WHERE -- This query is an INNER JOIN in disguise since there cannot be NULL results (21) +-- Added extra filter to make query not router plannable SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE - r_custkey = 21; + r_custkey = 21 or r_custkey < 10; -- Apply a filter before the join diff --git a/src/test/regress/output/multi_agg_distinct.source b/src/test/regress/output/multi_agg_distinct.source index 42b25810f..ab1451235 100644 --- a/src/test/regress/output/multi_agg_distinct.source +++ b/src/test/regress/output/multi_agg_distinct.source @@ -85,14 +85,14 @@ SELECT count(distinct p_mfgr) FROM part; 5 (1 row) -SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr; +SELECT p_mfgr, count(distinct p_partkey) FROM part GROUP BY p_mfgr ORDER BY p_mfgr; p_mfgr | count ---------------------------+------- Manufacturer#1 | 193 - Manufacturer#3 | 228 - Manufacturer#5 | 185 Manufacturer#2 | 190 + Manufacturer#3 | 228 Manufacturer#4 | 204 + Manufacturer#5 | 185 (5 rows) -- We don't support count(distinct) queries if table is append partitioned and diff --git a/src/test/regress/output/multi_outer_join.source b/src/test/regress/output/multi_outer_join.source index 57c9e3d48..089e2a134 100644 --- a/src/test/regress/output/multi_outer_join.source +++ b/src/test/regress/output/multi_outer_join.source @@ -134,12 +134,13 @@ LOG: join order: [ 
"multi_outer_join_left" ][ broadcast join "multi_outer_join_ (1 row) -- This query is an INNER JOIN in disguise since there cannot be NULL results +-- Added extra filter to make query not router plannable SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE - r_custkey = 5; + r_custkey = 5 or r_custkey > 15; LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_right" ] min | max -----+----- @@ -273,12 +274,13 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer (1 row) -- This query is an INNER JOIN in disguise since there cannot be NULL results (21) +-- Added extra filter to make query not router plannable SELECT min(l_custkey), max(l_custkey) FROM multi_outer_join_left a LEFT JOIN multi_outer_join_right b ON (l_custkey = r_custkey) WHERE - r_custkey = 21; + r_custkey = 21 or r_custkey < 10; LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ] min | max -----+----- diff --git a/src/test/regress/sql/multi_join_order_additional.sql b/src/test/regress/sql/multi_join_order_additional.sql index fd650e6f9..425e276fb 100644 --- a/src/test/regress/sql/multi_join_order_additional.sql +++ b/src/test/regress/sql/multi_join_order_additional.sql @@ -63,7 +63,7 @@ CREATE TABLE customer_hash ( c_mktsegment char(10) not null, c_comment varchar(117) not null); SELECT master_create_distributed_table('customer_hash', 'c_custkey', 'hash'); -SELECT master_create_worker_shards('customer_hash', 1, 1); +SELECT master_create_worker_shards('customer_hash', 2, 1); -- The following query checks that we can correctly handle self-joins diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 8812e4520..7828ff72b 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -94,9 +94,6 @@ SET client_min_messages TO 'DEBUG2'; -- insert a single row for the test INSERT INTO articles_single_shard_hash VALUES (50, 10, 'anjanette', 19519); --- first, test zero-shard SELECT, which should return an empty row -SELECT COUNT(*) FROM articles_hash WHERE author_id = 1 AND author_id = 2; - -- single-shard tests -- test simple select for a single row @@ -141,32 +138,130 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles_hash HAVING sum(word_count) > 1000 ORDER BY sum(word_count) DESC; - --- UNION/INTERSECT queries are unsupported --- this is rejected by router planner and handled by multi_logical_planner -SELECT * FROM articles_hash WHERE author_id = 10 UNION -SELECT * FROM articles_hash WHERE author_id = 1; - -- query is a single shard query but can't do shard pruning, -- not router-plannable due to <= and IN SELECT * FROM articles_hash WHERE author_id <= 1; SELECT * FROM articles_hash WHERE author_id IN (1, 3); --- queries using CTEs are unsupported +-- queries with CTEs are supported +WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) +SELECT * FROM first_author; + +-- queries with CTEs are supported even if CTE is not referenced inside query WITH first_author AS ( SELECT id FROM articles_hash WHERE author_id = 1) SELECT title FROM articles_hash WHERE author_id = 1; --- queries which involve functions in FROM clause are unsupported. 
+-- two CTE joins are supported if they go to the same worker +WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), +id_title AS (SELECT id, title from articles_hash WHERE author_id = 1) +SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; + +WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), +id_title AS (SELECT id, title from articles_hash WHERE author_id = 3) +SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; + +-- CTE joins are not supported if table shards are at different workers +WITH id_author AS ( SELECT id, author_id FROM articles_hash WHERE author_id = 1), +id_title AS (SELECT id, title from articles_hash WHERE author_id = 2) +SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id; + +-- recursive CTEs are supported when filtered on partition column +CREATE TABLE company_employees (company_id int, employee_id int, manager_id int); +SELECT master_create_distributed_table('company_employees', 'company_id', 'hash'); +SELECT master_create_worker_shards('company_employees', 4, 1); + +INSERT INTO company_employees values(1, 1, 0); +INSERT INTO company_employees values(1, 2, 1); +INSERT INTO company_employees values(1, 3, 1); +INSERT INTO company_employees values(1, 4, 2); +INSERT INTO company_employees values(1, 5, 4); + +INSERT INTO company_employees values(3, 1, 0); +INSERT INTO company_employees values(3, 15, 1); +INSERT INTO company_employees values(3, 3, 1); + +-- find employees at top 2 level within company hierarchy +WITH RECURSIVE hierarchy as ( + SELECT *, 1 AS level + FROM company_employees + WHERE company_id = 1 and manager_id = 0 + UNION + SELECT ce.*, (h.level+1) + FROM hierarchy h JOIN company_employees ce + ON (h.employee_id = ce.manager_id AND + h.company_id = ce.company_id AND + ce.company_id = 1)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; + +-- query becomes not router plannble and gets rejected +-- if filter on company is dropped +WITH RECURSIVE hierarchy as ( + SELECT *, 1 AS level + FROM company_employees + WHERE company_id = 1 and manager_id = 0 + UNION + SELECT ce.*, (h.level+1) + FROM hierarchy h JOIN company_employees ce + ON (h.employee_id = ce.manager_id AND + h.company_id = ce.company_id)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; + +-- logically wrong query, query involves different shards +-- from the same table, but still router plannable due to +-- shard being placed on the same worker. +WITH RECURSIVE hierarchy as ( + SELECT *, 1 AS level + FROM company_employees + WHERE company_id = 3 and manager_id = 0 + UNION + SELECT ce.*, (h.level+1) + FROM hierarchy h JOIN company_employees ce + ON (h.employee_id = ce.manager_id AND + h.company_id = ce.company_id AND + ce.company_id = 2)) +SELECT * FROM hierarchy WHERE LEVEL <= 2; + +-- grouping sets are supported on single shard +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 3 + GROUP BY GROUPING SETS ((id),(subtitle)); + +-- grouping sets are not supported on multiple shards +SELECT + id, substring(title, 2, 1) AS subtitle, count(*) + FROM articles_hash + WHERE author_id = 1 or author_id = 2 + GROUP BY GROUPING SETS ((id),(subtitle)); + +-- queries which involve functions in FROM clause are supported if it goes to a single worker. 
SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1; +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 3; + +-- they are not supported if multiple workers are involved +SELECT * FROM articles_hash, position('om' in 'Thomas') WHERE author_id = 1 or author_id = 2; + -- subqueries are not supported in WHERE clause in Citus SELECT * FROM articles_hash WHERE author_id IN (SELECT id FROM authors_hash WHERE name LIKE '%a'); +SELECT * FROM articles_hash WHERE author_id IN (SELECT author_id FROM articles_hash WHERE author_id = 1 or author_id = 3); + +SELECT * FROM articles_hash WHERE author_id = (SELECT 1); + + -- subqueries are supported in FROM clause but they are not router plannable SELECT articles_hash.id,test.word_count FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE test.id = articles_hash.id ORDER BY articles_hash.id; + +SELECT articles_hash.id,test.word_count +FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test +WHERE test.id = articles_hash.id and articles_hash.author_id = 1 +ORDER BY articles_hash.id; + -- subqueries are not supported in SELECT clause SELECT a.title AS name, (SELECT a2.id FROM articles_single_shard_hash a2 WHERE a.id = a2.id LIMIT 1) AS special_price FROM articles_hash a; @@ -176,8 +271,7 @@ SELECT * FROM articles_hash WHERE author_id = 1; --- below query hits a single shard, but it is not router plannable --- still router executable +-- below query hits a single shard, router plannable SELECT * FROM articles_hash WHERE author_id = 1 OR author_id = 17; @@ -194,18 +288,26 @@ SELECT id as article_id, word_count * id as random_value WHERE author_id = 1; -- we can push down co-located joins to a single worker --- this is not router plannable but router executable --- handled by real-time executor SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_hash b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; --- following join is neither router plannable, nor router executable +-- following join is router plannable since the same worker +-- has both shards SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id LIMIT 3; + +-- following join is not router plannable since there are no +-- workers containing both shards, added a CTE to make this fail +-- at logical planner +WITH single_shard as (SELECT * FROM articles_single_shard_hash) +SELECT a.author_id as first_author, b.word_count as second_word_count + FROM articles_hash a, single_shard b + WHERE a.author_id = 2 and a.author_id = b.author_id + LIMIT 3; -- single shard select with limit is router plannable SELECT * @@ -257,7 +359,45 @@ SELECT max(word_count) WHERE author_id = 1 GROUP BY author_id; + +-- router plannable union queries are supported +(SELECT * FROM articles_hash WHERE author_id = 1) +UNION +(SELECT * FROM articles_hash WHERE author_id = 3); + +SELECT * FROM ( + (SELECT * FROM articles_hash WHERE author_id = 1) + UNION + (SELECT * FROM articles_hash WHERE author_id = 3)) uu; + +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1) +UNION +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3); + +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 1) +INTERSECT +(SELECT LEFT(title, 1) FROM articles_hash WHERE author_id = 3); + +(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 1) +EXCEPT 
+(SELECT LEFT(title, 2) FROM articles_hash WHERE author_id = 3); + +-- union queries are not supported if not router plannable +-- there is an inconsistency on shard pruning between +-- ubuntu/mac disabling log messages for this queries only + SET client_min_messages to 'NOTICE'; + +(SELECT * FROM articles_hash WHERE author_id = 1) +UNION +(SELECT * FROM articles_hash WHERE author_id = 2); + + +SELECT * FROM ( + (SELECT * FROM articles_hash WHERE author_id = 1) + UNION + (SELECT * FROM articles_hash WHERE author_id = 2)) uu; + -- error out for queries with repartition jobs SELECT * FROM articles_hash a, articles_hash b @@ -275,7 +415,7 @@ SET citus.task_executor_type TO 'real-time'; SET client_min_messages to 'DEBUG2'; -- this is definitely single shard --- but not router plannable +-- and router plannable SELECT * FROM articles_hash WHERE author_id = 1 and author_id >= 1; @@ -387,11 +527,9 @@ SELECT id, MIN(id) over (order by word_count) FROM articles_hash WHERE author_id = 1 or author_id = 2; - --- but they are not supported for not router plannable queries SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count FROM articles_hash - WHERE author_id = 5 or author_id = 1; + WHERE author_id = 5 or author_id = 2; -- complex query hitting a single shard SELECT @@ -510,6 +648,19 @@ $$ LANGUAGE plpgsql; SELECT * FROM author_articles_id_word_count(); +-- materialized views can be created for router plannable queries +CREATE MATERIALIZED VIEW mv_articles_hash AS + SELECT * FROM articles_hash WHERE author_id = 1; + +SELECT * FROM mv_articles_hash; + +CREATE MATERIALIZED VIEW mv_articles_hash_error AS + SELECT * FROM articles_hash WHERE author_id in (1,2); + +-- materialized views with (NO DATA) is still not supported +CREATE MATERIALIZED VIEW mv_articles_hash AS + SELECT * FROM articles_hash WHERE author_id = 1 WITH NO DATA; + -- router planner/executor is disabled for task-tracker executor -- following query is router plannable, but router planner is disabled SET citus.task_executor_type to 'task-tracker'; @@ -530,6 +681,8 @@ SET client_min_messages to 'NOTICE'; DROP FUNCTION author_articles_max_id(); DROP FUNCTION author_articles_id_word_count(); +DROP MATERIALIZED VIEW mv_articles_hash; DROP TABLE articles_hash; DROP TABLE articles_single_shard_hash; DROP TABLE authors_hash; +DROP TABLE company_employees; diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 6d32b4e9e..7e3cf9f3c 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -23,10 +23,6 @@ CREATE TABLE articles_single_shard (LIKE articles); SELECT master_create_distributed_table('articles', 'author_id', 'hash'); SELECT master_create_distributed_table('articles_single_shard', 'author_id', 'hash'); - --- test when a table is distributed but no shards created yet -SELECT count(*) from articles; - SELECT master_create_worker_shards('articles', 2, 1); SELECT master_create_worker_shards('articles_single_shard', 1, 1); @@ -85,9 +81,6 @@ INSERT INTO articles VALUES (50, 10, 'anjanette', 19519); -- insert a single row for the test INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); --- first, test zero-shard SELECT, which should return an empty row -SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2; - -- zero-shard modifications should fail UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; DELETE FROM articles WHERE author_id = 1 AND author_id = 2; @@ 
-116,17 +109,19 @@ SELECT title, author_id FROM articles WHERE author_id = 7 OR author_id = 8 ORDER BY author_id ASC, id; --- add in some grouping expressions, still on same shard +-- add in some grouping expressions. +-- it is supported if it is on the same shard, but not supported if it +-- involves multiple shards. -- having queries unsupported in Citus SELECT author_id, sum(word_count) AS corpus_size FROM articles - WHERE author_id = 1 OR author_id = 7 OR author_id = 8 OR author_id = 10 + WHERE author_id = 1 OR author_id = 2 OR author_id = 8 OR author_id = 10 GROUP BY author_id HAVING sum(word_count) > 40000 ORDER BY sum(word_count) DESC; --- UNION/INTERSECT queries are unsupported +-- UNION/INTERSECT queries are unsupported if on multiple shards SELECT * FROM articles WHERE author_id = 10 UNION -SELECT * FROM articles WHERE author_id = 1; +SELECT * FROM articles WHERE author_id = 2; -- queries using CTEs are unsupported WITH long_names AS ( SELECT id FROM authors WHERE char_length(name) > 15 )