mirror of https://github.com/citusdata/citus.git
Merge pull request #2861 from citusdata/less_polymorphic_plan_router_query
PlanRouterQuery: don't store list of list of shard intervals in relationShardListpull/2863/head
commit
a1a7d95c0a
|
@ -440,6 +440,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
|||
List *selectPlacementList = NIL;
|
||||
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
||||
List *relationShardList = NIL;
|
||||
List *prunedShardIntervalListList = NIL;
|
||||
uint64 jobId = INVALID_JOB_ID;
|
||||
List *insertShardPlacementList = NULL;
|
||||
List *intersectedPlacementList = NULL;
|
||||
|
@ -519,7 +520,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
|||
*/
|
||||
planningError = PlanRouterQuery(copiedSubquery, copyOfPlannerRestrictionContext,
|
||||
&selectPlacementList, &selectAnchorShardId,
|
||||
&relationShardList, replacePrunedQueryWithDummy,
|
||||
&relationShardList, &prunedShardIntervalListList,
|
||||
replacePrunedQueryWithDummy,
|
||||
&multiShardModifyQuery, NULL);
|
||||
|
||||
Assert(!multiShardModifyQuery);
|
||||
|
|
|
@ -2066,7 +2066,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
|||
|
||||
/*
|
||||
* QueryPushdownSqlTaskList creates a list of SQL tasks to execute the given subquery
|
||||
* pushdown job. For this, the it is being checked whether the query is router
|
||||
* pushdown job. For this, it is being checked whether the query is router
|
||||
* plannable per target shard interval. For those router plannable worker
|
||||
* queries, we create a SQL task and append the task to the task list that is going
|
||||
* to be executed.
|
||||
|
|
|
@ -1607,6 +1607,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
uint64 shardId = INVALID_SHARD_ID;
|
||||
List *placementList = NIL;
|
||||
List *relationShardList = NIL;
|
||||
List *prunedShardIntervalListList = NIL;
|
||||
bool replacePrunedQueryWithDummy = false;
|
||||
bool requiresMasterEvaluation = false;
|
||||
RangeTblEntry *updateOrDeleteRTE = NULL;
|
||||
|
@ -1621,6 +1622,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
|
||||
(*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext,
|
||||
&placementList, &shardId, &relationShardList,
|
||||
&prunedShardIntervalListList,
|
||||
replacePrunedQueryWithDummy,
|
||||
&isMultiShardModifyQuery,
|
||||
&partitionKeyValue);
|
||||
|
@ -1673,7 +1675,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId,
|
||||
plannerRestrictionContext->
|
||||
relationRestrictionContext,
|
||||
relationShardList, MODIFY_TASK,
|
||||
prunedShardIntervalListList,
|
||||
MODIFY_TASK,
|
||||
requiresMasterEvaluation);
|
||||
}
|
||||
else
|
||||
|
@ -1899,7 +1902,7 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
|
|||
* filled with the list of worker nodes that has all the required shard placements
|
||||
* for the query execution. anchorShardId is set to the first pruned shardId of
|
||||
* the given query. Finally, relationShardList is filled with the list of
|
||||
* relation-to-shard mappings for the query.
|
||||
* relation-to-shard mappings for the query.
|
||||
*
|
||||
* If the given query is not routable, it fills planningError with the related
|
||||
* DeferredErrorMessage. The caller can check this error message to see if query
|
||||
|
@ -1917,15 +1920,15 @@ DeferredErrorMessage *
|
|||
PlanRouterQuery(Query *originalQuery,
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
List **placementList, uint64 *anchorShardId, List **relationShardList,
|
||||
List **prunedShardIntervalListList,
|
||||
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
|
||||
Const **partitionValueConst)
|
||||
{
|
||||
static uint32 zeroShardQueryRoundRobin = 0;
|
||||
|
||||
bool isMultiShardQuery = false;
|
||||
List *prunedRelationShardList = NIL;
|
||||
DeferredErrorMessage *planningError = NULL;
|
||||
ListCell *prunedRelationShardListCell = NULL;
|
||||
ListCell *prunedShardIntervalListCell = NULL;
|
||||
List *workerList = NIL;
|
||||
bool shardsPresent = false;
|
||||
uint64 shardId = INVALID_SHARD_ID;
|
||||
|
@ -1957,7 +1960,7 @@ PlanRouterQuery(Query *originalQuery,
|
|||
return planningError;
|
||||
}
|
||||
|
||||
prunedRelationShardList = list_make1(shardIntervalList);
|
||||
*prunedShardIntervalListList = list_make1(shardIntervalList);
|
||||
|
||||
if (!isMultiShardQuery)
|
||||
{
|
||||
|
@ -1967,7 +1970,7 @@ PlanRouterQuery(Query *originalQuery,
|
|||
}
|
||||
else
|
||||
{
|
||||
prunedRelationShardList =
|
||||
*prunedShardIntervalListList =
|
||||
TargetShardIntervalsForRestrictInfo(plannerRestrictionContext->
|
||||
relationRestrictionContext,
|
||||
&isMultiShardQuery,
|
||||
|
@ -1999,31 +2002,25 @@ PlanRouterQuery(Query *originalQuery,
|
|||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* If the modify query uses multiple shards and update/delete query, relation
|
||||
* shard list should be returned as list of shard list for each table. Check
|
||||
* the implementation of QueryPushdownSqlTaskList.
|
||||
*/
|
||||
*relationShardList = prunedRelationShardList;
|
||||
*multiShardModifyQuery = true;
|
||||
return planningError;
|
||||
}
|
||||
}
|
||||
|
||||
foreach(prunedRelationShardListCell, prunedRelationShardList)
|
||||
foreach(prunedShardIntervalListCell, *prunedShardIntervalListList)
|
||||
{
|
||||
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
|
||||
List *prunedShardIntervalList = (List *) lfirst(prunedShardIntervalListCell);
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
|
||||
/* no shard is present or all shards are pruned out case will be handled later */
|
||||
if (prunedShardList == NIL)
|
||||
if (prunedShardIntervalList == NIL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
shardsPresent = true;
|
||||
|
||||
foreach(shardIntervalCell, prunedShardList)
|
||||
foreach(shardIntervalCell, prunedShardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
RelationShard *relationShard = CitusMakeNode(RelationShard);
|
||||
|
@ -2048,7 +2045,7 @@ PlanRouterQuery(Query *originalQuery,
|
|||
}
|
||||
|
||||
/* we need anchor shard id for select queries with router planner */
|
||||
shardId = GetAnchorShardId(prunedRelationShardList);
|
||||
shardId = GetAnchorShardId(*prunedShardIntervalListList);
|
||||
|
||||
/*
|
||||
* Determine the worker that has all shard placements if a shard placement found.
|
||||
|
@ -2058,7 +2055,7 @@ PlanRouterQuery(Query *originalQuery,
|
|||
*/
|
||||
if (shardsPresent)
|
||||
{
|
||||
workerList = WorkersContainingAllShards(prunedRelationShardList);
|
||||
workerList = WorkersContainingAllShards(*prunedShardIntervalListList);
|
||||
}
|
||||
else if (replacePrunedQueryWithDummy)
|
||||
{
|
||||
|
@ -2129,23 +2126,23 @@ PlanRouterQuery(Query *originalQuery,
|
|||
* - Return INVALID_SHARD_ID on empty lists
|
||||
*/
|
||||
static uint64
|
||||
GetAnchorShardId(List *relationShardList)
|
||||
GetAnchorShardId(List *prunedShardIntervalListList)
|
||||
{
|
||||
ListCell *prunedRelationShardListCell = NULL;
|
||||
ListCell *prunedShardIntervalListCell = NULL;
|
||||
uint64 referenceShardId = INVALID_SHARD_ID;
|
||||
|
||||
foreach(prunedRelationShardListCell, relationShardList)
|
||||
foreach(prunedShardIntervalListCell, prunedShardIntervalListList)
|
||||
{
|
||||
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
|
||||
List *prunedShardIntervalList = (List *) lfirst(prunedShardIntervalListCell);
|
||||
ShardInterval *shardInterval = NULL;
|
||||
|
||||
/* no shard is present or all shards are pruned out case will be handled later */
|
||||
if (prunedShardList == NIL)
|
||||
if (prunedShardIntervalList == NIL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
shardInterval = linitial(prunedShardList);
|
||||
shardInterval = linitial(prunedShardIntervalList);
|
||||
|
||||
if (ReferenceTableShardId(shardInterval->shardId))
|
||||
{
|
||||
|
@ -2216,7 +2213,7 @@ List *
|
|||
TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionContext,
|
||||
bool *multiShardQuery, Const **partitionValueConst)
|
||||
{
|
||||
List *prunedRelationShardList = NIL;
|
||||
List *prunedShardIntervalListList = NIL;
|
||||
ListCell *restrictionCell = NULL;
|
||||
bool multiplePartitionValuesExist = false;
|
||||
Const *queryPartitionValueConst = NULL;
|
||||
|
@ -2233,7 +2230,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
|||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
||||
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
||||
List *prunedShardList = NIL;
|
||||
List *prunedShardIntervalList = NIL;
|
||||
List *joinInfoList = relationRestriction->relOptInfo->joininfo;
|
||||
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
|
||||
bool whereFalseQuery = false;
|
||||
|
@ -2250,10 +2247,10 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
|||
if (!whereFalseQuery && shardCount > 0)
|
||||
{
|
||||
Const *restrictionPartitionValueConst = NULL;
|
||||
prunedShardList = PruneShards(relationId, tableId, restrictClauseList,
|
||||
&restrictionPartitionValueConst);
|
||||
prunedShardIntervalList = PruneShards(relationId, tableId, restrictClauseList,
|
||||
&restrictionPartitionValueConst);
|
||||
|
||||
if (list_length(prunedShardList) > 1)
|
||||
if (list_length(prunedShardIntervalList) > 1)
|
||||
{
|
||||
(*multiShardQuery) = true;
|
||||
}
|
||||
|
@ -2269,8 +2266,9 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
|||
}
|
||||
}
|
||||
|
||||
relationRestriction->prunedShardIntervalList = prunedShardList;
|
||||
prunedRelationShardList = lappend(prunedRelationShardList, prunedShardList);
|
||||
relationRestriction->prunedShardIntervalList = prunedShardIntervalList;
|
||||
prunedShardIntervalListList = lappend(prunedShardIntervalListList,
|
||||
prunedShardIntervalList);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -2288,7 +2286,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
|||
*partitionValueConst = queryPartitionValueConst;
|
||||
}
|
||||
|
||||
return prunedRelationShardList;
|
||||
return prunedShardIntervalListList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2452,8 +2450,8 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
{
|
||||
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
|
||||
Const *partitionValueConst = NULL;
|
||||
List *prunedShardList = NIL;
|
||||
int prunedShardCount = 0;
|
||||
List *prunedShardIntervalList = NIL;
|
||||
int prunedShardIntervalCount = 0;
|
||||
ShardInterval *targetShard = NULL;
|
||||
|
||||
if (!IsA(insertValues->partitionValueExpr, Const))
|
||||
|
@ -2480,7 +2478,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
shardInterval = FindShardInterval(partitionValue, cacheEntry);
|
||||
if (shardInterval != NULL)
|
||||
{
|
||||
prunedShardList = list_make1(shardInterval);
|
||||
prunedShardIntervalList = list_make1(shardInterval);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -2500,12 +2498,12 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
|
||||
restrictClauseList = list_make1(equalityExpr);
|
||||
|
||||
prunedShardList = PruneShards(distributedTableId, tableId,
|
||||
restrictClauseList, NULL);
|
||||
prunedShardIntervalList = PruneShards(distributedTableId, tableId,
|
||||
restrictClauseList, NULL);
|
||||
}
|
||||
|
||||
prunedShardCount = list_length(prunedShardList);
|
||||
if (prunedShardCount != 1)
|
||||
prunedShardIntervalCount = list_length(prunedShardIntervalList);
|
||||
if (prunedShardIntervalCount != 1)
|
||||
{
|
||||
char *partitionKeyString = cacheEntry->partitionKeyString;
|
||||
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
|
||||
|
@ -2514,7 +2512,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
StringInfo errorHint = makeStringInfo();
|
||||
const char *targetCountType = NULL;
|
||||
|
||||
if (prunedShardCount == 0)
|
||||
if (prunedShardIntervalCount == 0)
|
||||
{
|
||||
targetCountType = "no";
|
||||
}
|
||||
|
@ -2523,7 +2521,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
targetCountType = "multiple";
|
||||
}
|
||||
|
||||
if (prunedShardCount == 0)
|
||||
if (prunedShardIntervalCount == 0)
|
||||
{
|
||||
appendStringInfo(errorHint, "Make sure you have created a shard which "
|
||||
"can receive this partition column value.");
|
||||
|
@ -2545,7 +2543,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
|||
return NIL;
|
||||
}
|
||||
|
||||
targetShard = (ShardInterval *) linitial(prunedShardList);
|
||||
targetShard = (ShardInterval *) linitial(prunedShardIntervalList);
|
||||
insertValues->shardId = targetShard->shardId;
|
||||
}
|
||||
|
||||
|
|
|
@ -37,8 +37,9 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
|
|||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext,
|
||||
List **placementList, uint64 *anchorShardId,
|
||||
List **relationShardList, bool
|
||||
replacePrunedQueryWithDummy,
|
||||
List **relationShardList,
|
||||
List **prunedShardIntervalListList,
|
||||
bool replacePrunedQueryWithDummy,
|
||||
bool *multiShardModifyQuery,
|
||||
Const **partitionValueConst);
|
||||
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
|
||||
|
|
Loading…
Reference in New Issue