From aa7ca815485a100a24c66d388a83a9385888198b Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 3 Jul 2017 11:12:45 +0200 Subject: [PATCH] Execute UPDATE/DELETE statements with 0 shards --- .../distributed/executor/multi_executor.c | 4 +- .../executor/multi_router_executor.c | 97 ++-- .../distributed/planner/deparse_shard_query.c | 16 +- .../planner/multi_router_planner.c | 470 +++++++++++------- .../distributed/multi_router_planner.h | 3 +- src/test/regress/expected/multi_explain.out | 15 + src/test/regress/expected/multi_explain_0.out | 15 + .../regress/expected/multi_simple_queries.out | 7 +- src/test/regress/sql/multi_explain.sql | 11 + src/test/regress/sql/multi_simple_queries.sql | 3 +- 10 files changed, 377 insertions(+), 264 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 4c848c560..d24d59008 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -158,8 +158,8 @@ RouterCreateScan(CustomScan *scan) isModificationQuery = IsModifyMultiPlan(multiPlan); - /* check if this is a single shard query */ - if (list_length(taskList) == 1) + /* check whether query has at most one shard */ + if (list_length(taskList) <= 1) { if (isModificationQuery) { diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 1a5dc4621..dbdf85327 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -74,7 +74,6 @@ bool EnableDeadlockPrevention = true; /* functions needed during run phase */ static void ReacquireMetadataLocks(List *taskList); -static void AssignInsertTaskShardId(Query *jobQuery, List *taskList); static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, @@ -427,75 +426,41 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) if (workerJob->requiresMasterEvaluation) { PlanState *planState = &(scanState->customScanState.ss.ps); + EState *executorState = planState->state; ExecuteMasterEvaluableFunctions(jobQuery, planState); + /* + * We've processed parameters in ExecuteMasterEvaluableFunctions and + * don't need to send their values to workers, since they will be + * represented as constants in the deparsed query. To avoid sending + * parameter values, we set the parameter list to NULL. + */ + executorState->es_param_list_info = NULL; + if (deferredPruning) { - AssignInsertTaskShardId(jobQuery, taskList); + DeferredErrorMessage *planningError = NULL; + + /* need to perform shard pruning, rebuild the task list from scratch */ + taskList = RouterModifyTaskList(jobQuery, &planningError); + + if (planningError != NULL) + { + RaiseDeferredError(planningError, ERROR); + } + + workerJob->taskList = taskList; } RebuildQueryStrings(jobQuery, taskList); } - /* - * If we are executing a prepared statement, then we may not yet have obtained - * the metadata locks in this transaction. To prevent a concurrent shard copy, - * we re-obtain them here or error out if a shard copy has already started. - * - * If a shard copy finishes in between fetching a plan from cache and - * re-acquiring the locks, then we might still run a stale plan, which could - * cause shard placements to diverge. To minimize this window, we take the - * locks as early as possible. - */ + /* prevent concurrent placement changes */ ReacquireMetadataLocks(taskList); - /* - * If we deferred shard pruning to the executor, then we still need to assign - * shard placements. We do this after acquiring the metadata locks to ensure - * we can't get stale metadata. At some point, we may want to load all - * placement metadata here. - */ - if (deferredPruning) - { - /* modify tasks are always assigned using first-replica policy */ - workerJob->taskList = FirstReplicaAssignTaskList(taskList); - } -} - - -/* - * AssignInsertTaskShardId performs shard pruning for an insert and sets - * anchorShardId accordingly. - */ -static void -AssignInsertTaskShardId(Query *jobQuery, List *taskList) -{ - ShardInterval *shardInterval = NULL; - Task *insertTask = NULL; - DeferredErrorMessage *planningError = NULL; - - Assert(jobQuery->commandType == CMD_INSERT); - - /* - * We skipped shard pruning in the planner because the partition - * column contained an expression. Perform shard pruning now. - */ - shardInterval = FindShardForInsert(jobQuery, &planningError); - if (planningError != NULL) - { - RaiseDeferredError(planningError, ERROR); - } - else if (shardInterval == NULL) - { - /* expression could not be evaluated */ - ereport(ERROR, (errmsg("parameters in the partition column are not " - "allowed"))); - } - - /* assign a shard ID to the task */ - insertTask = (Task *) linitial(taskList); - insertTask->anchorShardId = shardInterval->shardId; + /* assign task placements */ + workerJob->taskList = FirstReplicaAssignTaskList(taskList); } @@ -515,9 +480,13 @@ RouterSingleModifyExecScan(CustomScanState *node) bool hasReturning = multiPlan->hasReturning; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; - Task *task = (Task *) linitial(taskList); - ExecuteSingleModifyTask(scanState, task, hasReturning); + if (list_length(taskList) > 0) + { + Task *task = (Task *) linitial(taskList); + + ExecuteSingleModifyTask(scanState, task, hasReturning); + } scanState->finishedRemoteScan = true; } @@ -574,9 +543,13 @@ RouterSelectExecScan(CustomScanState *node) MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; - Task *task = (Task *) linitial(taskList); - ExecuteSingleSelectTask(scanState, task); + if (list_length(taskList) > 0) + { + Task *task = (Task *) linitial(taskList); + + ExecuteSingleSelectTask(scanState, task); + } scanState->finishedRemoteScan = true; } diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 318dae558..caf24bde8 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -77,13 +77,23 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); } + else if (task->upsertQuery) + { + RangeTblEntry *rangeTableEntry = NULL; + + /* setting an alias simplifies deparsing of UPSERTs */ + rangeTableEntry = linitial(query->rtable); + if (rangeTableEntry->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + rangeTableEntry->alias = alias; + } + } deparse_shard_query(query, relationId, task->anchorShardId, newQueryString); - ereport(DEBUG4, (errmsg("query before rebuilding: %s", - task->queryString))); - ereport(DEBUG4, (errmsg("query after rebuilding: %s", + ereport(DEBUG4, (errmsg("distributed statement: %s", newQueryString->data))); task->queryString = newQueryString->data; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 9cf75e2b7..46e411863 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -93,22 +93,30 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context); static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree); -static Task * RouterModifyTask(Oid distributedTableId, Query *originalQuery, - ShardInterval *shardInterval); -static ShardInterval * TargetShardIntervalForModify(Oid distriubtedTableId, Query *query, - DeferredErrorMessage **planningError); +static Job * RouterModifyJob(Query *originalQuery, Query *query, + DeferredErrorMessage **planningError); +static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry); +static bool CanShardPrune(Oid distributedTableId, Query *query); +static List * RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError); +static List * RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError); +static Job * CreateJob(Query *query); +static Task * CreateTask(TaskType taskType); +static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError); static ShardInterval * FindShardForUpdateOrDelete(Query *query, + DistTableCacheEntry *cacheEntry, DeferredErrorMessage **planningError); static List * QueryRestrictList(Query *query, char partitionMethod); static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); -static Task * RouterSelectTask(Query *originalQuery, - RelationRestrictionContext *restrictionContext, - List **placementList); +static Job * RouterSelectJob(Query *originalQuery, + RelationRestrictionContext *restrictionContext, + bool *queryRoutable); static bool RelationPrunesToMultipleShards(List *relationShardList); static List * TargetShardIntervalsForSelect(Query *query, RelationRestrictionContext *restrictionContext); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); -static Job * RouterQueryJob(Query *query, Task *task, List *placementList); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -151,11 +159,7 @@ MultiPlan * CreateModifyPlan(Query *originalQuery, Query *query, PlannerRestrictionContext *plannerRestrictionContext) { - Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery); - ShardInterval *targetShardInterval = NULL; - Task *task = NULL; Job *job = NULL; - List *placementList = NIL; MultiPlan *multiPlan = CitusMakeNode(MultiPlan); multiPlan->operation = query->commandType; @@ -166,18 +170,14 @@ CreateModifyPlan(Query *originalQuery, Query *query, return multiPlan; } - targetShardInterval = TargetShardIntervalForModify(distributedTableId, query, - &multiPlan->planningError); + job = RouterModifyJob(originalQuery, query, &multiPlan->planningError); if (multiPlan->planningError != NULL) { return multiPlan; } - task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval); - ereport(DEBUG2, (errmsg("Creating router plan"))); - job = RouterQueryJob(originalQuery, task, placementList); multiPlan->workerJob = job; multiPlan->masterQuery = NULL; multiPlan->routerExecutable = true; @@ -204,8 +204,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, RelationRestrictionContext *restrictionContext) { Job *job = NULL; - Task *task = NULL; - List *placementList = NIL; + bool queryRoutable = false; MultiPlan *multiPlan = CitusMakeNode(MultiPlan); multiPlan->operation = query->commandType; @@ -217,16 +216,15 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, return multiPlan; } - task = RouterSelectTask(originalQuery, restrictionContext, &placementList); - if (task == NULL) + job = RouterSelectJob(originalQuery, restrictionContext, &queryRoutable); + if (!queryRoutable) { + /* query cannot be handled by this planner */ return NULL; } ereport(DEBUG2, (errmsg("Creating router plan"))); - job = RouterQueryJob(originalQuery, task, placementList); - multiPlan->workerJob = job; multiPlan->masterQuery = NULL; multiPlan->routerExecutable = true; @@ -1015,83 +1013,151 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre /* - * RouterModifyTask builds a Task to represent a modification performed by + * RouterModifyJob builds a Job to represent a modification performed by * the provided query against the provided shard interval. This task contains * shard-extended deparsed SQL to be run during execution. */ -static Task * -RouterModifyTask(Oid distributedTableId, Query *originalQuery, - ShardInterval *shardInterval) +static Job * +RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError) { - Task *modifyTask = NULL; - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + Oid distributedTableId = ExtractFirstDistributedTableId(query); + List *taskList = NIL; + Job *job = NULL; + bool requiresMasterEvaluation = false; + bool deferredPruning = false; - modifyTask = CitusMakeNode(Task); - modifyTask->jobId = INVALID_JOB_ID; - modifyTask->taskId = INVALID_TASK_ID; - modifyTask->taskType = MODIFY_TASK; - modifyTask->queryString = NULL; - modifyTask->anchorShardId = INVALID_SHARD_ID; - modifyTask->dependedTaskList = NIL; - modifyTask->replicationModel = cacheEntry->replicationModel; - - if (originalQuery->onConflict != NULL) + if (!CanShardPrune(distributedTableId, query)) { - RangeTblEntry *rangeTableEntry = NULL; + /* there is a non-constant in the partition column, cannot prune yet */ + taskList = NIL; + deferredPruning = true; - /* set the flag */ - modifyTask->upsertQuery = true; - - /* setting an alias simplifies deparsing of UPSERTs */ - rangeTableEntry = linitial(originalQuery->rtable); - if (rangeTableEntry->alias == NULL) + /* must evaluate the non-constant in the partition column */ + requiresMasterEvaluation = true; + } + else + { + taskList = RouterModifyTaskList(query, planningError); + if (*planningError) { - Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); - rangeTableEntry->alias = alias; + return NULL; } + + /* determine whether there are function calls to evaluate */ + requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); } - if (shardInterval != NULL) + if (!requiresMasterEvaluation) { - uint64 shardId = shardInterval->shardId; - StringInfo queryString = makeStringInfo(); - - /* grab shared metadata lock to stop concurrent placement additions */ - LockShardDistributionMetadata(shardId, ShareLock); - - deparse_shard_query(originalQuery, shardInterval->relationId, shardId, - queryString); - ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); - - modifyTask->queryString = queryString->data; - modifyTask->anchorShardId = shardId; + /* no functions or parameters, build the query strings upfront */ + RebuildQueryStrings(originalQuery, taskList); } - return modifyTask; + job = CreateJob(originalQuery); + job->taskList = taskList; + job->requiresMasterEvaluation = requiresMasterEvaluation; + job->deferredPruning = deferredPruning; + + return job; } /* - * TargetShardIntervalForModify determines the single shard targeted by a provided - * modify command. If no matching shards exist, it throws an error. Otherwise, it - * delegates to FindShardForInsert or FindShardForUpdateOrDelete based on the - * command type. + * CreateJob returns a new Job for the given query. */ -static ShardInterval * -TargetShardIntervalForModify(Oid distributedTableId, Query *query, - DeferredErrorMessage **planningError) +static Job * +CreateJob(Query *query) { - ShardInterval *shardInterval = NULL; + Job *job = NULL; + + job = CitusMakeNode(Job); + job->jobId = INVALID_JOB_ID; + job->jobQuery = query; + job->taskList = NIL; + job->dependedJobList = NIL; + job->subqueryPushdown = false; + job->requiresMasterEvaluation = false; + job->deferredPruning = false; + + return job; +} + + +/* + * CanShardPrune determines whether a query is ready for shard pruning + * by checking whether there is a constant value in the partition column. + */ +static bool +CanShardPrune(Oid distributedTableId, Query *query) +{ + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + Expr *partitionValueExpr = NULL; + + if (query->commandType != CMD_INSERT) + { + /* we assume UPDATE/DELETE is always prunable */ + return true; + } + + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + if (partitionColumn == NULL) + { + /* can always do shard pruning for reference tables */ + return true; + } + + partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn); + if (IsA(partitionValueExpr, Const)) + { + /* can do shard pruning if the partition column is constant */ + return true; + } + + return false; +} + + +/* + * RouterModifyTaskList builds a list of tasks for a given query. + */ +List * +RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError) +{ + Oid distributedTableId = ExtractFirstDistributedTableId(query); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); CmdType commandType = query->commandType; - int shardCount = 0; + List *taskList = NIL; - Assert(commandType != CMD_SELECT); + ErrorIfNoShardsExist(cacheEntry); - /* error out if no shards exist for the table */ - shardCount = cacheEntry->shardIntervalArrayLength; + if (commandType == CMD_INSERT) + { + taskList = RouterInsertTaskList(query, cacheEntry, planningError); + } + else + { + taskList = RouterUpdateOrDeleteTaskList(query, cacheEntry, planningError); + if (*planningError) + { + return NIL; + } + } + + return taskList; +} + + +/* + * ErrorIfNoShardsExist throws an error if the given table has no shards. + */ +static void +ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry) +{ + int shardCount = cacheEntry->shardIntervalArrayLength; if (shardCount == 0) { + Oid distributedTableId = cacheEntry->relationId; char *relationName = get_rel_name(distributedTableId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1101,17 +1167,110 @@ TargetShardIntervalForModify(Oid distributedTableId, Query *query, errhint("Run master_create_worker_shards to create shards " "and try again."))); } +} - if (commandType == CMD_INSERT) + +/* + * RouterInsertTaskList generates a list of tasks for performing an INSERT on + * a distributed table via the router executor. + */ +static List * +RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError) +{ + ShardInterval *shardInterval = NULL; + Task *modifyTask = NULL; + + Assert(query->commandType == CMD_INSERT); + + shardInterval = FindShardForInsert(query, cacheEntry, planningError); + + if (*planningError != NULL) { - shardInterval = FindShardForInsert(query, planningError); - } - else - { - shardInterval = FindShardForUpdateOrDelete(query, planningError); + return NIL; } - return shardInterval; + /* an INSERT always routes to exactly one shard */ + Assert(shardInterval != NULL); + + modifyTask = CreateTask(MODIFY_TASK); + modifyTask->anchorShardId = shardInterval->shardId; + modifyTask->replicationModel = cacheEntry->replicationModel; + + if (query->onConflict != NULL) + { + modifyTask->upsertQuery = true; + } + + return list_make1(modifyTask); +} + + +/* + * RouterUpdateOrDeleteTaskList returns a list of tasks for executing an UPDATE + * or DELETE command on a distributed table via the router executor. + */ +static List * +RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError) +{ + ShardInterval *shardInterval = NULL; + List *taskList = NIL; + + Assert(query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE); + + shardInterval = FindShardForUpdateOrDelete(query, cacheEntry, planningError); + + if (*planningError != NULL) + { + return NIL; + } + + if (shardInterval != NULL) + { + Task *modifyTask = NULL; + + modifyTask = CreateTask(MODIFY_TASK); + modifyTask->anchorShardId = shardInterval->shardId; + modifyTask->replicationModel = cacheEntry->replicationModel; + + taskList = lappend(taskList, modifyTask); + } + + return taskList; +} + + +/* + * CreateTask returns a new Task with the given type. + */ +static Task * +CreateTask(TaskType taskType) +{ + Task *task = NULL; + + task = CitusMakeNode(Task); + task->taskType = taskType; + task->jobId = INVALID_JOB_ID; + task->taskId = INVALID_TASK_ID; + task->queryString = NULL; + task->anchorShardId = INVALID_SHARD_ID; + task->taskPlacementList = NIL; + task->dependedTaskList = NIL; + + task->partitionId = 0; + task->upstreamTaskId = INVALID_TASK_ID; + task->shardInterval = NULL; + task->assignmentConstrained = false; + task->shardId = INVALID_SHARD_ID; + task->taskExecution = NULL; + task->upsertQuery = false; + task->replicationModel = REPLICATION_MODEL_INVALID; + + task->insertSelectQuery = false; + task->relationShardList = NIL; + + return task; } @@ -1121,45 +1280,42 @@ TargetShardIntervalForModify(Oid distributedTableId, Query *query, * evaluated. If the partition column value falls within 0 or multiple * (overlapping) shards, the planningError is set. */ -ShardInterval * -FindShardForInsert(Query *query, DeferredErrorMessage **planningError) +static ShardInterval * +FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError) { - Oid distributedTableId = ExtractFirstDistributedTableId(query); - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + Oid distributedTableId = cacheEntry->relationId; char partitionMethod = cacheEntry->partitionMethod; uint32 rangeTableId = 1; Var *partitionColumn = NULL; Expr *partitionValueExpr = NULL; Const *partitionValueConst = NULL; - List *shardIntervalList = NIL; - List *prunedShardList = NIL; int prunedShardCount = 0; + List *prunedShardList = NIL; Assert(query->commandType == CMD_INSERT); - /* reference tables can only have one shard */ + /* reference tables do not have a partition column, but can only have one shard */ if (partitionMethod == DISTRIBUTE_BY_NONE) { - int shardCount = 0; - - shardIntervalList = LoadShardIntervalList(distributedTableId); - shardCount = list_length(shardIntervalList); - + int shardCount = cacheEntry->shardIntervalArrayLength; if (shardCount != 1) { ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount))); } - return (ShardInterval *) linitial(shardIntervalList); + return cacheEntry->sortedShardIntervalArray[0]; } partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn); + + /* non-constants should have been caught by CanShardPrune */ if (!IsA(partitionValueExpr, Const)) { - /* shard pruning not possible right now */ - return NULL; + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("cannot perform an INSERT with a non-constant in the " + "partition column"))); } partitionValueConst = (Const *) partitionValueExpr; @@ -1173,7 +1329,6 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError) if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) { Datum partitionValue = partitionValueConst->constvalue; - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry); if (shardInterval != NULL) @@ -1251,10 +1406,10 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError) * needs to be applied to multiple or no shards. */ static ShardInterval * -FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) +FindShardForUpdateOrDelete(Query *query, DistTableCacheEntry *cacheEntry, + DeferredErrorMessage **planningError) { - Oid distributedTableId = ExtractFirstDistributedTableId(query); - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + Oid distributedTableId = cacheEntry->relationId; char partitionMethod = cacheEntry->partitionMethod; CmdType commandType = query->commandType; List *restrictClauseList = NIL; @@ -1268,7 +1423,7 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList); prunedShardCount = list_length(prunedShardList); - if (prunedShardCount != 1) + if (prunedShardCount > 1) { char *partitionKeyString = cacheEntry->partitionKeyString; char *partitionColumnName = ColumnNameToColumn(distributedTableId, @@ -1276,7 +1431,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) StringInfo errorMessage = makeStringInfo(); StringInfo errorHint = makeStringInfo(); const char *commandName = NULL; - const char *targetCountType = NULL; if (commandType == CMD_UPDATE) { @@ -1287,15 +1441,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) commandName = "DELETE"; } - if (prunedShardCount == 0) - { - targetCountType = "no"; - } - else - { - targetCountType = "multiple"; - } - appendStringInfo(errorHint, "Consider using an equality filter on " "partition column \"%s\" to target a " "single shard. If you'd like to run a " @@ -1310,8 +1455,9 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) "all shards satisfying delete criteria."); } - appendStringInfo(errorMessage, "cannot run %s command which targets %s shards", - commandName, targetCountType); + appendStringInfo(errorMessage, + "cannot run %s command which targets multiple shards", + commandName); (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, NULL, @@ -1319,6 +1465,10 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) return NULL; } + else if (prunedShardCount == 0) + { + return NULL; + } return (ShardInterval *) linitial(prunedShardList); } @@ -1404,16 +1554,17 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn) } -/* RouterSelectTask builds a Task to represent a single shard select query */ -static Task * -RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionContext, - List **placementList) +/* RouterSelectJob builds a Job to represent a single shard select query */ +static Job * +RouterSelectJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, + bool *returnQueryRoutable) { + Job *job = NULL; Task *task = NULL; bool queryRoutable = false; StringInfo queryString = makeStringInfo(); - bool upsertQuery = false; uint64 shardId = INVALID_SHARD_ID; + List *placementList = NIL; List *relationShardList = NIL; bool replacePrunedQueryWithDummy = false; @@ -1421,29 +1572,31 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo replacePrunedQueryWithDummy = true; queryRoutable = RouterSelectQuery(originalQuery, restrictionContext, - placementList, &shardId, &relationShardList, + &placementList, &shardId, &relationShardList, replacePrunedQueryWithDummy); if (!queryRoutable) { + *returnQueryRoutable = false; return NULL; } + job = CreateJob(originalQuery); + pg_get_query_def(originalQuery, queryString); - task = CitusMakeNode(Task); - task->jobId = INVALID_JOB_ID; - task->taskId = INVALID_TASK_ID; - task->taskType = ROUTER_TASK; + task = CreateTask(ROUTER_TASK); task->queryString = queryString->data; task->anchorShardId = shardId; - task->replicationModel = REPLICATION_MODEL_INVALID; - task->dependedTaskList = NIL; - task->upsertQuery = upsertQuery; + task->taskPlacementList = placementList; task->relationShardList = relationShardList; - return task; + job->taskList = list_make1(task); + + *returnQueryRoutable = true; + + return job; } @@ -1771,67 +1924,6 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList) } -/* - * RouterQueryJob creates a Job for the specified query to execute the - * provided single shard select task. - */ -static Job * -RouterQueryJob(Query *query, Task *task, List *placementList) -{ - Job *job = NULL; - List *taskList = NIL; - TaskType taskType = task->taskType; - bool requiresMasterEvaluation = false; - bool deferredPruning = false; - - if (taskType == MODIFY_TASK) - { - if (task->anchorShardId != INVALID_SHARD_ID) - { - /* - * We were able to assign a shard ID. Generate task - * placement list using the first-replica assignment - * policy (modify placements in placement ID order). - */ - taskList = FirstReplicaAssignTaskList(list_make1(task)); - requiresMasterEvaluation = RequiresMasterEvaluation(query); - } - else - { - /* - * We were unable to assign a shard ID yet, meaning - * the partition column value is an expression. - */ - taskList = list_make1(task); - requiresMasterEvaluation = true; - deferredPruning = true; - } - } - else - { - /* - * For selects we get the placement list during shard - * pruning. - */ - Assert(placementList != NIL); - - task->taskPlacementList = placementList; - taskList = list_make1(task); - } - - job = CitusMakeNode(Job); - job->dependedJobList = NIL; - job->jobId = INVALID_JOB_ID; - job->subqueryPushdown = false; - job->jobQuery = query; - job->taskList = taskList; - job->requiresMasterEvaluation = requiresMasterEvaluation; - job->deferredPruning = deferredPruning; - - return job; -} - - /* * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a modify query, or if its is a select diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 80ee4eb5e..a297e8f5c 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -46,8 +46,7 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); -extern ShardInterval * FindShardForInsert(Query *query, - DeferredErrorMessage **planningError); +extern List * RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 4f822e05b..54b1584c9 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -334,6 +334,21 @@ Custom Scan (Citus Router) -> Index Scan using lineitem_pkey_290000 on lineitem_290000 Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) +-- Test zero-shard update +EXPLAIN (COSTS FALSE) + UPDATE lineitem + SET l_suppkey = 12 + WHERE l_orderkey = 1 AND l_orderkey = 0; +Custom Scan (Citus Router) + Task Count: 0 + Tasks Shown: All +-- Test zero-shard delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem + WHERE l_orderkey = 1 AND l_orderkey = 0; +Custom Scan (Citus Router) + Task Count: 0 + Tasks Shown: All -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index c74d4c756..d5765679e 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -334,6 +334,21 @@ Custom Scan (Citus Router) -> Index Scan using lineitem_pkey_290000 on lineitem_290000 Index Cond: (l_orderkey = 1) Filter: (l_partkey = 0) +-- Test zero-shard update +EXPLAIN (COSTS FALSE) + UPDATE lineitem + SET l_suppkey = 12 + WHERE l_orderkey = 1 AND l_orderkey = 0; +Custom Scan (Citus Router) + Task Count: 0 + Tasks Shown: All +-- Test zero-shard delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem + WHERE l_orderkey = 1 AND l_orderkey = 0; +Custom Scan (Citus Router) + Task Count: 0 + Tasks Shown: All -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 22ea5c7a8..dd995c160 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -89,13 +89,10 @@ INSERT INTO articles VALUES (49, 9, 'anyone', 2681); INSERT INTO articles VALUES (50, 10, 'anjanette', 19519); -- insert a single row for the test INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); --- zero-shard modifications should fail +-- zero-shard modifications should succeed UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; -ERROR: cannot run UPDATE command which targets no shards -HINT: Consider using an equality filter on partition column "author_id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). +UPDATE articles SET title = '' WHERE 0 = 1; DELETE FROM articles WHERE author_id = 1 AND author_id = 2; -ERROR: cannot run DELETE command which targets no shards -HINT: Consider using an equality filter on partition column "author_id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards(). -- single-shard tests -- test simple select for a single row SELECT * FROM articles WHERE author_id = 10 AND id = 50; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index fe08fd483..9f3035d0d 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -100,6 +100,17 @@ EXPLAIN (COSTS FALSE) DELETE FROM lineitem WHERE l_orderkey = 1 AND l_partkey = 0; +-- Test zero-shard update +EXPLAIN (COSTS FALSE) + UPDATE lineitem + SET l_suppkey = 12 + WHERE l_orderkey = 1 AND l_orderkey = 0; + +-- Test zero-shard delete +EXPLAIN (COSTS FALSE) + DELETE FROM lineitem + WHERE l_orderkey = 1 AND l_orderkey = 0; + -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 5a1eb76ce..eaaa915aa 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -80,8 +80,9 @@ INSERT INTO articles VALUES (50, 10, 'anjanette', 19519); -- insert a single row for the test INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519); --- zero-shard modifications should fail +-- zero-shard modifications should succeed UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2; +UPDATE articles SET title = '' WHERE 0 = 1; DELETE FROM articles WHERE author_id = 1 AND author_id = 2; -- single-shard tests