Merge pull request #1474 from citusdata/update_where_false

Execute UPDATE/DELETE statements with 0 shards
pull/1535/head
Marco Slot 2017-08-07 17:48:52 +04:00 committed by GitHub
commit 7e4b2c1595
12 changed files with 399 additions and 313 deletions
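
Before this change, an UPDATE or DELETE whose filters pruned away every shard was rejected by the router planner; with it, the statement is planned with an empty task list and completes as a no-op. A minimal illustration, using the articles table from the regression tests below (distributed on author_id); the statements themselves are taken from those tests:

-- previously: ERROR: cannot run UPDATE command which targets no shards
-- now: planned with an empty task list and executed as a no-op
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
DELETE FROM articles WHERE author_id = 1 AND author_id = 2;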

View File

@ -158,8 +158,8 @@ RouterCreateScan(CustomScan *scan)
isModificationQuery = IsModifyMultiPlan(multiPlan);
/* check if this is a single shard query */
if (list_length(taskList) == 1)
/* check whether query has at most one shard */
if (list_length(taskList) <= 1)
{
if (isModificationQuery)
{

View File

@ -73,8 +73,7 @@ bool AllModificationsCommutative = false;
bool EnableDeadlockPrevention = true;
/* functions needed during run phase */
static void ReacquireMetadataLocks(List *taskList);
static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
@ -101,22 +100,12 @@ static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
/*
* ReacquireMetadataLocks re-acquires the metadata locks that are normally
* acquired during planning.
*
* If we are executing a prepared statement, then planning might have
* happened in a separate transaction and advisory locks are no longer
* held. If a shard is currently being repaired/copied/moved, then
* obtaining the locks will fail and this function throws an error to
* prevent executing a stale plan.
*
* If we are executing a non-prepared statement or planning happened in
* the same transaction, then we already have the locks and obtain them
* again here. Since we always release these locks at the end of the
* transaction, this is effectively a no-op.
* AcquireMetadataLocks acquires metadata locks on each of the anchor
* shards in the task list to prevent a shard being modified while it
* is being copied.
*/
static void
ReacquireMetadataLocks(List *taskList)
AcquireMetadataLocks(List *taskList)
{
ListCell *taskCell = NULL;
@ -131,28 +120,7 @@ ReacquireMetadataLocks(List *taskList)
{
Task *task = (Task *) lfirst(taskCell);
/*
* Only obtain metadata locks for modifications to allow reads to
* proceed during shard copy.
*/
if (task->taskType == MODIFY_TASK &&
!TryLockShardDistributionMetadata(task->anchorShardId, ShareLock))
{
/*
* We could error out immediately to give quick feedback to the
* client, but this might complicate flow control and our default
* behaviour during shard copy is to block.
*
* Block until the lock becomes available such that the next command
* will likely succeed and use the serialization failure error code
* to signal to the client that it should retry the current command.
*/
LockShardDistributionMetadata(task->anchorShardId, ShareLock);
ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("prepared modifications cannot be executed on "
"a shard while it is being copied")));
}
LockShardDistributionMetadata(task->anchorShardId, ShareLock);
}
}
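
The ShareLock taken here is what makes plain modifications wait behind an in-progress shard repair rather than race with it, which is what the isolation test further down exercises. A rough SQL-level sketch of that interleaving, under the assumption that master_copy_shard_placement is the repair entry point as in that test; all of its arguments below are placeholders:

-- session 2: repair a placement inside a transaction, holding the shard's
-- distribution metadata lock until commit
BEGIN;
SELECT master_copy_shard_placement(:shardid, :'source_host', :source_port, :'target_host', :target_port);
-- session 1: a modification of the same shard now waits in
-- AcquireMetadataLocks until session 2 commits
INSERT INTO test_dml_vs_repair VALUES (1, 1);
-- session 2:
COMMIT;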
@ -427,75 +395,41 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
if (workerJob->requiresMasterEvaluation)
{
PlanState *planState = &(scanState->customScanState.ss.ps);
EState *executorState = planState->state;
ExecuteMasterEvaluableFunctions(jobQuery, planState);
/*
* We've processed parameters in ExecuteMasterEvaluableFunctions and
* don't need to send their values to workers, since they will be
* represented as constants in the deparsed query. To avoid sending
* parameter values, we set the parameter list to NULL.
*/
executorState->es_param_list_info = NULL;
if (deferredPruning)
{
AssignInsertTaskShardId(jobQuery, taskList);
DeferredErrorMessage *planningError = NULL;
/* need to perform shard pruning, rebuild the task list from scratch */
taskList = RouterModifyTaskList(jobQuery, &planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
workerJob->taskList = taskList;
}
RebuildQueryStrings(jobQuery, taskList);
}
/*
* If we are executing a prepared statement, then we may not yet have obtained
* the metadata locks in this transaction. To prevent a concurrent shard copy,
* we re-obtain them here or error out if a shard copy has already started.
*
* If a shard copy finishes in between fetching a plan from cache and
* re-acquiring the locks, then we might still run a stale plan, which could
* cause shard placements to diverge. To minimize this window, we take the
* locks as early as possible.
*/
ReacquireMetadataLocks(taskList);
/* prevent concurrent placement changes */
AcquireMetadataLocks(taskList);
/*
* If we deferred shard pruning to the executor, then we still need to assign
* shard placements. We do this after acquiring the metadata locks to ensure
* we can't get stale metadata. At some point, we may want to load all
* placement metadata here.
*/
if (deferredPruning)
{
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
}
}
/*
* AssignInsertTaskShardId performs shard pruning for an insert and sets
* anchorShardId accordingly.
*/
static void
AssignInsertTaskShardId(Query *jobQuery, List *taskList)
{
ShardInterval *shardInterval = NULL;
Task *insertTask = NULL;
DeferredErrorMessage *planningError = NULL;
Assert(jobQuery->commandType == CMD_INSERT);
/*
* We skipped shard pruning in the planner because the partition
* column contained an expression. Perform shard pruning now.
*/
shardInterval = FindShardForInsert(jobQuery, &planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
else if (shardInterval == NULL)
{
/* expression could not be evaluated */
ereport(ERROR, (errmsg("parameters in the partition column are not "
"allowed")));
}
/* assign a shard ID to the task */
insertTask = (Task *) linitial(taskList);
insertTask->anchorShardId = shardInterval->shardId;
/* assign task placements */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
}
@ -515,9 +449,13 @@ RouterSingleModifyExecScan(CustomScanState *node)
bool hasReturning = multiPlan->hasReturning;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
Task *task = (Task *) linitial(taskList);
ExecuteSingleModifyTask(scanState, task, hasReturning);
if (list_length(taskList) > 0)
{
Task *task = (Task *) linitial(taskList);
ExecuteSingleModifyTask(scanState, task, hasReturning);
}
scanState->finishedRemoteScan = true;
}
@ -574,9 +512,13 @@ RouterSelectExecScan(CustomScanState *node)
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
Task *task = (Task *) linitial(taskList);
ExecuteSingleSelectTask(scanState, task);
if (list_length(taskList) > 0)
{
Task *task = (Task *) linitial(taskList);
ExecuteSingleSelectTask(scanState, task);
}
scanState->finishedRemoteScan = true;
}

View File

@ -77,13 +77,23 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
}
else if (task->upsertQuery)
{
RangeTblEntry *rangeTableEntry = NULL;
/* setting an alias simplifies deparsing of UPSERTs */
rangeTableEntry = linitial(query->rtable);
if (rangeTableEntry->alias == NULL)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
rangeTableEntry->alias = alias;
}
}
deparse_shard_query(query, relationId, task->anchorShardId,
newQueryString);
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
task->queryString)));
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ereport(DEBUG4, (errmsg("distributed statement: %s",
newQueryString->data)));
task->queryString = newQueryString->data;
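
The upsert branch above exists because INSERT ... ON CONFLICT is easiest to deparse when the target range table entry carries CITUS_TABLE_ALIAS, keeping references to the target table and to EXCLUDED unambiguous in the shard-extended query string. A hedged sketch of the kind of statement this covers, using a hypothetical items table; it assumes the table is hash-distributed on key, since the arbiter unique index of a distributed upsert has to include the distribution column:

-- hypothetical distributed table; the upsert routes to a single shard via key = 1
INSERT INTO items (key, value) VALUES (1, 'first')
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;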

View File

@ -93,22 +93,30 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
FromExpr *joinTree);
static Task * RouterModifyTask(Oid distributedTableId, Query *originalQuery,
ShardInterval *shardInterval);
static ShardInterval * TargetShardIntervalForModify(Oid distributedTableId, Query *query,

DeferredErrorMessage **planningError);
static Job * RouterModifyJob(Query *originalQuery, Query *query,
DeferredErrorMessage **planningError);
static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry);
static bool CanShardPrune(Oid distributedTableId, Query *query);
static List * RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError);
static List * RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError);
static Job * CreateJob(Query *query);
static Task * CreateTask(TaskType taskType);
static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError);
static ShardInterval * FindShardForUpdateOrDelete(Query *query,
DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError);
static List * QueryRestrictList(Query *query, char partitionMethod);
static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
List **placementList);
static Job * RouterSelectJob(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
bool *queryRoutable);
static bool RelationPrunesToMultipleShards(List *relationShardList);
static List * TargetShardIntervalsForSelect(Query *query,
RelationRestrictionContext *restrictionContext);
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
static bool MultiRouterPlannableQuery(Query *query,
RelationRestrictionContext *restrictionContext);
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
@ -151,11 +159,7 @@ MultiPlan *
CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *plannerRestrictionContext)
{
Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
ShardInterval *targetShardInterval = NULL;
Task *task = NULL;
Job *job = NULL;
List *placementList = NIL;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = query->commandType;
@ -166,18 +170,14 @@ CreateModifyPlan(Query *originalQuery, Query *query,
return multiPlan;
}
targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
&multiPlan->planningError);
job = RouterModifyJob(originalQuery, query, &multiPlan->planningError);
if (multiPlan->planningError != NULL)
{
return multiPlan;
}
task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList);
multiPlan->workerJob = job;
multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
@ -204,8 +204,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
Job *job = NULL;
Task *task = NULL;
List *placementList = NIL;
bool queryRoutable = false;
MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
multiPlan->operation = query->commandType;
@ -217,16 +216,15 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
return multiPlan;
}
task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
if (task == NULL)
job = RouterSelectJob(originalQuery, restrictionContext, &queryRoutable);
if (!queryRoutable)
{
/* query cannot be handled by this planner */
return NULL;
}
ereport(DEBUG2, (errmsg("Creating router plan")));
job = RouterQueryJob(originalQuery, task, placementList);
multiPlan->workerJob = job;
multiPlan->masterQuery = NULL;
multiPlan->routerExecutable = true;
@ -1015,83 +1013,151 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
/*
* RouterModifyTask builds a Task to represent a modification performed by
* RouterModifyJob builds a Job to represent a modification performed by
* the provided query against the provided shard interval. This task contains
* shard-extended deparsed SQL to be run during execution.
*/
static Task *
RouterModifyTask(Oid distributedTableId, Query *originalQuery,
ShardInterval *shardInterval)
static Job *
RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
{
Task *modifyTask = NULL;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
Oid distributedTableId = ExtractFirstDistributedTableId(query);
List *taskList = NIL;
Job *job = NULL;
bool requiresMasterEvaluation = false;
bool deferredPruning = false;
modifyTask = CitusMakeNode(Task);
modifyTask->jobId = INVALID_JOB_ID;
modifyTask->taskId = INVALID_TASK_ID;
modifyTask->taskType = MODIFY_TASK;
modifyTask->queryString = NULL;
modifyTask->anchorShardId = INVALID_SHARD_ID;
modifyTask->dependedTaskList = NIL;
modifyTask->replicationModel = cacheEntry->replicationModel;
if (originalQuery->onConflict != NULL)
if (!CanShardPrune(distributedTableId, query))
{
RangeTblEntry *rangeTableEntry = NULL;
/* there is a non-constant in the partition column, cannot prune yet */
taskList = NIL;
deferredPruning = true;
/* set the flag */
modifyTask->upsertQuery = true;
/* setting an alias simplifies deparsing of UPSERTs */
rangeTableEntry = linitial(originalQuery->rtable);
if (rangeTableEntry->alias == NULL)
/* must evaluate the non-constant in the partition column */
requiresMasterEvaluation = true;
}
else
{
taskList = RouterModifyTaskList(query, planningError);
if (*planningError)
{
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
rangeTableEntry->alias = alias;
return NULL;
}
/* determine whether there are function calls to evaluate */
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
}
if (shardInterval != NULL)
if (!requiresMasterEvaluation)
{
uint64 shardId = shardInterval->shardId;
StringInfo queryString = makeStringInfo();
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
deparse_shard_query(originalQuery, shardInterval->relationId, shardId,
queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
modifyTask->queryString = queryString->data;
modifyTask->anchorShardId = shardId;
/* no functions or parameters, build the query strings upfront */
RebuildQueryStrings(originalQuery, taskList);
}
return modifyTask;
job = CreateJob(originalQuery);
job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning;
return job;
}
/*
* TargetShardIntervalForModify determines the single shard targeted by a provided
* modify command. If no matching shards exist, it throws an error. Otherwise, it
* delegates to FindShardForInsert or FindShardForUpdateOrDelete based on the
* command type.
* CreateJob returns a new Job for the given query.
*/
static ShardInterval *
TargetShardIntervalForModify(Oid distributedTableId, Query *query,
DeferredErrorMessage **planningError)
static Job *
CreateJob(Query *query)
{
ShardInterval *shardInterval = NULL;
Job *job = NULL;
job = CitusMakeNode(Job);
job->jobId = INVALID_JOB_ID;
job->jobQuery = query;
job->taskList = NIL;
job->dependedJobList = NIL;
job->subqueryPushdown = false;
job->requiresMasterEvaluation = false;
job->deferredPruning = false;
return job;
}
/*
* CanShardPrune determines whether a query is ready for shard pruning
* by checking whether there is a constant value in the partition column.
*/
static bool
CanShardPrune(Oid distributedTableId, Query *query)
{
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
Expr *partitionValueExpr = NULL;
if (query->commandType != CMD_INSERT)
{
/* we assume UPDATE/DELETE is always prunable */
return true;
}
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
if (partitionColumn == NULL)
{
/* can always do shard pruning for reference tables */
return true;
}
partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
if (IsA(partitionValueExpr, Const))
{
/* can do shard pruning if the partition column is constant */
return true;
}
return false;
}
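
In SQL terms, CanShardPrune separates INSERTs whose partition value is already a constant from those that still contain an expression the coordinator has to evaluate first. A sketch of the two paths, assuming articles is distributed on author_id as in the regression tests; the volatile expression is purely illustrative:

-- constant in the partition column: shard pruning happens at planning time
INSERT INTO articles VALUES (51, 5, 'constant value', 100);
-- volatile expression in the partition column: CanShardPrune returns false,
-- so the task list is built in the executor once the value has been evaluated
INSERT INTO articles VALUES (52, 1 + (floor(random() * 5))::int, 'deferred pruning', 100);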
/*
* RouterModifyTaskList builds a list of tasks for a given query.
*/
List *
RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
CmdType commandType = query->commandType;
int shardCount = 0;
List *taskList = NIL;
Assert(commandType != CMD_SELECT);
ErrorIfNoShardsExist(cacheEntry);
/* error out if no shards exist for the table */
shardCount = cacheEntry->shardIntervalArrayLength;
if (commandType == CMD_INSERT)
{
taskList = RouterInsertTaskList(query, cacheEntry, planningError);
}
else
{
taskList = RouterUpdateOrDeleteTaskList(query, cacheEntry, planningError);
if (*planningError)
{
return NIL;
}
}
return taskList;
}
/*
* ErrorIfNoShardsExist throws an error if the given table has no shards.
*/
static void
ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
{
int shardCount = cacheEntry->shardIntervalArrayLength;
if (shardCount == 0)
{
Oid distributedTableId = cacheEntry->relationId;
char *relationName = get_rel_name(distributedTableId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@ -1101,17 +1167,110 @@ TargetShardIntervalForModify(Oid distributedTableId, Query *query,
errhint("Run master_create_worker_shards to create shards "
"and try again.")));
}
}
if (commandType == CMD_INSERT)
/*
* RouterInsertTaskList generates a list of tasks for performing an INSERT on
* a distributed table via the router executor.
*/
static List *
RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError)
{
ShardInterval *shardInterval = NULL;
Task *modifyTask = NULL;
Assert(query->commandType == CMD_INSERT);
shardInterval = FindShardForInsert(query, cacheEntry, planningError);
if (*planningError != NULL)
{
shardInterval = FindShardForInsert(query, planningError);
}
else
{
shardInterval = FindShardForUpdateOrDelete(query, planningError);
return NIL;
}
return shardInterval;
/* an INSERT always routes to exactly one shard */
Assert(shardInterval != NULL);
modifyTask = CreateTask(MODIFY_TASK);
modifyTask->anchorShardId = shardInterval->shardId;
modifyTask->replicationModel = cacheEntry->replicationModel;
if (query->onConflict != NULL)
{
modifyTask->upsertQuery = true;
}
return list_make1(modifyTask);
}
/*
* RouterUpdateOrDeleteTaskList returns a list of tasks for executing an UPDATE
* or DELETE command on a distributed table via the router executor.
*/
static List *
RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError)
{
ShardInterval *shardInterval = NULL;
List *taskList = NIL;
Assert(query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE);
shardInterval = FindShardForUpdateOrDelete(query, cacheEntry, planningError);
if (*planningError != NULL)
{
return NIL;
}
if (shardInterval != NULL)
{
Task *modifyTask = NULL;
modifyTask = CreateTask(MODIFY_TASK);
modifyTask->anchorShardId = shardInterval->shardId;
modifyTask->replicationModel = cacheEntry->replicationModel;
taskList = lappend(taskList, modifyTask);
}
return taskList;
}
/*
* CreateTask returns a new Task with the given type.
*/
static Task *
CreateTask(TaskType taskType)
{
Task *task = NULL;
task = CitusMakeNode(Task);
task->taskType = taskType;
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->queryString = NULL;
task->anchorShardId = INVALID_SHARD_ID;
task->taskPlacementList = NIL;
task->dependedTaskList = NIL;
task->partitionId = 0;
task->upstreamTaskId = INVALID_TASK_ID;
task->shardInterval = NULL;
task->assignmentConstrained = false;
task->shardId = INVALID_SHARD_ID;
task->taskExecution = NULL;
task->upsertQuery = false;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->insertSelectQuery = false;
task->relationShardList = NIL;
return task;
}
@ -1121,45 +1280,42 @@ TargetShardIntervalForModify(Oid distributedTableId, Query *query,
* evaluated. If the partition column value falls within 0 or multiple
* (overlapping) shards, the planningError is set.
*/
ShardInterval *
FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
static ShardInterval *
FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
Oid distributedTableId = cacheEntry->relationId;
char partitionMethod = cacheEntry->partitionMethod;
uint32 rangeTableId = 1;
Var *partitionColumn = NULL;
Expr *partitionValueExpr = NULL;
Const *partitionValueConst = NULL;
List *shardIntervalList = NIL;
List *prunedShardList = NIL;
int prunedShardCount = 0;
List *prunedShardList = NIL;
Assert(query->commandType == CMD_INSERT);
/* reference tables can only have one shard */
/* reference tables do not have a partition column, but can only have one shard */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
int shardCount = 0;
shardIntervalList = LoadShardIntervalList(distributedTableId);
shardCount = list_length(shardIntervalList);
int shardCount = cacheEntry->shardIntervalArrayLength;
if (shardCount != 1)
{
ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount)));
}
return (ShardInterval *) linitial(shardIntervalList);
return cacheEntry->sortedShardIntervalArray[0];
}
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
/* non-constants should have been caught by CanShardPrune */
if (!IsA(partitionValueExpr, Const))
{
/* shard pruning not possible right now */
return NULL;
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT with a non-constant in the "
"partition column")));
}
partitionValueConst = (Const *) partitionValueExpr;
@ -1173,7 +1329,6 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
{
Datum partitionValue = partitionValueConst->constvalue;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry);
if (shardInterval != NULL)
@ -1251,10 +1406,10 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
* needs to be applied to multiple or no shards.
*/
static ShardInterval *
FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
FindShardForUpdateOrDelete(Query *query, DistTableCacheEntry *cacheEntry,
DeferredErrorMessage **planningError)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
Oid distributedTableId = cacheEntry->relationId;
char partitionMethod = cacheEntry->partitionMethod;
CmdType commandType = query->commandType;
List *restrictClauseList = NIL;
@ -1268,7 +1423,7 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1)
if (prunedShardCount > 1)
{
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
@ -1276,7 +1431,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
const char *commandName = NULL;
const char *targetCountType = NULL;
if (commandType == CMD_UPDATE)
{
@ -1287,15 +1441,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
commandName = "DELETE";
}
if (prunedShardCount == 0)
{
targetCountType = "no";
}
else
{
targetCountType = "multiple";
}
appendStringInfo(errorHint, "Consider using an equality filter on "
"partition column \"%s\" to target a "
"single shard. If you'd like to run a "
@ -1310,8 +1455,9 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
"all shards satisfying delete criteria.");
}
appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
commandName, targetCountType);
appendStringInfo(errorMessage,
"cannot run %s command which targets multiple shards",
commandName);
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL,
@ -1319,6 +1465,10 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
return NULL;
}
else if (prunedShardCount == 0)
{
return NULL;
}
return (ShardInterval *) linitial(prunedShardList);
}
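
After this change only the multi-shard case is rejected here; pruning to zero shards returns NULL, so the caller builds an empty task list instead of raising an error. A sketch of the remaining behaviour, again assuming articles is distributed on author_id; which statement prunes to which shard count is an assumption, not taken from the tests:

-- equality on the partition column: routes to a single shard
UPDATE articles SET title = 'one shard' WHERE author_id = 1;
-- no filter on the partition column: expected to prune to all shards and
-- hit the error built above
-- ERROR: cannot run UPDATE command which targets multiple shards
UPDATE articles SET title = 'many shards' WHERE title <> '';
-- the hint's suggested alternative for an intentional multi-shard modification
SELECT master_modify_multiple_shards($$UPDATE articles SET title = 'many shards' WHERE title <> ''$$);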
@ -1404,16 +1554,17 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
}
/* RouterSelectTask builds a Task to represent a single shard select query */
static Task *
RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionContext,
List **placementList)
/* RouterSelectJob builds a Job to represent a single shard select query */
static Job *
RouterSelectJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
bool *returnQueryRoutable)
{
Job *job = NULL;
Task *task = NULL;
bool queryRoutable = false;
StringInfo queryString = makeStringInfo();
bool upsertQuery = false;
uint64 shardId = INVALID_SHARD_ID;
List *placementList = NIL;
List *relationShardList = NIL;
bool replacePrunedQueryWithDummy = false;
@ -1421,29 +1572,31 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
replacePrunedQueryWithDummy = true;
queryRoutable = RouterSelectQuery(originalQuery, restrictionContext,
placementList, &shardId, &relationShardList,
&placementList, &shardId, &relationShardList,
replacePrunedQueryWithDummy);
if (!queryRoutable)
{
*returnQueryRoutable = false;
return NULL;
}
job = CreateJob(originalQuery);
pg_get_query_def(originalQuery, queryString);
task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->taskType = ROUTER_TASK;
task = CreateTask(ROUTER_TASK);
task->queryString = queryString->data;
task->anchorShardId = shardId;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NIL;
task->upsertQuery = upsertQuery;
task->taskPlacementList = placementList;
task->relationShardList = relationShardList;
return task;
job->taskList = list_make1(task);
*returnQueryRoutable = true;
return job;
}
@ -1771,67 +1924,6 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList)
}
/*
* RouterQueryJob creates a Job for the specified query to execute the
* provided single shard select task.
*/
static Job *
RouterQueryJob(Query *query, Task *task, List *placementList)
{
Job *job = NULL;
List *taskList = NIL;
TaskType taskType = task->taskType;
bool requiresMasterEvaluation = false;
bool deferredPruning = false;
if (taskType == MODIFY_TASK)
{
if (task->anchorShardId != INVALID_SHARD_ID)
{
/*
* We were able to assign a shard ID. Generate task
* placement list using the first-replica assignment
* policy (modify placements in placement ID order).
*/
taskList = FirstReplicaAssignTaskList(list_make1(task));
requiresMasterEvaluation = RequiresMasterEvaluation(query);
}
else
{
/*
* We were unable to assign a shard ID yet, meaning
* the partition column value is an expression.
*/
taskList = list_make1(task);
requiresMasterEvaluation = true;
deferredPruning = true;
}
}
else
{
/*
* For selects we get the placement list during shard
* pruning.
*/
Assert(placementList != NIL);
task->taskPlacementList = placementList;
taskList = list_make1(task);
}
job = CitusMakeNode(Job);
job->dependedJobList = NIL;
job->jobId = INVALID_JOB_ID;
job->subqueryPushdown = false;
job->jobQuery = query;
job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning;
return job;
}
/*
* MultiRouterPlannableQuery returns true if the given query is router plannable.
* The query is router plannable if it is a modify query, or if it is a select

View File

@ -46,8 +46,7 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subquery,
ShardInterval *shardInterval);
extern ShardInterval * FindShardForInsert(Query *query,
DeferredErrorMessage **planningError);
extern List * RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -78,7 +78,7 @@ step s2-invalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
test_id data
@ -90,7 +90,7 @@ step s2-revalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
test_id data
@ -122,15 +122,15 @@ step s2-commit:
COMMIT;
step s1-prepared-insertone: <... completed>
error in steps s2-commit s1-prepared-insertone: ERROR: prepared modifications cannot be executed on a shard while it is being copied
step s2-invalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
test_id data
1 1
1 1
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
@ -139,10 +139,11 @@ step s2-revalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
test_id data
1 1
1 1
starting permutation: s2-invalidate-57637 s1-insertone s1-prepared-insertall s2-begin s2-repair s1-prepared-insertall s2-commit s2-invalidate-57638 s1-display s2-invalidate-57637 s2-revalidate-57638 s1-display
@ -174,17 +175,18 @@ step s2-commit:
COMMIT;
step s1-prepared-insertall: <... completed>
error in steps s2-commit s1-prepared-insertall: ERROR: prepared modifications cannot be executed on a shard while it is being copied
step s2-invalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
test_id data
1 1
1 2
1 2
1 3
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
@ -192,9 +194,11 @@ step s2-revalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
test_id data
1 1
1 2
1 2
1 3

View File

@ -334,6 +334,21 @@ Custom Scan (Citus Router)
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
Index Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-- Test zero-shard update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_orderkey = 0;
Custom Scan (Citus Router)
Task Count: 0
Tasks Shown: All
-- Test zero-shard delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_orderkey = 0;
Custom Scan (Citus Router)
Task Count: 0
Tasks Shown: All
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;

View File

@ -334,6 +334,21 @@ Custom Scan (Citus Router)
-> Index Scan using lineitem_pkey_290000 on lineitem_290000
Index Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-- Test zero-shard update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_orderkey = 0;
Custom Scan (Citus Router)
Task Count: 0
Tasks Shown: All
-- Test zero-shard delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_orderkey = 0;
Custom Scan (Citus Router)
Task Count: 0
Tasks Shown: All
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;

View File

@ -89,13 +89,10 @@ INSERT INTO articles VALUES (49, 9, 'anyone', 2681);
INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
-- insert a single row for the test
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
-- zero-shard modifications should fail
-- zero-shard modifications should succeed
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
ERROR: cannot run UPDATE command which targets no shards
HINT: Consider using an equality filter on partition column "author_id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
UPDATE articles SET title = '' WHERE 0 = 1;
DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
ERROR: cannot run DELETE command which targets no shards
HINT: Consider using an equality filter on partition column "author_id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
-- single-shard tests
-- test simple select for a single row
SELECT * FROM articles WHERE author_id = 10 AND id = 50;

View File

@ -47,7 +47,7 @@ step "s1-prepared-insertall"
step "s1-display"
{
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1 ORDER BY test_id;
}
step "s1-commit"
@ -102,8 +102,8 @@ permutation "s1-insertone" "s2-invalidate-57637" "s1-begin" "s1-insertall" "s2-r
# verify that modifications wait for shard repair
permutation "s2-invalidate-57637" "s2-begin" "s2-repair" "s1-insertone" "s2-commit" "s2-invalidate-57638" "s1-display" "s2-invalidate-57637" "s2-revalidate-57638" "s1-display"
# verify that prepared plain modifications wait for shard repair (and then fail to avoid race)
# verify that prepared plain modifications wait for shard repair
permutation "s2-invalidate-57637" "s1-prepared-insertone" "s2-begin" "s2-repair" "s1-prepared-insertone" "s2-commit" "s2-invalidate-57638" "s1-display" "s2-invalidate-57637" "s2-revalidate-57638" "s1-display"
# verify that prepared INSERT ... SELECT waits for shard repair (and then fail to avoid race)
# verify that prepared INSERT ... SELECT waits for shard repair
permutation "s2-invalidate-57637" "s1-insertone" "s1-prepared-insertall" "s2-begin" "s2-repair" "s1-prepared-insertall" "s2-commit" "s2-invalidate-57638" "s1-display" "s2-invalidate-57637" "s2-revalidate-57638" "s1-display"

View File

@ -100,6 +100,17 @@ EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
-- Test zero-shard update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_orderkey = 0;
-- Test zero-shard delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_orderkey = 0;
-- Test single-shard SELECT
EXPLAIN (COSTS FALSE)
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;

View File

@ -80,8 +80,9 @@ INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
-- insert a single row for the test
INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
-- zero-shard modifications should fail
-- zero-shard modifications should succeed
UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
UPDATE articles SET title = '' WHERE 0 = 1;
DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
-- single-shard tests