From ee132c5ead1b668afdac6d233772bbad98526780 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Thu, 15 Feb 2018 14:58:02 +0100
Subject: [PATCH] Prune shards once per relation in subquery pushdown

---
 .../master/master_metadata_utility.c          |   1 +
 .../planner/multi_physical_planner.c          | 253 +++++++++---------
 .../planner/multi_router_planner.c            |  22 +-
 .../distributed/utils/citus_copyfuncs.c       |   1 +
 .../distributed/utils/citus_outfuncs.c        |   1 +
 .../distributed/utils/citus_readfuncs.c       |   1 +
 .../distributed/utils/metadata_cache.c        |   3 +
 .../distributed/master_metadata_utility.h     |   1 +
 .../distributed/multi_router_planner.h        |   4 +
 src/test/regress/expected/multi_subquery.out  |   6 -
 10 files changed, 153 insertions(+), 140 deletions(-)
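Note for reviewers (text between the diffstat and the first diff header is not
applied by git-am): the core of this change is the new bookkeeping in
SubquerySqlTaskList. Shards are pruned once per relation via
TargetShardIntervalsForQuery(), surviving shard indexes are marked in a boolean
array, and task creation then walks only the [minShardOffset, maxShardOffset]
window. The standalone sketch below mirrors that bookkeeping; the shard count,
the surviving indexes, and the file name are invented for illustration and are
not Citus code.

/* sketch.c - illustration only, not part of the patch */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

int
main(void)
{
	int shardCount = 8;
	int survivingIndexes[] = { 2, 3, 6 };	/* e.g. result of range pruning */
	int survivingCount = sizeof(survivingIndexes) / sizeof(survivingIndexes[0]);
	bool *taskRequiredForShardIndex = calloc(shardCount, sizeof(bool));
	int minShardOffset = shardCount;	/* start with an empty window */
	int maxShardOffset = -1;
	int i = 0;
	int shardOffset = 0;

	/* mark every shard index that survived pruning, tracking the window */
	for (i = 0; i < survivingCount; i++)
	{
		int shardIndex = survivingIndexes[i];

		taskRequiredForShardIndex[shardIndex] = true;

		if (shardIndex < minShardOffset)
		{
			minShardOffset = shardIndex;
		}
		if (shardIndex > maxShardOffset)
		{
			maxShardOffset = shardIndex;
		}
	}

	/* only walk the window that can contain surviving shard indexes */
	for (shardOffset = minShardOffset; shardOffset <= maxShardOffset; shardOffset++)
	{
		if (!taskRequiredForShardIndex[shardOffset])
		{
			continue;	/* pruned away for all relations */
		}

		printf("would create a task for shard index %d\n", shardOffset);
	}

	free(taskRequiredForShardIndex);
	return 0;
}
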
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index a647ae936..747899bfa 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -561,6 +561,7 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval)
 	destInterval->minValueExists = srcInterval->minValueExists;
 	destInterval->maxValueExists = srcInterval->maxValueExists;
 	destInterval->shardId = srcInterval->shardId;
+	destInterval->shardIndex = srcInterval->shardIndex;
 
 	destInterval->minValue = 0;
 	if (destInterval->minValueExists)
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index a73d14b18..50fbbe976 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -33,6 +33,7 @@
 #include "distributed/citus_nodes.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/colocation_utils.h"
+#include "distributed/deparse_shard_query.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_router_planner.h"
@@ -130,7 +131,7 @@ static void ErrorIfUnsupportedShardDistribution(Query *query);
 static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
 								ShardInterval *firstInterval,
 								ShardInterval *secondInterval);
-static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
+static Task * SubqueryTaskCreate(Query *originalQuery, int shardIndex,
 								 RelationRestrictionContext *restrictionContext,
 								 uint32 taskId);
 static List * SqlTaskList(Job *job);
@@ -2054,84 +2055,115 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
 	Query *subquery = job->jobQuery;
 	uint64 jobId = job->jobId;
 	List *sqlTaskList = NIL;
-	List *rangeTableList = NIL;
-	ListCell *rangeTableCell = NULL;
+	ListCell *restrictionCell = NULL;
 	uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
-	Oid relationId = 0;
 	int shardCount = 0;
 	int shardOffset = 0;
-	DistTableCacheEntry *targetCacheEntry = NULL;
+	int minShardOffset = 0;
+	int maxShardOffset = 0;
 	RelationRestrictionContext *relationRestrictionContext =
 		plannerRestrictionContext->relationRestrictionContext;
+	bool *taskRequiredForShardIndex = NULL;
+	List *prunedRelationShardList = NIL;
+	ListCell *prunedRelationShardCell = NULL;
+	bool isMultiShardQuery = false;
 
 	/* error if shards are not co-partitioned */
 	ErrorIfUnsupportedShardDistribution(subquery);
 
-	/* get list of all range tables in subquery tree */
-	ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
-
-	/*
-	 * Find the first relation that is not a reference table. We'll use the shards
-	 * of that relation as the target shards.
-	 */
-	foreach(rangeTableCell, rangeTableList)
+	if (list_length(relationRestrictionContext->relationRestrictionList) == 0)
 	{
-		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+		ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
+							   "router executor is disabled")));
+	}
+
+	/* defaults to be used if this is a reference table-only query */
+	minShardOffset = 0;
+	maxShardOffset = 0;
+
+	prunedRelationShardList = TargetShardIntervalsForQuery(subquery,
+														   relationRestrictionContext,
+														   &isMultiShardQuery);
+
+	forboth(prunedRelationShardCell, prunedRelationShardList,
+			restrictionCell, relationRestrictionContext->relationRestrictionList)
+	{
+		RelationRestriction *relationRestriction =
+			(RelationRestriction *) lfirst(restrictionCell);
+		Oid relationId = relationRestriction->relationId;
+		List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
+		ListCell *shardIntervalCell = NULL;
 		DistTableCacheEntry *cacheEntry = NULL;
 
-		relationId = rangeTableEntry->relid;
 		cacheEntry = DistributedTableCacheEntry(relationId);
 		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
 		{
 			continue;
 		}
 
-		targetCacheEntry = DistributedTableCacheEntry(relationId);
-		break;
+		/* we expect distributed tables to have the same shard count */
+		if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength)
+		{
+			ereport(ERROR, (errmsg("shard counts of co-located tables do not "
+								   "match")));
+		}
+
+		if (taskRequiredForShardIndex == NULL)
+		{
+			shardCount = cacheEntry->shardIntervalArrayLength;
+			taskRequiredForShardIndex = (bool *) palloc0(shardCount);
+
+			/* there is a distributed table, find the shard range */
+			minShardOffset = shardCount;
+			maxShardOffset = -1;
+		}
+
+		foreach(shardIntervalCell, prunedShardList)
+		{
+			ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+			int shardIndex = shardInterval->shardIndex;
+
+			taskRequiredForShardIndex[shardIndex] = true;
+
+			if (shardIndex < minShardOffset)
+			{
+				minShardOffset = shardIndex;
+			}
+
+			if (shardIndex > maxShardOffset)
+			{
+				maxShardOffset = shardIndex;
+			}
+		}
 	}
 
 	/*
-	 * That means all tables are reference tables and we can pick any any of them
-	 * as an anchor table.
+	 * To avoid iterating through all shard indexes we keep the minimum and maximum
+	 * offsets of shards that were not pruned away. This optimisation is primarily
+	 * relevant for queries on range-distributed tables that, due to range filters,
+	 * prune to a small number of adjacent shards.
+	 *
+	 * In other cases, such as an OR condition on a hash-distributed table, we may
+	 * still visit most or all shards even if some of them were pruned away. However,
+	 * given that hash-distributed tables typically only have a few shards, the
+	 * iteration is still very fast.
 	 */
-	if (targetCacheEntry == NULL)
+	for (shardOffset = minShardOffset; shardOffset <= maxShardOffset; shardOffset++)
 	{
-		RangeTblEntry *rangeTableEntry = NULL;
-
-		if (list_length(rangeTableList) == 0)
-		{
-			/*
-			 * User disabled the router planner and forced planner go through
-			 * subquery pushdown, but we cannot continue anymore.
-			 */
-			ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
-								   "router executor is disabled")));
-		}
-
-		rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
-		relationId = rangeTableEntry->relid;
-		targetCacheEntry = DistributedTableCacheEntry(relationId);
-	}
-
-	shardCount = targetCacheEntry->shardIntervalArrayLength;
-	for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
-	{
-		ShardInterval *targetShardInterval =
-			targetCacheEntry->sortedShardIntervalArray[shardOffset];
 		Task *subqueryTask = NULL;
 
-		subqueryTask = SubqueryTaskCreate(subquery, targetShardInterval,
-										  relationRestrictionContext, taskIdIndex);
-
-
-		/* add the task if it could be created */
-		if (subqueryTask != NULL)
+		if (taskRequiredForShardIndex != NULL && !taskRequiredForShardIndex[shardOffset])
 		{
-			subqueryTask->jobId = jobId;
-			sqlTaskList = lappend(sqlTaskList, subqueryTask);
-
-			++taskIdIndex;
+			/* this shard index is pruned away for all relations */
+			continue;
 		}
+
+		subqueryTask = SubqueryTaskCreate(subquery, shardOffset,
+										  relationRestrictionContext, taskIdIndex);
+		subqueryTask->jobId = jobId;
+		sqlTaskList = lappend(sqlTaskList, subqueryTask);
+
+		++taskIdIndex;
 	}
 
 	return sqlTaskList;
@@ -2347,97 +2379,77 @@ ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
 
 
 /*
  * SubqueryTaskCreate creates a sql task by replacing the target
- * shardInterval's boundary value.. Then performs the normal
- * shard pruning on the subquery via RouterSelectQuery().
- *
- * The function errors out if the subquery is not router select query (i.e.,
- * subqueries with non equi-joins.).
+ * shardInterval's boundary value.
  */
 static Task *
-SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
-				   RelationRestrictionContext *restrictionContext,
-				   uint32 taskId)
+SubqueryTaskCreate(Query *originalQuery, int shardIndex,
+				   RelationRestrictionContext *restrictionContext, uint32 taskId)
 {
 	Query *taskQuery = copyObject(originalQuery);
 
-	uint64 shardId = shardInterval->shardId;
-	Oid distributedTableId = shardInterval->relationId;
 	StringInfo queryString = makeStringInfo();
 	ListCell *restrictionCell = NULL;
 	Task *subqueryTask = NULL;
-	List *selectPlacementList = NIL;
-	uint64 selectAnchorShardId = INVALID_SHARD_ID;
+	List *taskShardList = NIL;
 	List *relationShardList = NIL;
+	List *selectPlacementList = NIL;
 	uint64 jobId = INVALID_JOB_ID;
-	bool replacePrunedQueryWithDummy = false;
-	RelationRestrictionContext *copiedRestrictionContext =
-		CopyRelationRestrictionContext(restrictionContext);
-	List *shardOpExpressions = NIL;
-	RestrictInfo *shardRestrictionList = NULL;
-	DeferredErrorMessage *planningError = NULL;
-	bool multiShardModifQuery = false;
+	uint64 anchorShardId = INVALID_SHARD_ID;
 
 	/*
-	 * Add the restriction qual parameter value in all baserestrictinfos.
-	 * Note that this has to be done on a copy, as the originals are needed
-	 * per target shard interval.
+	 * Find the relevant shard out of each relation for this task.
 	 */
-	foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList)
+	foreach(restrictionCell, restrictionContext->relationRestrictionList)
 	{
-		RelationRestriction *restriction = lfirst(restrictionCell);
-		Index rteIndex = restriction->index;
-		List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo;
-		List *extendedBaseRestrictInfo = originalBaseRestrictInfo;
+		RelationRestriction *relationRestriction =
+			(RelationRestriction *) lfirst(restrictionCell);
+		Oid relationId = relationRestriction->relationId;
+		DistTableCacheEntry *cacheEntry = NULL;
+		ShardInterval *shardInterval = NULL;
+		RelationShard *relationShard = NULL;
 
-		shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
-
-		/* means it is a reference table and do not add any shard interval info */
-		if (shardOpExpressions == NIL)
+		cacheEntry = DistributedTableCacheEntry(relationId);
+		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
 		{
-			continue;
+			/* reference table only has one shard */
+			shardInterval = cacheEntry->sortedShardIntervalArray[0];
+
+			/* only use reference table as anchor shard if none exists yet */
+			if (anchorShardId == INVALID_SHARD_ID)
+			{
+				anchorShardId = shardInterval->shardId;
+			}
+		}
+		else
+		{
+			/* use the shard from a specific index */
+			shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
+
+			/* use a shard from a distributed table as the anchor shard */
+			anchorShardId = shardInterval->shardId;
 		}
 
-		shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions);
-		extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
-										   shardRestrictionList);
+		taskShardList = lappend(taskShardList, list_make1(shardInterval));
 
-		restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo;
+		relationShard = CitusMakeNode(RelationShard);
+		relationShard->relationId = shardInterval->relationId;
+		relationShard->shardId = shardInterval->shardId;
+
+		relationShardList = lappend(relationShardList, relationShard);
 	}
 
-	/* mark that we don't want the router planner to generate dummy hosts/queries */
-	replacePrunedQueryWithDummy = false;
-
-	/*
-	 * Use router select planner to decide on whether we can push down the query
-	 * or not. If we can, we also rely on the side-effects that all RTEs have been
-	 * updated to point to the relevant nodes and selectPlacementList is determined.
-	 */
-	planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext,
-									&selectPlacementList, &selectAnchorShardId,
-									&relationShardList, replacePrunedQueryWithDummy,
-									&multiShardModifQuery);
-
-	Assert(!multiShardModifQuery);
-
-	/* we don't expect to this this error but keeping it as a precaution for future changes */
-	if (planningError)
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("cannot perform distributed planning for the given "
-							   "query"),
-						errdetail("Select query cannot be pushed down to the worker.")));
-	}
-
-	/* ensure that we do not send queries where select is pruned away completely */
+	selectPlacementList = WorkersContainingAllShards(taskShardList);
 	if (list_length(selectPlacementList) == 0)
 	{
-		ereport(DEBUG2, (errmsg("Skipping the target shard interval " UINT64_FORMAT
-								" because SELECT query is pruned away for the interval",
-								shardId)));
-
-		return NULL;
+		ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
+							   "shards in the query")));
 	}
 
+	/*
+	 * Augment the relations in the query with the shard IDs.
+ */ + UpdateRelationToShardNames((Node *) taskQuery, relationShardList); + /* * Ands are made implicit during shard pruning, as predicate comparison and * refutation depend on it being so. We need to make them explicit again so @@ -2448,13 +2460,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, (Node *) make_ands_explicit((List *) taskQuery->jointree->quals); /* and generate the full query string */ - deparse_shard_query(taskQuery, distributedTableId, shardInterval->shardId, - queryString); + pg_get_query_def(taskQuery, queryString); ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); subqueryTask = CreateBasicTask(jobId, taskId, SQL_TASK, queryString->data); subqueryTask->dependedTaskList = NULL; - subqueryTask->anchorShardId = shardInterval->shardId; + subqueryTask->anchorShardId = anchorShardId; subqueryTask->taskPlacementList = selectPlacementList; subqueryTask->upsertQuery = false; subqueryTask->relationShardList = relationShardList; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c8d92a902..a9a7f5c9d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -127,10 +127,6 @@ static Job * RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, DeferredErrorMessage **planningError); static bool RelationPrunesToMultipleShards(List *relationShardList); -static List * TargetShardIntervalsForRouter(Query *query, - RelationRestrictionContext *restrictionContext, - bool *multiShardQuery); -static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static void NormalizeMultiRowInsertTargetList(Query *query); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); @@ -1678,9 +1674,9 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon bool isMultiShardModifyQuery = false; *placementList = NIL; - prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery, - restrictionContext, - &isMultiShardQuery); + prunedRelationShardList = TargetShardIntervalsForQuery(originalQuery, + restrictionContext, + &isMultiShardQuery); if (isMultiShardQuery) { @@ -1851,7 +1847,7 @@ GetInitialShardId(List *relationShardList) /* - * TargetShardIntervalsForRouter performs shard pruning for all referenced relations + * TargetShardIntervalsForQuery performs shard pruning for all referenced relations * in the query and returns list of shards per relation. Shard pruning is done based * on provided restriction context per relation. The function sets multiShardQuery * to true if any of the relations pruned down to more than one active shard. It @@ -1860,10 +1856,10 @@ GetInitialShardId(List *relationShardList) * 'and 1=0', such queries are treated as if all of the shards of joining * relations are pruned out. */ -static List * -TargetShardIntervalsForRouter(Query *query, - RelationRestrictionContext *restrictionContext, - bool *multiShardQuery) +List * +TargetShardIntervalsForQuery(Query *query, + RelationRestrictionContext *restrictionContext, + bool *multiShardQuery) { List *prunedRelationShardList = NIL; ListCell *restrictionCell = NULL; @@ -1949,7 +1945,7 @@ RelationPrunesToMultipleShards(List *relationShardList) * exists. The caller should check if there are any shard intervals exist for * placement check prior to calling this function. 
  */
-static List *
+List *
 WorkersContainingAllShards(List *prunedShardIntervalsList)
 {
 	ListCell *prunedShardIntervalCell = NULL;
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index c4ab25d54..e3a3f5875 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -157,6 +157,7 @@ CopyNodeShardInterval(COPYFUNC_ARGS)
 	}
 
 	COPY_SCALAR_FIELD(shardId);
+	COPY_SCALAR_FIELD(shardIndex);
 }
 
 
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index 45dad90f1..45709aafe 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -359,6 +359,7 @@ OutShardInterval(OUTFUNC_ARGS)
 	outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
 
 	WRITE_UINT64_FIELD(shardId);
+	WRITE_INT_FIELD(shardIndex);
 }
 
 
diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c
index a06577477..b4847d5d5 100644
--- a/src/backend/distributed/utils/citus_readfuncs.c
+++ b/src/backend/distributed/utils/citus_readfuncs.c
@@ -259,6 +259,7 @@ ReadShardInterval(READFUNC_ARGS)
 	local_node->maxValue = readDatum(local_node->valueByVal);
 
 	READ_UINT64_FIELD(shardId);
+	READ_INT_FIELD(shardIndex);
 
 	READ_DONE();
 }
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index b8b4cfc90..d22e7353d 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -1128,6 +1128,9 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
 
 		cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
 		cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
+
+		/* store the shard index in the ShardInterval */
+		shardInterval->shardIndex = shardIndex;
 	}
 
 	cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h
index 9670d554b..9a19e1512 100644
--- a/src/include/distributed/master_metadata_utility.h
+++ b/src/include/distributed/master_metadata_utility.h
@@ -67,6 +67,7 @@ typedef struct ShardInterval
 	Datum minValue;     /* a shard's typed min value datum */
 	Datum maxValue;     /* a shard's typed max value datum */
 	uint64 shardId;
+	int shardIndex;
 } ShardInterval;
 
 
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index d97cea3d5..7c7fbd8bc 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -39,6 +39,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
 											  replacePrunedQueryWithDummy,
 											  bool *multiShardModifyQuery);
 extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
+extern List * TargetShardIntervalsForQuery(Query *query,
+										   RelationRestrictionContext *restrictionContext,
+										   bool *multiShardQuery);
+extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
 extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
 extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery,
 												   bool multiShardQuery);
diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out
index 2b2d80ffd..9c7d872ba 100644
--- a/src/test/regress/expected/multi_subquery.out
+++ b/src/test/regress/expected/multi_subquery.out
@@ -812,9 +812,6 @@ SET client_min_messages TO DEBUG2;
 SELECT * FROM
 	(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
 AS foo;
-DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570034 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
  count 
 -------
 (0 rows)
@@ -822,9 +819,6 @@ DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned
 SELECT * FROM
 	(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
 AS foo;
-DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570035 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
  count 
 -------
 (0 rows)
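
Trailing note: the reworked SubqueryTaskCreate() above picks, for a given shard
index, the single shard of each reference table and the shardIndex-th sorted
shard of each co-located distributed table, preferring a distributed table's
shard as the task's anchor. The sketch below imitates that selection logic with
simplified stand-in types and made-up shard IDs; it is illustration only, not
Citus code.

/* anchor_sketch.c - illustration only, not part of the patch */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* simplified stand-in for the cached shard metadata of one relation */
typedef struct FakeRelation
{
	const char *name;
	bool isReferenceTable;
	uint64_t sortedShardIds[4];
} FakeRelation;

int
main(void)
{
	FakeRelation relations[] = {
		{ "events", false, { 101, 102, 103, 104 } },
		{ "countries", true, { 201 } }
	};
	uint64_t anchorShardId = 0;	/* 0 plays the role of INVALID_SHARD_ID */
	int shardIndex = 2;
	int i = 0;

	for (i = 0; i < 2; i++)
	{
		FakeRelation *relation = &relations[i];
		uint64_t shardId = 0;

		if (relation->isReferenceTable)
		{
			shardId = relation->sortedShardIds[0];	/* only one shard */

			/* only use a reference table shard as anchor if none exists yet */
			if (anchorShardId == 0)
			{
				anchorShardId = shardId;
			}
		}
		else
		{
			shardId = relation->sortedShardIds[shardIndex];	/* co-located shard */
			anchorShardId = shardId;	/* prefer a distributed table's shard */
		}

		printf("task uses shard %llu of %s\n",
			   (unsigned long long) shardId, relation->name);
	}

	printf("anchor shard: %llu\n", (unsigned long long) anchorShardId);
	return 0;
}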