Prune shards once per relation in subquery pushdown

pull/2013/head
Marco Slot 2018-02-15 14:58:02 +01:00
parent 3873d6858d
commit ee132c5ead
10 changed files with 153 additions and 140 deletions

View File

@ -561,6 +561,7 @@ CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInterval)
destInterval->minValueExists = srcInterval->minValueExists; destInterval->minValueExists = srcInterval->minValueExists;
destInterval->maxValueExists = srcInterval->maxValueExists; destInterval->maxValueExists = srcInterval->maxValueExists;
destInterval->shardId = srcInterval->shardId; destInterval->shardId = srcInterval->shardId;
destInterval->shardIndex = srcInterval->shardIndex;
destInterval->minValue = 0; destInterval->minValue = 0;
if (destInterval->minValueExists) if (destInterval->minValueExists)

View File

@ -33,6 +33,7 @@
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
@ -130,7 +131,7 @@ static void ErrorIfUnsupportedShardDistribution(Query *query);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval, ShardInterval *firstInterval,
ShardInterval *secondInterval); ShardInterval *secondInterval);
static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, static Task * SubqueryTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
uint32 taskId); uint32 taskId);
static List * SqlTaskList(Job *job); static List * SqlTaskList(Job *job);
@ -2054,85 +2055,116 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
Query *subquery = job->jobQuery; Query *subquery = job->jobQuery;
uint64 jobId = job->jobId; uint64 jobId = job->jobId;
List *sqlTaskList = NIL; List *sqlTaskList = NIL;
List *rangeTableList = NIL; ListCell *restrictionCell = NULL;
ListCell *rangeTableCell = NULL;
uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */ uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
Oid relationId = 0;
int shardCount = 0; int shardCount = 0;
int shardOffset = 0; int shardOffset = 0;
DistTableCacheEntry *targetCacheEntry = NULL; int minShardOffset = 0;
int maxShardOffset = 0;
RelationRestrictionContext *relationRestrictionContext = RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext; plannerRestrictionContext->relationRestrictionContext;
bool *taskRequiredForShardIndex = NULL;
List *prunedRelationShardList = NIL;
ListCell *prunedRelationShardCell = NULL;
bool isMultiShardQuery = false;
/* error if shards are not co-partitioned */ /* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(subquery); ErrorIfUnsupportedShardDistribution(subquery);
/* get list of all range tables in subquery tree */ if (list_length(relationRestrictionContext->relationRestrictionList) == 0)
ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
/*
* Find the first relation that is not a reference table. We'll use the shards
* of that relation as the target shards.
*/
foreach(rangeTableCell, rangeTableList)
{ {
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
"router executor is disabled")));
}
/* defaults to be used if this is a reference table-only query */
minShardOffset = 0;
maxShardOffset = 0;
prunedRelationShardList = TargetShardIntervalsForQuery(subquery,
relationRestrictionContext,
&isMultiShardQuery);
forboth(prunedRelationShardCell, prunedRelationShardList,
restrictionCell, relationRestrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(restrictionCell);
Oid relationId = relationRestriction->relationId;
List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
ListCell *shardIntervalCell = NULL;
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry = NULL;
relationId = rangeTableEntry->relid;
cacheEntry = DistributedTableCacheEntry(relationId); cacheEntry = DistributedTableCacheEntry(relationId);
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{ {
continue; continue;
} }
targetCacheEntry = DistributedTableCacheEntry(relationId); /* we expect distributed tables to have the same shard count */
break; if (shardCount > 0 && shardCount != cacheEntry->shardIntervalArrayLength)
{
ereport(ERROR, (errmsg("shard counts of co-located tables do not "
"match")));
}
if (taskRequiredForShardIndex == NULL)
{
shardCount = cacheEntry->shardIntervalArrayLength;
taskRequiredForShardIndex = (bool *) palloc0(shardCount);
/* there is a distributed table, find the shard range */
minShardOffset = shardCount;
maxShardOffset = -1;
}
foreach(shardIntervalCell, prunedShardList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
int shardIndex = shardInterval->shardIndex;
taskRequiredForShardIndex[shardIndex] = true;
if (shardIndex < minShardOffset)
{
minShardOffset = shardIndex;
}
if (shardIndex > maxShardOffset)
{
maxShardOffset = shardIndex;
}
}
} }
/* /*
* That means all tables are reference tables and we can pick any any of them * To avoid iterating through all shards indexes we keep the minimum and maximum
* as an anchor table. * offsets of shards that were not pruned away. This optimisation is primarily
* relevant for queries on range-distributed tables that, due to range filters,
* prune to a small number of adjacent shards.
*
* In other cases, such as an OR condition on a hash-distributed table, we may
* still visit most or all shards even if some of them were pruned away. However,
* given that hash-distributed tables typically only have a few shards the
* iteration is still very fast.
*/ */
if (targetCacheEntry == NULL) for (shardOffset = minShardOffset; shardOffset <= maxShardOffset; shardOffset++)
{ {
RangeTblEntry *rangeTableEntry = NULL;
if (list_length(rangeTableList) == 0)
{
/*
* User disabled the router planner and forced planner go through
* subquery pushdown, but we cannot continue anymore.
*/
ereport(ERROR, (errmsg("cannot handle complex subqueries when the "
"router executor is disabled")));
}
rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
relationId = rangeTableEntry->relid;
targetCacheEntry = DistributedTableCacheEntry(relationId);
}
shardCount = targetCacheEntry->shardIntervalArrayLength;
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
{
ShardInterval *targetShardInterval =
targetCacheEntry->sortedShardIntervalArray[shardOffset];
Task *subqueryTask = NULL; Task *subqueryTask = NULL;
subqueryTask = SubqueryTaskCreate(subquery, targetShardInterval, if (taskRequiredForShardIndex != NULL && !taskRequiredForShardIndex[shardOffset])
relationRestrictionContext, taskIdIndex);
/* add the task if it could be created */
if (subqueryTask != NULL)
{ {
/* this shard index is pruned away for all relations */
continue;
}
subqueryTask = SubqueryTaskCreate(subquery, shardOffset,
relationRestrictionContext, taskIdIndex);
subqueryTask->jobId = jobId; subqueryTask->jobId = jobId;
sqlTaskList = lappend(sqlTaskList, subqueryTask); sqlTaskList = lappend(sqlTaskList, subqueryTask);
++taskIdIndex; ++taskIdIndex;
} }
}
return sqlTaskList; return sqlTaskList;
} }
@ -2347,97 +2379,77 @@ ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
/* /*
* SubqueryTaskCreate creates a sql task by replacing the target * SubqueryTaskCreate creates a sql task by replacing the target
* shardInterval's boundary value.. Then performs the normal * shardInterval's boundary value.
* shard pruning on the subquery via RouterSelectQuery().
*
* The function errors out if the subquery is not router select query (i.e.,
* subqueries with non equi-joins.).
*/ */
static Task * static Task *
SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, SubqueryTaskCreate(Query *originalQuery, int shardIndex,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext, uint32 taskId)
uint32 taskId)
{ {
Query *taskQuery = copyObject(originalQuery); Query *taskQuery = copyObject(originalQuery);
uint64 shardId = shardInterval->shardId;
Oid distributedTableId = shardInterval->relationId;
StringInfo queryString = makeStringInfo(); StringInfo queryString = makeStringInfo();
ListCell *restrictionCell = NULL; ListCell *restrictionCell = NULL;
Task *subqueryTask = NULL; Task *subqueryTask = NULL;
List *selectPlacementList = NIL; List *taskShardList = NIL;
uint64 selectAnchorShardId = INVALID_SHARD_ID;
List *relationShardList = NIL; List *relationShardList = NIL;
List *selectPlacementList = NIL;
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
bool replacePrunedQueryWithDummy = false; uint64 anchorShardId = INVALID_SHARD_ID;
RelationRestrictionContext *copiedRestrictionContext =
CopyRelationRestrictionContext(restrictionContext);
List *shardOpExpressions = NIL;
RestrictInfo *shardRestrictionList = NULL;
DeferredErrorMessage *planningError = NULL;
bool multiShardModifQuery = false;
/* /*
* Add the restriction qual parameter value in all baserestrictinfos. * Find the relevant shard out of each relation for this task.
* Note that this has to be done on a copy, as the originals are needed
* per target shard interval.
*/ */
foreach(restrictionCell, copiedRestrictionContext->relationRestrictionList) foreach(restrictionCell, restrictionContext->relationRestrictionList)
{ {
RelationRestriction *restriction = lfirst(restrictionCell); RelationRestriction *relationRestriction =
Index rteIndex = restriction->index; (RelationRestriction *) lfirst(restrictionCell);
List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo; Oid relationId = relationRestriction->relationId;
List *extendedBaseRestrictInfo = originalBaseRestrictInfo; DistTableCacheEntry *cacheEntry = NULL;
ShardInterval *shardInterval = NULL;
RelationShard *relationShard = NULL;
shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex); cacheEntry = DistributedTableCacheEntry(relationId);
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
/* means it is a reference table and do not add any shard interval info */
if (shardOpExpressions == NIL)
{ {
continue; /* reference table only has one shard */
shardInterval = cacheEntry->sortedShardIntervalArray[0];
/* only use reference table as anchor shard if none exists yet */
if (anchorShardId == INVALID_SHARD_ID)
{
anchorShardId = shardInterval->shardId;
}
}
else
{
/* use the shard from a specific index */
shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
/* use a shard from a distributed table as the anchor shard */
anchorShardId = shardInterval->shardId;
} }
shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions); taskShardList = lappend(taskShardList, list_make1(shardInterval));
extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
shardRestrictionList);
restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo; relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = shardInterval->relationId;
relationShard->shardId = shardInterval->shardId;
relationShardList = lappend(relationShardList, relationShard);
} }
/* mark that we don't want the router planner to generate dummy hosts/queries */ selectPlacementList = WorkersContainingAllShards(taskShardList);
replacePrunedQueryWithDummy = false;
/*
* Use router select planner to decide on whether we can push down the query
* or not. If we can, we also rely on the side-effects that all RTEs have been
* updated to point to the relevant nodes and selectPlacementList is determined.
*/
planningError = PlanRouterQuery(taskQuery, copiedRestrictionContext,
&selectPlacementList, &selectAnchorShardId,
&relationShardList, replacePrunedQueryWithDummy,
&multiShardModifQuery);
Assert(!multiShardModifQuery);
/* we don't expect to this this error but keeping it as a precaution for future changes */
if (planningError)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning for the given "
"query"),
errdetail("Select query cannot be pushed down to the worker.")));
}
/* ensure that we do not send queries where select is pruned away completely */
if (list_length(selectPlacementList) == 0) if (list_length(selectPlacementList) == 0)
{ {
ereport(DEBUG2, (errmsg("Skipping the target shard interval " UINT64_FORMAT ereport(ERROR, (errmsg("cannot find a worker that has active placements for all "
" because SELECT query is pruned away for the interval", "shards in the query")));
shardId)));
return NULL;
} }
/*
* Augment the relations in the query with the shard IDs.
*/
UpdateRelationToShardNames((Node *) taskQuery, relationShardList);
/* /*
* Ands are made implicit during shard pruning, as predicate comparison and * Ands are made implicit during shard pruning, as predicate comparison and
* refutation depend on it being so. We need to make them explicit again so * refutation depend on it being so. We need to make them explicit again so
@ -2448,13 +2460,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
(Node *) make_ands_explicit((List *) taskQuery->jointree->quals); (Node *) make_ands_explicit((List *) taskQuery->jointree->quals);
/* and generate the full query string */ /* and generate the full query string */
deparse_shard_query(taskQuery, distributedTableId, shardInterval->shardId, pg_get_query_def(taskQuery, queryString);
queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
subqueryTask = CreateBasicTask(jobId, taskId, SQL_TASK, queryString->data); subqueryTask = CreateBasicTask(jobId, taskId, SQL_TASK, queryString->data);
subqueryTask->dependedTaskList = NULL; subqueryTask->dependedTaskList = NULL;
subqueryTask->anchorShardId = shardInterval->shardId; subqueryTask->anchorShardId = anchorShardId;
subqueryTask->taskPlacementList = selectPlacementList; subqueryTask->taskPlacementList = selectPlacementList;
subqueryTask->upsertQuery = false; subqueryTask->upsertQuery = false;
subqueryTask->relationShardList = relationShardList; subqueryTask->relationShardList = relationShardList;

View File

@ -127,10 +127,6 @@ static Job * RouterJob(Query *originalQuery,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
static bool RelationPrunesToMultipleShards(List *relationShardList); static bool RelationPrunesToMultipleShards(List *relationShardList);
static List * TargetShardIntervalsForRouter(Query *query,
RelationRestrictionContext *restrictionContext,
bool *multiShardQuery);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
static void NormalizeMultiRowInsertTargetList(Query *query); static void NormalizeMultiRowInsertTargetList(Query *query);
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
static List * GroupInsertValuesByShardId(List *insertValuesList); static List * GroupInsertValuesByShardId(List *insertValuesList);
@ -1678,7 +1674,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
bool isMultiShardModifyQuery = false; bool isMultiShardModifyQuery = false;
*placementList = NIL; *placementList = NIL;
prunedRelationShardList = TargetShardIntervalsForRouter(originalQuery, prunedRelationShardList = TargetShardIntervalsForQuery(originalQuery,
restrictionContext, restrictionContext,
&isMultiShardQuery); &isMultiShardQuery);
@ -1851,7 +1847,7 @@ GetInitialShardId(List *relationShardList)
/* /*
* TargetShardIntervalsForRouter performs shard pruning for all referenced relations * TargetShardIntervalsForQuery performs shard pruning for all referenced relations
* in the query and returns list of shards per relation. Shard pruning is done based * in the query and returns list of shards per relation. Shard pruning is done based
* on provided restriction context per relation. The function sets multiShardQuery * on provided restriction context per relation. The function sets multiShardQuery
* to true if any of the relations pruned down to more than one active shard. It * to true if any of the relations pruned down to more than one active shard. It
@ -1860,8 +1856,8 @@ GetInitialShardId(List *relationShardList)
* 'and 1=0', such queries are treated as if all of the shards of joining * 'and 1=0', such queries are treated as if all of the shards of joining
* relations are pruned out. * relations are pruned out.
*/ */
static List * List *
TargetShardIntervalsForRouter(Query *query, TargetShardIntervalsForQuery(Query *query,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
bool *multiShardQuery) bool *multiShardQuery)
{ {
@ -1949,7 +1945,7 @@ RelationPrunesToMultipleShards(List *relationShardList)
* exists. The caller should check if there are any shard intervals exist for * exists. The caller should check if there are any shard intervals exist for
* placement check prior to calling this function. * placement check prior to calling this function.
*/ */
static List * List *
WorkersContainingAllShards(List *prunedShardIntervalsList) WorkersContainingAllShards(List *prunedShardIntervalsList)
{ {
ListCell *prunedShardIntervalCell = NULL; ListCell *prunedShardIntervalCell = NULL;

View File

@ -157,6 +157,7 @@ CopyNodeShardInterval(COPYFUNC_ARGS)
} }
COPY_SCALAR_FIELD(shardId); COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(shardIndex);
} }

View File

@ -359,6 +359,7 @@ OutShardInterval(OUTFUNC_ARGS)
outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal); outDatum(str, node->maxValue, node->valueTypeLen, node->valueByVal);
WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardId);
WRITE_INT_FIELD(shardIndex);
} }

View File

@ -259,6 +259,7 @@ ReadShardInterval(READFUNC_ARGS)
local_node->maxValue = readDatum(local_node->valueByVal); local_node->maxValue = readDatum(local_node->valueByVal);
READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardId);
READ_INT_FIELD(shardIndex);
READ_DONE(); READ_DONE();
} }

View File

@ -1128,6 +1128,9 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray; cacheEntry->arrayOfPlacementArrays[shardIndex] = placementArray;
cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements; cacheEntry->arrayOfPlacementArrayLengths[shardIndex] = numberOfPlacements;
/* store the shard index in the ShardInterval */
shardInterval->shardIndex = shardIndex;
} }
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength; cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;

View File

@ -67,6 +67,7 @@ typedef struct ShardInterval
Datum minValue; /* a shard's typed min value datum */ Datum minValue; /* a shard's typed min value datum */
Datum maxValue; /* a shard's typed max value datum */ Datum maxValue; /* a shard's typed max value datum */
uint64 shardId; uint64 shardId;
int shardIndex;
} ShardInterval; } ShardInterval;

View File

@ -39,6 +39,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
replacePrunedQueryWithDummy, replacePrunedQueryWithDummy,
bool *multiShardModifyQuery); bool *multiShardModifyQuery);
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
extern List * TargetShardIntervalsForQuery(Query *query,
RelationRestrictionContext *restrictionContext,
bool *multiShardQuery);
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList); extern List * IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList);
extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery, extern DeferredErrorMessage * ModifyQuerySupported(Query *queryTree, Query *originalQuery,
bool multiShardQuery); bool multiShardQuery);

View File

@ -812,9 +812,6 @@ SET client_min_messages TO DEBUG2;
SELECT * FROM SELECT * FROM
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
AS foo; AS foo;
DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 570034 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
count count
------- -------
(0 rows) (0 rows)
@ -822,9 +819,6 @@ DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned
SELECT * FROM SELECT * FROM
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a) (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE 'eren' = a GROUP BY a)
AS foo; AS foo;
DEBUG: Skipping the target shard interval 570033 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 570035 because SELECT query is pruned away for the interval
DEBUG: Skipping the target shard interval 570036 because SELECT query is pruned away for the interval
count count
------- -------
(0 rows) (0 rows)