Merge pull request #2013 from citusdata/subquery_pruning

Prune shards once per relation in subquery pushdown
Marco Slot 2018-04-10 20:19:09 +02:00 committed by GitHub
commit 6df6d841c9
10 changed files with 153 additions and 140 deletions

@@ -561,6 +561,7 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval)
 	destInterval->minValueExists = srcInterval->minValueExists;
 	destInterval->maxValueExists = srcInterval->maxValueExists;
 	destInterval->shardId = srcInterval->shardId;
+	destInterval->shardIndex = srcInterval->shardIndex;

 	destInterval->minValue = 0;
 	if (destInterval->minValueExists)

@@ -33,6 +33,7 @@
 #include "distributed/citus_nodes.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/colocation_utils.h"
+#include "distributed/deparse_shard_query.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_router_planner.h"
@@ -130,7 +131,7 @@ static void ErrorIfUnsupportedShardDistribution(Query *query);
 static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
 								ShardInterval *firstInterval,
 								ShardInterval *secondInterval);
-static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
+static Task * SubqueryTaskCreate(Query *originalQuery, int shardIndex,
 								 RelationRestrictionContext *restrictionContext,
 								 uint32 taskId);
 static List * SqlTaskList(Job *job);
@@ -2054,85 +2055,116 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionContext)
 	Query *subquery = job->jobQuery;
 	uint64 jobId = job->jobId;
 	List *sqlTaskList = NIL;
-	List *rangeTableList = NIL;
-	ListCell *rangeTableCell = NULL;
+	ListCell *restrictionCell = NULL;
 	uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
-	Oid relationId = 0;
 	int shardCount = 0;
 	int shardOffset = 0;
-	DistTableCacheEntry *targetCacheEntry = NULL;
+	int minShardOffset = 0;
+	int maxShardOffset = 0;
 	RelationRestrictionContext *relationRestrictionContext =
 		plannerRestrictionContext->relationRestrictionContext;
+	bool *taskRequiredForShardIndex = NULL;
+	List *prunedRelationShardList = NIL;
+	ListCell *prunedRelationShardCell = NULL;
+	bool isMultiShardQuery = false;

 	/* error if shards are not co-partitioned */
 	ErrorIfUnsupportedShardDistribution(subquery);

-	/* get list of all range tables in subquery tree */
-	ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
-
-	/*
-	 * Find the first relation that is not a reference table. We'll use the shards
-	 * of that relation as the target shards.
-	 */
-	foreach(rangeTableCell, rangeTableList)
+	if (list_length(relationRestrictionContext->relationRestrictionList) == 0)
 	{
-		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+		ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
+							   "router executor is disabled")));
+	}
+
+	/* defaults to be used if this is a reference table-only query */
+	minShardOffset = 0;
+	maxShardOffset = 0;
+
+	prunedRelationShardList = TargetShardIntervalsForQuery(subquery,
+														   relationRestrictionContext,
+														   &isMultiShardQuery);
+
+	forboth(prunedRelationShardCell, prunedRelationShardList,
+			restrictionCell, relationRestrictionContext->relationRestrictionList)
+	{
+		RelationRestriction *relationRestriction =
+			(RelationRestriction *) lfirst(restrictionCell);
+		Oid relationId = relationRestriction->relationId;
+		List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
+		ListCell *shardIntervalCell = NULL;
 		DistTableCacheEntry *cacheEntry = NULL;

-		relationId = rangeTableEntry->relid;
 		cacheEntry = DistributedTableCacheEntry(relationId);
 		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
 		{
 			continue;
 		}

-		targetCacheEntry = DistributedTableCacheEntry(relationId);
-		break;
+		/* we expect distributed tables to have the same shard count */
+		if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength)
+		{
+			ereport(ERROR, (errmsg("shard counts of co-located tables do not "
+								   "match")));
+		}
+
+		if (taskRequiredForShardIndex == NULL)
+		{
+			shardCount = cacheEntry->shardIntervalArrayLength;
+			taskRequiredForShardIndex = (bool *) palloc0(shardCount);
+
+			/* there is a distributed table, find the shard range */
+			minShardOffset = shardCount;
+			maxShardOffset = -1;
+		}
+
+		foreach(shardIntervalCell, prunedShardList)
+		{
+			ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+			int shardIndex = shardInterval->shardIndex;
+
+			taskRequiredForShardIndex[shardIndex] = true;
+
+			if (shardIndex < minShardOffset)
+			{
+				minShardOffset = shardIndex;
+			}
+
+			if (shardIndex > maxShardOffset)
+			{
+				maxShardOffset = shardIndex;
+			}
+		}
 	}

 	/*
-	 * That means all tables are reference tables and we can pick any any of them
-	 * as an anchor table.
+	 * To avoid iterating through all shards indexes we keep the minimum and maximum
+	 * offsets of shards that were not pruned away. This optimisation is primarily
+	 * relevant for queries on range-distributed tables that, due to range filters,
+	 * prune to a small number of adjacent shards.
+	 *
+	 * In other cases, such as an OR condition on a hash-distributed table, we may
+	 * still visit most or all shards even if some of them were pruned away. However,
+	 * given that hash-distributed tables typically only have a few shards the
+	 * iteration is still very fast.
 	 */
-	if (targetCacheEntry == NULL)
+	for (shardOffset = minShardOffset; shardOffset <= maxShardOffset; shardOffset++)
 	{
-		RangeTblEntry *rangeTableEntry = NULL;
-
-		if (list_length(rangeTableList) == 0)
-		{
-			/*
-			 * User disabled the router planner and forced planner go through
-			 * subquery pushdown, but we cannot continue anymore.
-			 */
-			ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
-								   "router executor is disabled")));
-		}
-
-		rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
-		relationId = rangeTableEntry->relid;
-		targetCacheEntry = DistributedTableCacheEntry(relationId);
-	}
-
-	shardCount = targetCacheEntry->shardIntervalArrayLength;
-	for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
-	{
-		ShardInterval *targetShardInterval =
-			targetCacheEntry->sortedShardIntervalArray[shardOffset];
 		Task *subqueryTask = NULL;

-		subqueryTask = SubqueryTaskCreate(subquery, targetShardInterval,
-										  relationRestrictionContext, taskIdIndex);
-
-		/* add the task if it could be created */
-		if (subqueryTask != NULL)
+		if (taskRequiredForShardIndex != NULL && !taskRequiredForShardIndex[shardOffset])
 		{
-			subqueryTask->jobId = jobId;
-			sqlTaskList = lappend(sqlTaskList, subqueryTask);
-
-			++taskIdIndex;
+			/* this shard index is pruned away for all relations */
+			continue;
 		}
+
+		subqueryTask = SubqueryTaskCreate(subquery, shardOffset,
+										  relationRestrictionContext, taskIdIndex);
+		subqueryTask->jobId = jobId;
+		sqlTaskList = lappend(sqlTaskList, subqueryTask);
+		++taskIdIndex;
 	}

 	return sqlTaskList;
 }
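
The loop structure above is the heart of the commit: each relation is pruned exactly once, every surviving shard index is marked in a boolean array, and tasks are only created inside the [minShardOffset, maxShardOffset] window. Below is a minimal, self-contained C sketch of that bookkeeping; the shard count and surviving indexes are illustrative stand-ins, not the Citus types or API.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

int
main(void)
{
	int shardCount = 8; /* co-located relations share this shard count */
	bool *taskRequiredForShardIndex = calloc(shardCount, sizeof(bool));
	int minShardOffset = shardCount;
	int maxShardOffset = -1;

	/* suppose pruning kept shards {2, 3} of one relation and {3, 4} of another */
	int survivingShardIndexes[] = { 2, 3, 3, 4 };
	int survivorCount = 4;

	for (int i = 0; i < survivorCount; i++)
	{
		int shardIndex = survivingShardIndexes[i];

		taskRequiredForShardIndex[shardIndex] = true;

		if (shardIndex < minShardOffset)
		{
			minShardOffset = shardIndex;
		}
		if (shardIndex > maxShardOffset)
		{
			maxShardOffset = shardIndex;
		}
	}

	/* only walk the [min, max] window instead of all shardCount indexes */
	for (int shardOffset = minShardOffset; shardOffset <= maxShardOffset; shardOffset++)
	{
		if (!taskRequiredForShardIndex[shardOffset])
		{
			continue; /* pruned away for every relation */
		}

		printf("creating task for shard index %d\n", shardOffset);
	}

	free(taskRequiredForShardIndex);
	return 0;
}

Note that a shard index is marked when any relation keeps it after pruning, so the marked set is the union across co-located relations; a task whose filters pruned a given relation on that shard simply finds no matching rows there, which is correct, just not maximally pruned.
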
@@ -2347,97 +2379,77 @@ ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
 /*
  * SubqueryTaskCreate creates a sql task by replacing the target
- * shardInterval's boundary value.. Then performs the normal
- * shard pruning on the subquery via RouterSelectQuery().
- *
- * The function errors out if the subquery is not router select query (i.e.,
- * subqueries with non equi-joins.).
+ * shardInterval's boundary value.
  */
 static Task *
-SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
-				   RelationRestrictionContext *restrictionContext,
-				   uint32 taskId)
+SubqueryTaskCreate(Query *originalQuery, int shardIndex,
+				   RelationRestrictionContext *restrictionContext, uint32 taskId)
 {
 	Query *taskQuery = copyObject(originalQuery);

-	uint64 shardId = shardInterval->shardId;
-	Oid distributedTableId = shardInterval->relationId;
 	StringInfo queryString = makeStringInfo();
 	ListCell *restrictionCell = NULL;
 	Task *subqueryTask = NULL;
-	List *selectPlacementList = NIL;
-	uint64 selectAnchorShardId = INVALID_SHARD_ID;
+	List *taskShardList = NIL;
 	List *relationShardList = NIL;
+	List *selectPlacementList = NIL;
 	uint64 jobId = INVALID_JOB_ID;
-	bool replacePrunedQueryWithDummy = false;
-	RelationRestrictionContext *copiedRestrictionContext =
-		CopyRelationRestrictionContext(restrictionContext);
-	List *shardOpExpressions = NIL;
-	RestrictInfo *shardRestrictionList = NULL;
-	DeferredErrorMessage *planningError = NULL;
-	bool multiShardModifQuery = false;
+	uint64 anchorShardId = INVALID_SHARD_ID;

 	/*
-	 * Add the restriction qual parameter value in all baserestrictinfos.
-	 * Note that this has to be done on a copy, as the originals are needed
-	 * per target shard interval.
+	 * Find the relevant shard out of each relation for this task.
 	 */
-	foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList)
+	foreach(restrictionCell, restrictionContext->relationRestrictionList)
 	{
-		RelationRestriction *restriction = lfirst(restrictionCell);
-		Index rteIndex = restriction->index;
-		List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo;
-		List *extendedBaseRestrictInfo = originalBaseRestrictInfo;
+		RelationRestriction *relationRestriction =
+			(RelationRestriction *) lfirst(restrictionCell);
+		Oid relationId = relationRestriction->relationId;
+		DistTableCacheEntry *cacheEntry = NULL;
+		ShardInterval *shardInterval = NULL;
+		RelationShard *relationShard = NULL;

-		shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
-
-		/* means it is a reference table and do not add any shard interval info */
-		if (shardOpExpressions == NIL)
+		cacheEntry = DistributedTableCacheEntry(relationId);
+		if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
 		{
-			continue;
+			/* reference table only has one shard */
+			shardInterval = cacheEntry->sortedShardIntervalArray[0];
+
+			/* only use reference table as anchor shard if none exists yet */
+			if (anchorShardId == INVALID_SHARD_ID)
+			{
+				anchorShardId = shardInterval->shardId;
+			}
 		}
+		else
+		{
+			/* use the shard from a specific index */
+			shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
+
+			/* use a shard from a distributed table as the anchor shard */
+			anchorShardId = shardInterval->shardId;
+		}

-		shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions);
-		extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
-										   shardRestrictionList);
+		taskShardList = lappend(taskShardList, list_make1(shardInterval));

-		restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo;
+		relationShard = CitusMakeNode(RelationShard);
+		relationShard->relationId = shardInterval->relationId;
+		relationShard->shardId = shardInterval->shardId;
+
+		relationShardList = lappend(relationShardList, relationShard);
 	}

-	/* mark that we don't want the router planner to generate dummy hosts/queries */
-	replacePrunedQueryWithDummy = false;
-
-	/*
-	 * Use router select planner to decide on whether we can push down the query
-	 * or not. If we can, we also rely on the side-effects that all RTEs have been
-	 * updated to point to the relevant nodes and selectPlacementList is determined.
-	 */
-	planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext,
-									&selectPlacementList, &selectAnchorShardId,
-									&relationShardList, replacePrunedQueryWithDummy,
-									&multiShardModifQuery);
-	Assert(!multiShardModifQuery);
-
-	/* we don't expect to this this error but keeping it as a precaution for future changes */
-	if (planningError)
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("cannot perform distributed planning for the given "
-							   "query"),
-						errdetail("Select query cannot be pushed down to the worker.")));
-	}
-
-	/* ensure that we do not send queries where select is pruned away completely */
+	selectPlacementList = WorkersContainingAllShards(taskShardList);
 	if (list_length(selectPlacementList) == 0)
 	{
-		ereport(DEBUG2, (errmsg("Skipping the target shard interval " UINT64_FORMAT
-								" because SELECT query is pruned away for the interval",
-								shardId)));
-
-		return NULL;
+		ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
+							   "shards in the query")));
 	}

+	/*
+	 * Augment the relations in the query with the shard IDs.
+	 */
+	UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
+
 	/*
 	 * Ands are made implicit during shard pruning, as predicate comparison and
 	 * refutation depend on it being so. We need to make them explicit again so
@@ -2448,13 +2460,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
 		(Node *) make_ands_explicit((List *) taskQuery->jointree->quals);

 	/* and generate the full query string */
-	deparse_shard_query(taskQuery, distributedTableId, shardInterval->shardId,
-						queryString);
+	pg_get_query_def(taskQuery, queryString);
 	ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));

 	subqueryTask = CreateBasicTask(jobId, taskId, SQL_TASK, queryString->data);
 	subqueryTask->dependedTaskList = NULL;
-	subqueryTask->anchorShardId = shardInterval->shardId;
+	subqueryTask->anchorShardId = anchorShardId;
 	subqueryTask->taskPlacementList = selectPlacementList;
 	subqueryTask->upsertQuery = false;
 	subqueryTask->relationShardList = relationShardList;
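
With pruning hoisted into SubquerySqlTaskList, SubqueryTaskCreate no longer re-plans each task through the router planner: it resolves the shard at the given index for every relation and picks an anchor shard, preferring a distributed table's shard over a reference table's. A self-contained sketch of that selection rule follows; the types, table names, and shard IDs are invented for illustration and are not the Citus API.

#include <stdbool.h>
#include <stdio.h>

/* illustrative stubs, not the Citus types */
typedef struct RelationStub
{
	const char *name;
	bool isReferenceTable;
	unsigned long long shardIds[4]; /* sorted shard interval array, by index */
} RelationStub;

int
main(void)
{
	RelationStub relations[] = {
		{ "ref_table", true, { 9000 } },
		{ "dist_table", false, { 100, 101, 102, 103 } },
	};
	int shardIndex = 2; /* the task's target shard index */
	unsigned long long anchorShardId = 0; /* 0 stands in for INVALID_SHARD_ID */

	for (int i = 0; i < 2; i++)
	{
		RelationStub *relation = &relations[i];
		unsigned long long shardId = 0;

		if (relation->isReferenceTable)
		{
			/* a reference table only has one shard */
			shardId = relation->shardIds[0];

			/* only use it as the anchor if no anchor exists yet */
			if (anchorShardId == 0)
			{
				anchorShardId = shardId;
			}
		}
		else
		{
			/* co-located tables share shard indexes, so pick the same index */
			shardId = relation->shardIds[shardIndex];

			/* prefer a distributed table's shard as the anchor */
			anchorShardId = shardId;
		}

		printf("%s -> shard %llu\n", relation->name, shardId);
	}

	printf("anchor shard: %llu\n", anchorShardId);
	return 0;
}

As the code comments above suggest, a reference table's single shard only anchors the task when the query touches reference tables exclusively; otherwise a distributed table's shard wins.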

@@ -127,10 +127,6 @@ static Job * RouterJob(Query *originalQuery,
 						RelationRestrictionContext *restrictionContext,
 						DeferredErrorMessage **planningError);
 static bool RelationPrunesToMultipleShards(List *relationShardList);
-static List * TargetShardIntervalsForRouter(Query *query,
-											RelationRestrictionContext *restrictionContext,
-											bool *multiShardQuery);
-static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
 static void NormalizeMultiRowInsertTargetList(Query *query);
 static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
 static List * GroupInsertValuesByShardId(List *insertValuesList);
@@ -1678,7 +1674,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionContext,
 	bool isMultiShardModifyQuery = false;

 	*placementList = NIL;
-	prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery,
+	prunedRelationShardList = TargetShardIntervalsForQuery(originalQuery,
 															restrictionContext,
 															&isMultiShardQuery);
@@ -1851,7 +1847,7 @@ GetInitialShardId(List *relationShardList)

 /*
- * TargetShardIntervalsForRouter performs shard pruning for all referenced relations
+ * TargetShardIntervalsForQuery performs shard pruning for all referenced relations
  * in the query and returns list of shards per relation. Shard pruning is done based
  * on provided restriction context per relation. The function sets multiShardQuery
  * to true if any of the relations pruned down to more than one active shard. It
@@ -1860,8 +1856,8 @@ GetInitialShardId(List *relationShardList)
  * 'and 1=0', such queries are treated as if all of the shards of joining
  * relations are pruned out.
  */
-static List *
-TargetShardIntervalsForRouter(Query *query,
+List *
+TargetShardIntervalsForQuery(Query *query,
 							 RelationRestrictionContext *restrictionContext,
 							 bool *multiShardQuery)
 {
@@ -1949,7 +1945,7 @@ RelationPrunesToMultipleShards(List *relationShardList)
  * exists. The caller should check if there are any shard intervals exist for
  * placement check prior to calling this function.
  */
-static List *
+List *
 WorkersContainingAllShards(List *prunedShardIntervalsList)
 {
 	ListCell *prunedShardIntervalCell = NULL;
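
WorkersContainingAllShards, made extern by this commit, keeps only the workers that hold an active placement of every shard the task touches; the new caller errors out when that set is empty instead of silently skipping the shard. A stand-alone sketch of the underlying intersection, with a hypothetical two-shard, three-worker layout:

#include <stdbool.h>
#include <stdio.h>

#define WORKER_COUNT 3

int
main(void)
{
	/* placementMatrix[shard][worker]: does the worker hold that shard? */
	bool placementMatrix[2][WORKER_COUNT] = {
		{ true, true, false },  /* shard A placed on workers 0 and 1 */
		{ false, true, true },  /* shard B placed on workers 1 and 2 */
	};
	bool candidateWorkers[WORKER_COUNT];

	/* start from the first shard's placements */
	for (int worker = 0; worker < WORKER_COUNT; worker++)
	{
		candidateWorkers[worker] = placementMatrix[0][worker];
	}

	/* intersect with every remaining shard's placements */
	for (int shard = 1; shard < 2; shard++)
	{
		for (int worker = 0; worker < WORKER_COUNT; worker++)
		{
			candidateWorkers[worker] = candidateWorkers[worker] &&
									   placementMatrix[shard][worker];
		}
	}

	for (int worker = 0; worker < WORKER_COUNT; worker++)
	{
		if (candidateWorkers[worker])
		{
			printf("worker %d has placements for all shards\n", worker);
		}
	}

	return 0;
}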

@@ -157,6 +157,7 @@ CopyNodeShardInterval(COPYFUNC_ARGS)
 	}

 	COPY_SCALAR_FIELD(shardId);
+	COPY_SCALAR_FIELD(shardIndex);
 }

@@ -359,6 +359,7 @@ OutShardInterval(OUTFUNC_ARGS)
 	outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);

 	WRITE_UINT64_FIELD(shardId);
+	WRITE_INT_FIELD(shardIndex);
 }

@@ -259,6 +259,7 @@ ReadShardInterval(READFUNC_ARGS)
 	local_node->maxValue = readDatum(local_node->valueByVal);

 	READ_UINT64_FIELD(shardId);
+	READ_INT_FIELD(shardIndex);

 	READ_DONE();
 }

@@ -1128,6 +1128,9 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
 		cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
 		cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
+
+		/* store the shard index in the ShardInterval */
+		shardInterval->shardIndex = shardIndex;
 	}

 	cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
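
Recording the offset on each cached ShardInterval is what lets the planner map an interval of one relation to the co-located interval of another with a direct array access rather than a search. A stand-alone illustration with stub structs and invented shard IDs:

#include <stdio.h>

typedef struct IntervalStub
{
	unsigned long long shardId;
	int shardIndex; /* position in the sorted array, set at cache-build time */
} IntervalStub;

int
main(void)
{
	IntervalStub relationA[3] = { { 100, 0 }, { 101, 0 }, { 102, 0 } };
	IntervalStub relationB[3] = { { 200, 0 }, { 201, 0 }, { 202, 0 } };

	/* analogous to BuildCachedShardList: record each interval's own index */
	for (int shardIndex = 0; shardIndex < 3; shardIndex++)
	{
		relationA[shardIndex].shardIndex = shardIndex;
		relationB[shardIndex].shardIndex = shardIndex;
	}

	/* map a pruned interval of A to B's co-located interval in O(1) */
	IntervalStub *prunedInterval = &relationA[2];
	IntervalStub *colocatedInterval = &relationB[prunedInterval->shardIndex];

	printf("shard %llu is co-located with shard %llu\n",
		   prunedInterval->shardId, colocatedInterval->shardId);
	return 0;
}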

@@ -67,6 +67,7 @@ typedef struct ShardInterval
 	Datum minValue; /* a shard's typed min value datum */
 	Datum maxValue; /* a shard's typed max value datum */
 	uint64 shardId;
+	int shardIndex;
 } ShardInterval;
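
Because ShardInterval is a Citus node type, the new field also had to be added to the copy, out, and read functions in the hunks above; if any of the three were missed, shardIndex could come back as zero whenever an interval crosses a copy or (de)serialization boundary. A self-contained sketch of the round-trip property those changes preserve; the stub writer and reader are illustrative, not the Citus node I/O:

#include <assert.h>
#include <stdio.h>

typedef struct IntervalStub
{
	unsigned long long shardId;
	int shardIndex; /* the newly added field */
} IntervalStub;

static void
WriteIntervalStub(const IntervalStub *node, char *buffer, size_t size)
{
	/* both fields must be written, mirroring OutShardInterval */
	snprintf(buffer, size, "%llu %d", node->shardId, node->shardIndex);
}

static void
ReadIntervalStub(const char *buffer, IntervalStub *node)
{
	/* ... and both must be read back, mirroring ReadShardInterval */
	sscanf(buffer, "%llu %d", &node->shardId, &node->shardIndex);
}

int
main(void)
{
	IntervalStub original = { 102009, 3 };
	IntervalStub roundTripped = { 0, 0 };
	char buffer[64];

	WriteIntervalStub(&original, buffer, sizeof(buffer));
	ReadIntervalStub(buffer, &roundTripped);

	/* if shardIndex were missing from either function, this would fail */
	assert(roundTripped.shardId == original.shardId);
	assert(roundTripped.shardIndex == original.shardIndex);
	printf("round trip ok: shardId=%llu shardIndex=%d\n",
		   roundTripped.shardId, roundTripped.shardIndex);
	return 0;
}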

@@ -39,6 +39,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
 											  replacePrunedQueryWithDummy,
 											  bool *multiShardModifyQuery);
 extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
+extern List * TargetShardIntervalsForQuery(Query *query,
+										   RelationRestrictionContext *restrictionContext,
+										   bool *multiShardQuery);
+extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
 extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
 extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery,
 												   bool multiShardQuery);

@@ -812,9 +812,6 @@ SET client_min_messages TO DEBUG2;
 SELECT * FROM
 (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
 AS foo;
-DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570034 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
  count
 -------
 (0 rows)
@@ -822,9 +819,6 @@ DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
 SELECT * FROM
 (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
 AS foo;
-DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570035 because SELECT query is pruned away for the interval
-DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
  count
 -------
 (0 rows)