mirror of https://github.com/citusdata/citus.git
PlanRouterQuery: don't store list of list of shard intervals in relationShardList
parent
9b4ba2f5b2
commit
b77c52f95b
|
@ -440,6 +440,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
List *selectPlacementList = NIL;
|
List *selectPlacementList = NIL;
|
||||||
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
uint64 selectAnchorShardId = INVALID_SHARD_ID;
|
||||||
List *relationShardList = NIL;
|
List *relationShardList = NIL;
|
||||||
|
List *prunedShardIntervalListList = NIL;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
List *insertShardPlacementList = NULL;
|
List *insertShardPlacementList = NULL;
|
||||||
List *intersectedPlacementList = NULL;
|
List *intersectedPlacementList = NULL;
|
||||||
|
@ -519,7 +520,8 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
*/
|
*/
|
||||||
planningError = PlanRouterQuery(copiedSubquery, copyOfPlannerRestrictionContext,
|
planningError = PlanRouterQuery(copiedSubquery, copyOfPlannerRestrictionContext,
|
||||||
&selectPlacementList, &selectAnchorShardId,
|
&selectPlacementList, &selectAnchorShardId,
|
||||||
&relationShardList, replacePrunedQueryWithDummy,
|
&relationShardList, &prunedShardIntervalListList,
|
||||||
|
replacePrunedQueryWithDummy,
|
||||||
&multiShardModifyQuery, NULL);
|
&multiShardModifyQuery, NULL);
|
||||||
|
|
||||||
Assert(!multiShardModifyQuery);
|
Assert(!multiShardModifyQuery);
|
||||||
|
|
|
@ -2066,7 +2066,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* QueryPushdownSqlTaskList creates a list of SQL tasks to execute the given subquery
|
* QueryPushdownSqlTaskList creates a list of SQL tasks to execute the given subquery
|
||||||
* pushdown job. For this, the it is being checked whether the query is router
|
* pushdown job. For this, it is being checked whether the query is router
|
||||||
* plannable per target shard interval. For those router plannable worker
|
* plannable per target shard interval. For those router plannable worker
|
||||||
* queries, we create a SQL task and append the task to the task list that is going
|
* queries, we create a SQL task and append the task to the task list that is going
|
||||||
* to be executed.
|
* to be executed.
|
||||||
|
|
|
@ -1607,6 +1607,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
List *placementList = NIL;
|
List *placementList = NIL;
|
||||||
List *relationShardList = NIL;
|
List *relationShardList = NIL;
|
||||||
|
List *prunedShardIntervalListList = NIL;
|
||||||
bool replacePrunedQueryWithDummy = false;
|
bool replacePrunedQueryWithDummy = false;
|
||||||
bool requiresMasterEvaluation = false;
|
bool requiresMasterEvaluation = false;
|
||||||
RangeTblEntry *updateOrDeleteRTE = NULL;
|
RangeTblEntry *updateOrDeleteRTE = NULL;
|
||||||
|
@ -1621,6 +1622,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
|
|
||||||
(*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext,
|
(*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext,
|
||||||
&placementList, &shardId, &relationShardList,
|
&placementList, &shardId, &relationShardList,
|
||||||
|
&prunedShardIntervalListList,
|
||||||
replacePrunedQueryWithDummy,
|
replacePrunedQueryWithDummy,
|
||||||
&isMultiShardModifyQuery,
|
&isMultiShardModifyQuery,
|
||||||
&partitionKeyValue);
|
&partitionKeyValue);
|
||||||
|
@ -1673,7 +1675,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
||||||
job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId,
|
job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId,
|
||||||
plannerRestrictionContext->
|
plannerRestrictionContext->
|
||||||
relationRestrictionContext,
|
relationRestrictionContext,
|
||||||
relationShardList, MODIFY_TASK,
|
prunedShardIntervalListList,
|
||||||
|
MODIFY_TASK,
|
||||||
requiresMasterEvaluation);
|
requiresMasterEvaluation);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1899,7 +1902,7 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
|
||||||
* filled with the list of worker nodes that has all the required shard placements
|
* filled with the list of worker nodes that has all the required shard placements
|
||||||
* for the query execution. anchorShardId is set to the first pruned shardId of
|
* for the query execution. anchorShardId is set to the first pruned shardId of
|
||||||
* the given query. Finally, relationShardList is filled with the list of
|
* the given query. Finally, relationShardList is filled with the list of
|
||||||
* relation-to-shard mappings for the query.
|
* relation-to-shard mappings for the query.
|
||||||
*
|
*
|
||||||
* If the given query is not routable, it fills planningError with the related
|
* If the given query is not routable, it fills planningError with the related
|
||||||
* DeferredErrorMessage. The caller can check this error message to see if query
|
* DeferredErrorMessage. The caller can check this error message to see if query
|
||||||
|
@ -1917,15 +1920,15 @@ DeferredErrorMessage *
|
||||||
PlanRouterQuery(Query *originalQuery,
|
PlanRouterQuery(Query *originalQuery,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext,
|
PlannerRestrictionContext *plannerRestrictionContext,
|
||||||
List **placementList, uint64 *anchorShardId, List **relationShardList,
|
List **placementList, uint64 *anchorShardId, List **relationShardList,
|
||||||
|
List **prunedShardIntervalListList,
|
||||||
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
|
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
|
||||||
Const **partitionValueConst)
|
Const **partitionValueConst)
|
||||||
{
|
{
|
||||||
static uint32 zeroShardQueryRoundRobin = 0;
|
static uint32 zeroShardQueryRoundRobin = 0;
|
||||||
|
|
||||||
bool isMultiShardQuery = false;
|
bool isMultiShardQuery = false;
|
||||||
List *prunedRelationShardList = NIL;
|
|
||||||
DeferredErrorMessage *planningError = NULL;
|
DeferredErrorMessage *planningError = NULL;
|
||||||
ListCell *prunedRelationShardListCell = NULL;
|
ListCell *prunedShardIntervalListCell = NULL;
|
||||||
List *workerList = NIL;
|
List *workerList = NIL;
|
||||||
bool shardsPresent = false;
|
bool shardsPresent = false;
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
|
@ -1957,7 +1960,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
return planningError;
|
return planningError;
|
||||||
}
|
}
|
||||||
|
|
||||||
prunedRelationShardList = list_make1(shardIntervalList);
|
*prunedShardIntervalListList = list_make1(shardIntervalList);
|
||||||
|
|
||||||
if (!isMultiShardQuery)
|
if (!isMultiShardQuery)
|
||||||
{
|
{
|
||||||
|
@ -1967,7 +1970,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
prunedRelationShardList =
|
*prunedShardIntervalListList =
|
||||||
TargetShardIntervalsForRestrictInfo(plannerRestrictionContext->
|
TargetShardIntervalsForRestrictInfo(plannerRestrictionContext->
|
||||||
relationRestrictionContext,
|
relationRestrictionContext,
|
||||||
&isMultiShardQuery,
|
&isMultiShardQuery,
|
||||||
|
@ -1999,31 +2002,25 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* If the modify query uses multiple shards and update/delete query, relation
|
|
||||||
* shard list should be returned as list of shard list for each table. Check
|
|
||||||
* the implementation of QueryPushdownSqlTaskList.
|
|
||||||
*/
|
|
||||||
*relationShardList = prunedRelationShardList;
|
|
||||||
*multiShardModifyQuery = true;
|
*multiShardModifyQuery = true;
|
||||||
return planningError;
|
return planningError;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(prunedRelationShardListCell, prunedRelationShardList)
|
foreach(prunedShardIntervalListCell, *prunedShardIntervalListList)
|
||||||
{
|
{
|
||||||
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
|
List *prunedShardIntervalList = (List *) lfirst(prunedShardIntervalListCell);
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
|
||||||
/* no shard is present or all shards are pruned out case will be handled later */
|
/* no shard is present or all shards are pruned out case will be handled later */
|
||||||
if (prunedShardList == NIL)
|
if (prunedShardIntervalList == NIL)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
shardsPresent = true;
|
shardsPresent = true;
|
||||||
|
|
||||||
foreach(shardIntervalCell, prunedShardList)
|
foreach(shardIntervalCell, prunedShardIntervalList)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||||
RelationShard *relationShard = CitusMakeNode(RelationShard);
|
RelationShard *relationShard = CitusMakeNode(RelationShard);
|
||||||
|
@ -2048,7 +2045,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we need anchor shard id for select queries with router planner */
|
/* we need anchor shard id for select queries with router planner */
|
||||||
shardId = GetAnchorShardId(prunedRelationShardList);
|
shardId = GetAnchorShardId(*prunedShardIntervalListList);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Determine the worker that has all shard placements if a shard placement found.
|
* Determine the worker that has all shard placements if a shard placement found.
|
||||||
|
@ -2058,7 +2055,7 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
*/
|
*/
|
||||||
if (shardsPresent)
|
if (shardsPresent)
|
||||||
{
|
{
|
||||||
workerList = WorkersContainingAllShards(prunedRelationShardList);
|
workerList = WorkersContainingAllShards(*prunedShardIntervalListList);
|
||||||
}
|
}
|
||||||
else if (replacePrunedQueryWithDummy)
|
else if (replacePrunedQueryWithDummy)
|
||||||
{
|
{
|
||||||
|
@ -2129,23 +2126,23 @@ PlanRouterQuery(Query *originalQuery,
|
||||||
* - Return INVALID_SHARD_ID on empty lists
|
* - Return INVALID_SHARD_ID on empty lists
|
||||||
*/
|
*/
|
||||||
static uint64
|
static uint64
|
||||||
GetAnchorShardId(List *relationShardList)
|
GetAnchorShardId(List *prunedShardIntervalListList)
|
||||||
{
|
{
|
||||||
ListCell *prunedRelationShardListCell = NULL;
|
ListCell *prunedShardIntervalListCell = NULL;
|
||||||
uint64 referenceShardId = INVALID_SHARD_ID;
|
uint64 referenceShardId = INVALID_SHARD_ID;
|
||||||
|
|
||||||
foreach(prunedRelationShardListCell, relationShardList)
|
foreach(prunedShardIntervalListCell, prunedShardIntervalListList)
|
||||||
{
|
{
|
||||||
List *prunedShardList = (List *) lfirst(prunedRelationShardListCell);
|
List *prunedShardIntervalList = (List *) lfirst(prunedShardIntervalListCell);
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
/* no shard is present or all shards are pruned out case will be handled later */
|
/* no shard is present or all shards are pruned out case will be handled later */
|
||||||
if (prunedShardList == NIL)
|
if (prunedShardIntervalList == NIL)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
shardInterval = linitial(prunedShardList);
|
shardInterval = linitial(prunedShardIntervalList);
|
||||||
|
|
||||||
if (ReferenceTableShardId(shardInterval->shardId))
|
if (ReferenceTableShardId(shardInterval->shardId))
|
||||||
{
|
{
|
||||||
|
@ -2216,7 +2213,7 @@ List *
|
||||||
TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionContext,
|
TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionContext,
|
||||||
bool *multiShardQuery, Const **partitionValueConst)
|
bool *multiShardQuery, Const **partitionValueConst)
|
||||||
{
|
{
|
||||||
List *prunedRelationShardList = NIL;
|
List *prunedShardIntervalListList = NIL;
|
||||||
ListCell *restrictionCell = NULL;
|
ListCell *restrictionCell = NULL;
|
||||||
bool multiplePartitionValuesExist = false;
|
bool multiplePartitionValuesExist = false;
|
||||||
Const *queryPartitionValueConst = NULL;
|
Const *queryPartitionValueConst = NULL;
|
||||||
|
@ -2233,7 +2230,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
||||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
||||||
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
||||||
List *prunedShardList = NIL;
|
List *prunedShardIntervalList = NIL;
|
||||||
List *joinInfoList = relationRestriction->relOptInfo->joininfo;
|
List *joinInfoList = relationRestriction->relOptInfo->joininfo;
|
||||||
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
|
List *pseudoRestrictionList = extract_actual_clauses(joinInfoList, true);
|
||||||
bool whereFalseQuery = false;
|
bool whereFalseQuery = false;
|
||||||
|
@ -2250,10 +2247,10 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
||||||
if (!whereFalseQuery && shardCount > 0)
|
if (!whereFalseQuery && shardCount > 0)
|
||||||
{
|
{
|
||||||
Const *restrictionPartitionValueConst = NULL;
|
Const *restrictionPartitionValueConst = NULL;
|
||||||
prunedShardList = PruneShards(relationId, tableId, restrictClauseList,
|
prunedShardIntervalList = PruneShards(relationId, tableId, restrictClauseList,
|
||||||
&restrictionPartitionValueConst);
|
&restrictionPartitionValueConst);
|
||||||
|
|
||||||
if (list_length(prunedShardList) > 1)
|
if (list_length(prunedShardIntervalList) > 1)
|
||||||
{
|
{
|
||||||
(*multiShardQuery) = true;
|
(*multiShardQuery) = true;
|
||||||
}
|
}
|
||||||
|
@ -2269,8 +2266,9 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
relationRestriction->prunedShardIntervalList = prunedShardList;
|
relationRestriction->prunedShardIntervalList = prunedShardIntervalList;
|
||||||
prunedRelationShardList = lappend(prunedRelationShardList, prunedShardList);
|
prunedShardIntervalListList = lappend(prunedShardIntervalListList,
|
||||||
|
prunedShardIntervalList);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2288,7 +2286,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
||||||
*partitionValueConst = queryPartitionValueConst;
|
*partitionValueConst = queryPartitionValueConst;
|
||||||
}
|
}
|
||||||
|
|
||||||
return prunedRelationShardList;
|
return prunedShardIntervalListList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2452,8 +2450,8 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
|
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
|
||||||
Const *partitionValueConst = NULL;
|
Const *partitionValueConst = NULL;
|
||||||
List *prunedShardList = NIL;
|
List *prunedShardIntervalList = NIL;
|
||||||
int prunedShardCount = 0;
|
int prunedShardIntervalCount = 0;
|
||||||
ShardInterval *targetShard = NULL;
|
ShardInterval *targetShard = NULL;
|
||||||
|
|
||||||
if (!IsA(insertValues->partitionValueExpr, Const))
|
if (!IsA(insertValues->partitionValueExpr, Const))
|
||||||
|
@ -2480,7 +2478,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
shardInterval = FindShardInterval(partitionValue, cacheEntry);
|
shardInterval = FindShardInterval(partitionValue, cacheEntry);
|
||||||
if (shardInterval != NULL)
|
if (shardInterval != NULL)
|
||||||
{
|
{
|
||||||
prunedShardList = list_make1(shardInterval);
|
prunedShardIntervalList = list_make1(shardInterval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -2500,12 +2498,12 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
|
|
||||||
restrictClauseList = list_make1(equalityExpr);
|
restrictClauseList = list_make1(equalityExpr);
|
||||||
|
|
||||||
prunedShardList = PruneShards(distributedTableId, tableId,
|
prunedShardIntervalList = PruneShards(distributedTableId, tableId,
|
||||||
restrictClauseList, NULL);
|
restrictClauseList, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
prunedShardCount = list_length(prunedShardList);
|
prunedShardIntervalCount = list_length(prunedShardIntervalList);
|
||||||
if (prunedShardCount != 1)
|
if (prunedShardIntervalCount != 1)
|
||||||
{
|
{
|
||||||
char *partitionKeyString = cacheEntry->partitionKeyString;
|
char *partitionKeyString = cacheEntry->partitionKeyString;
|
||||||
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
|
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
|
||||||
|
@ -2514,7 +2512,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
StringInfo errorHint = makeStringInfo();
|
StringInfo errorHint = makeStringInfo();
|
||||||
const char *targetCountType = NULL;
|
const char *targetCountType = NULL;
|
||||||
|
|
||||||
if (prunedShardCount == 0)
|
if (prunedShardIntervalCount == 0)
|
||||||
{
|
{
|
||||||
targetCountType = "no";
|
targetCountType = "no";
|
||||||
}
|
}
|
||||||
|
@ -2523,7 +2521,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
targetCountType = "multiple";
|
targetCountType = "multiple";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (prunedShardCount == 0)
|
if (prunedShardIntervalCount == 0)
|
||||||
{
|
{
|
||||||
appendStringInfo(errorHint, "Make sure you have created a shard which "
|
appendStringInfo(errorHint, "Make sure you have created a shard which "
|
||||||
"can receive this partition column value.");
|
"can receive this partition column value.");
|
||||||
|
@ -2545,7 +2543,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
targetShard = (ShardInterval *) linitial(prunedShardList);
|
targetShard = (ShardInterval *) linitial(prunedShardIntervalList);
|
||||||
insertValues->shardId = targetShard->shardId;
|
insertValues->shardId = targetShard->shardId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,8 +37,9 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
plannerRestrictionContext,
|
plannerRestrictionContext,
|
||||||
List **placementList, uint64 *anchorShardId,
|
List **placementList, uint64 *anchorShardId,
|
||||||
List **relationShardList, bool
|
List **relationShardList,
|
||||||
replacePrunedQueryWithDummy,
|
List **prunedShardIntervalListList,
|
||||||
|
bool replacePrunedQueryWithDummy,
|
||||||
bool *multiShardModifyQuery,
|
bool *multiShardModifyQuery,
|
||||||
Const **partitionValueConst);
|
Const **partitionValueConst);
|
||||||
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
|
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
|
||||||
|
|
Loading…
Reference in New Issue