mirror of https://github.com/citusdata/citus.git

Execute UPDATE/DELETE statements with 0 shards

parent: 3248f1a2b7
commit: aa7ca81548

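The behavior change, as captured by the regression tests further down: a router UPDATE or DELETE whose WHERE clause prunes away every shard now plans with an empty task list and executes as a no-op instead of raising an error. A minimal SQL sketch (the articles and lineitem tables come from the test schema; the expected outputs are quoted from the tests):

    -- previously failed with
    --   ERROR:  cannot run UPDATE command which targets no shards
    -- and now succeeds, modifying zero rows:
    UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
    DELETE FROM articles WHERE author_id = 1 AND author_id = 2;

    -- the plan reflects the empty task list:
    EXPLAIN (COSTS FALSE)
        UPDATE lineitem SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_orderkey = 0;
    --  Custom Scan (Citus Router)
    --    Task Count: 0
    --    Tasks Shown: All
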
@@ -158,8 +158,8 @@ RouterCreateScan(CustomScan *scan)
     isModificationQuery = IsModifyMultiPlan(multiPlan);
 
-    /* check if this is a single shard query */
-    if (list_length(taskList) == 1)
+    /* check whether query has at most one shard */
+    if (list_length(taskList) <= 1)
     {
         if (isModificationQuery)
         {

@@ -74,7 +74,6 @@ bool EnableDeadlockPrevention = true;
 
 /* functions needed during run phase */
 static void ReacquireMetadataLocks(List *taskList);
-static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
                                                     ShardPlacementAccessType accessType);
 static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,

@@ -427,75 +426,41 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
     if (workerJob->requiresMasterEvaluation)
     {
         PlanState *planState = &(scanState->customScanState.ss.ps);
+        EState *executorState = planState->state;
 
         ExecuteMasterEvaluableFunctions(jobQuery, planState);
 
+        /*
+         * We've processed parameters in ExecuteMasterEvaluableFunctions and
+         * don't need to send their values to workers, since they will be
+         * represented as constants in the deparsed query. To avoid sending
+         * parameter values, we set the parameter list to NULL.
+         */
+        executorState->es_param_list_info = NULL;
+
         if (deferredPruning)
         {
-            AssignInsertTaskShardId(jobQuery, taskList);
+            DeferredErrorMessage *planningError = NULL;
+
+            /* need to perform shard pruning, rebuild the task list from scratch */
+            taskList = RouterModifyTaskList(jobQuery, &planningError);
+
+            if (planningError != NULL)
+            {
+                RaiseDeferredError(planningError, ERROR);
+            }
+
+            workerJob->taskList = taskList;
         }
 
         RebuildQueryStrings(jobQuery, taskList);
     }
 
-    /*
-     * If we are executing a prepared statement, then we may not yet have obtained
-     * the metadata locks in this transaction. To prevent a concurrent shard copy,
-     * we re-obtain them here or error out if a shard copy has already started.
-     *
-     * If a shard copy finishes in between fetching a plan from cache and
-     * re-acquiring the locks, then we might still run a stale plan, which could
-     * cause shard placements to diverge. To minimize this window, we take the
-     * locks as early as possible.
-     */
+    /* prevent concurrent placement changes */
     ReacquireMetadataLocks(taskList);
 
-    /*
-     * If we deferred shard pruning to the executor, then we still need to assign
-     * shard placements. We do this after acquiring the metadata locks to ensure
-     * we can't get stale metadata. At some point, we may want to load all
-     * placement metadata here.
-     */
     if (deferredPruning)
     {
         /* modify tasks are always assigned using first-replica policy */
         workerJob->taskList = FirstReplicaAssignTaskList(taskList);
     }
 }
-
-
-/*
- * AssignInsertTaskShardId performs shard pruning for an insert and sets
- * anchorShardId accordingly.
- */
-static void
-AssignInsertTaskShardId(Query *jobQuery, List *taskList)
-{
-    ShardInterval *shardInterval = NULL;
-    Task *insertTask = NULL;
-    DeferredErrorMessage *planningError = NULL;
-
-    Assert(jobQuery->commandType == CMD_INSERT);
-
-    /*
-     * We skipped shard pruning in the planner because the partition
-     * column contained an expression. Perform shard pruning now.
-     */
-    shardInterval = FindShardForInsert(jobQuery, &planningError);
-    if (planningError != NULL)
-    {
-        RaiseDeferredError(planningError, ERROR);
-    }
-    else if (shardInterval == NULL)
-    {
-        /* expression could not be evaluated */
-        ereport(ERROR, (errmsg("parameters in the partition column are not "
-                               "allowed")));
-    }
-
-    /* assign a shard ID to the task */
-    insertTask = (Task *) linitial(taskList);
-    insertTask->anchorShardId = shardInterval->shardId;
-
-    /* assign task placements */
-    workerJob->taskList = FirstReplicaAssignTaskList(taskList);
-}

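For context on the deferred-pruning branch above: it is taken when an INSERT's partition value is not a plain constant, so the planner cannot prune; CitusModifyBeginScan evaluates master-evaluable expressions first and then rebuilds the task list through RouterModifyTaskList. A hedged sketch (column names assumed from the test schema's articles table, distributed by author_id; the volatile expression is purely illustrative):

    -- the partition value is an expression rather than a Const, so
    -- CanShardPrune() returns false and shard pruning happens in the
    -- executor, after the coordinator evaluates the expression:
    INSERT INTO articles (id, author_id, title, word_count)
    VALUES (51, (random() * 10)::int + 1, 'deferred', 100);
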
@@ -515,9 +480,13 @@ RouterSingleModifyExecScan(CustomScanState *node)
         bool hasReturning = multiPlan->hasReturning;
         Job *workerJob = multiPlan->workerJob;
         List *taskList = workerJob->taskList;
-        Task *task = (Task *) linitial(taskList);
 
-        ExecuteSingleModifyTask(scanState, task, hasReturning);
+        if (list_length(taskList) > 0)
+        {
+            Task *task = (Task *) linitial(taskList);
+
+            ExecuteSingleModifyTask(scanState, task, hasReturning);
+        }
 
         scanState->finishedRemoteScan = true;
     }

@@ -574,9 +543,13 @@ RouterSelectExecScan(CustomScanState *node)
         MultiPlan *multiPlan = scanState->multiPlan;
         Job *workerJob = multiPlan->workerJob;
         List *taskList = workerJob->taskList;
-        Task *task = (Task *) linitial(taskList);
 
-        ExecuteSingleSelectTask(scanState, task);
+        if (list_length(taskList) > 0)
+        {
+            Task *task = (Task *) linitial(taskList);
+
+            ExecuteSingleSelectTask(scanState, task);
+        }
 
         scanState->finishedRemoteScan = true;
     }

@@ -77,13 +77,23 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
 
             UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
         }
+        else if (task->upsertQuery)
+        {
+            RangeTblEntry *rangeTableEntry = NULL;
+
+            /* setting an alias simplifies deparsing of UPSERTs */
+            rangeTableEntry = linitial(query->rtable);
+            if (rangeTableEntry->alias == NULL)
+            {
+                Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
+                rangeTableEntry->alias = alias;
+            }
+        }
 
         deparse_shard_query(query, relationId, task->anchorShardId,
                             newQueryString);
 
-        ereport(DEBUG4, (errmsg("query before rebuilding: %s",
-                                task->queryString)));
-        ereport(DEBUG4, (errmsg("query after rebuilding: %s",
+        ereport(DEBUG4, (errmsg("distributed statement: %s",
                                 newQueryString->data)));
 
         task->queryString = newQueryString->data;

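Why the alias matters in the UPSERT branch above: when deparse_shard_query extends the table name with a shard ID, the ON CONFLICT ... DO UPDATE clause must still reference the target table under a stable name. A sketch, assuming CITUS_TABLE_ALIAS expands to citus_table_alias and that a suitable unique index exists on the test table (both assumptions here, as is the shard ID):

    INSERT INTO articles (id, author_id, title, word_count)
    VALUES (50, 10, 'anjanette', 19519)
    ON CONFLICT (id, author_id) DO UPDATE SET word_count = EXCLUDED.word_count;
    -- deparsed for one shard roughly as:
    --   INSERT INTO public.articles_102008 AS citus_table_alias ...
    --   ON CONFLICT (id, author_id) DO UPDATE
    --   SET word_count = excluded.word_count
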
@@ -93,22 +93,30 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
 static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
 static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
                                     FromExpr *joinTree);
-static Task * RouterModifyTask(Oid distributedTableId, Query *originalQuery,
-                               ShardInterval *shardInterval);
-static ShardInterval * TargetShardIntervalForModify(Oid distriubtedTableId, Query *query,
-                                                    DeferredErrorMessage **planningError);
+static Job * RouterModifyJob(Query *originalQuery, Query *query,
+                             DeferredErrorMessage **planningError);
+static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry);
+static bool CanShardPrune(Oid distributedTableId, Query *query);
+static List * RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
+                                   DeferredErrorMessage **planningError);
+static List * RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry,
+                                           DeferredErrorMessage **planningError);
+static Job * CreateJob(Query *query);
+static Task * CreateTask(TaskType taskType);
+static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
+                                          DeferredErrorMessage **planningError);
+static ShardInterval * FindShardForUpdateOrDelete(Query *query,
+                                                  DistTableCacheEntry *cacheEntry,
+                                                  DeferredErrorMessage **planningError);
 static List * QueryRestrictList(Query *query, char partitionMethod);
 static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
-static Task * RouterSelectTask(Query *originalQuery,
-                               RelationRestrictionContext *restrictionContext,
-                               List **placementList);
+static Job * RouterSelectJob(Query *originalQuery,
+                             RelationRestrictionContext *restrictionContext,
+                             bool *queryRoutable);
 static bool RelationPrunesToMultipleShards(List *relationShardList);
 static List * TargetShardIntervalsForSelect(Query *query,
                                             RelationRestrictionContext *restrictionContext);
 static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
-static Job * RouterQueryJob(Query *query, Task *task, List *placementList);
 static bool MultiRouterPlannableQuery(Query *query,
                                       RelationRestrictionContext *restrictionContext);
 static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);

@@ -151,11 +159,7 @@ MultiPlan *
 CreateModifyPlan(Query *originalQuery, Query *query,
                  PlannerRestrictionContext *plannerRestrictionContext)
 {
-    Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
-    ShardInterval *targetShardInterval = NULL;
-    Task *task = NULL;
     Job *job = NULL;
-    List *placementList = NIL;
     MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
 
     multiPlan->operation = query->commandType;

@@ -166,18 +170,14 @@ CreateModifyPlan(Query *originalQuery, Query *query,
         return multiPlan;
     }
 
-    targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
-                                                       &multiPlan->planningError);
+    job = RouterModifyJob(originalQuery, query, &multiPlan->planningError);
     if (multiPlan->planningError != NULL)
     {
         return multiPlan;
     }
 
-    task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
-
     ereport(DEBUG2, (errmsg("Creating router plan")));
 
-    job = RouterQueryJob(originalQuery, task, placementList);
     multiPlan->workerJob = job;
     multiPlan->masterQuery = NULL;
     multiPlan->routerExecutable = true;

@@ -204,8 +204,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
                            RelationRestrictionContext *restrictionContext)
 {
     Job *job = NULL;
-    Task *task = NULL;
-    List *placementList = NIL;
+    bool queryRoutable = false;
     MultiPlan *multiPlan = CitusMakeNode(MultiPlan);
 
     multiPlan->operation = query->commandType;

@@ -217,16 +216,15 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
         return multiPlan;
     }
 
-    task = RouterSelectTask(originalQuery, restrictionContext, &placementList);
-    if (task == NULL)
+    job = RouterSelectJob(originalQuery, restrictionContext, &queryRoutable);
+    if (!queryRoutable)
     {
         /* query cannot be handled by this planner */
         return NULL;
     }
 
     ereport(DEBUG2, (errmsg("Creating router plan")));
 
-    job = RouterQueryJob(originalQuery, task, placementList);
-
     multiPlan->workerJob = job;
     multiPlan->masterQuery = NULL;
     multiPlan->routerExecutable = true;

@@ -1015,83 +1013,151 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
 
 
 /*
- * RouterModifyTask builds a Task to represent a modification performed by
+ * RouterModifyJob builds a Job to represent a modification performed by
  * the provided query against the provided shard interval. This task contains
  * shard-extended deparsed SQL to be run during execution.
  */
-static Task *
-RouterModifyTask(Oid distributedTableId, Query *originalQuery,
-                 ShardInterval *shardInterval)
+static Job *
+RouterModifyJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
 {
-    Task *modifyTask = NULL;
-    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+    Oid distributedTableId = ExtractFirstDistributedTableId(query);
+    List *taskList = NIL;
+    Job *job = NULL;
+    bool requiresMasterEvaluation = false;
+    bool deferredPruning = false;
 
-    modifyTask = CitusMakeNode(Task);
-    modifyTask->jobId = INVALID_JOB_ID;
-    modifyTask->taskId = INVALID_TASK_ID;
-    modifyTask->taskType = MODIFY_TASK;
-    modifyTask->queryString = NULL;
-    modifyTask->anchorShardId = INVALID_SHARD_ID;
-    modifyTask->dependedTaskList = NIL;
-    modifyTask->replicationModel = cacheEntry->replicationModel;
-
-    if (originalQuery->onConflict != NULL)
+    if (!CanShardPrune(distributedTableId, query))
     {
-        RangeTblEntry *rangeTableEntry = NULL;
+        /* there is a non-constant in the partition column, cannot prune yet */
+        taskList = NIL;
+        deferredPruning = true;
 
-        /* set the flag */
-        modifyTask->upsertQuery = true;
-
-        /* setting an alias simplifies deparsing of UPSERTs */
-        rangeTableEntry = linitial(originalQuery->rtable);
-        if (rangeTableEntry->alias == NULL)
+        /* must evaluate the non-constant in the partition column */
+        requiresMasterEvaluation = true;
+    }
+    else
+    {
+        taskList = RouterModifyTaskList(query, planningError);
+        if (*planningError)
         {
-            Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
-            rangeTableEntry->alias = alias;
+            return NULL;
         }
+
+        /* determine whether there are function calls to evaluate */
+        requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
     }
 
-    if (shardInterval != NULL)
+    if (!requiresMasterEvaluation)
     {
-        uint64 shardId = shardInterval->shardId;
-        StringInfo queryString = makeStringInfo();
-
-        /* grab shared metadata lock to stop concurrent placement additions */
-        LockShardDistributionMetadata(shardId, ShareLock);
-
-        deparse_shard_query(originalQuery, shardInterval->relationId, shardId,
-                            queryString);
-        ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
-
-        modifyTask->queryString = queryString->data;
-        modifyTask->anchorShardId = shardId;
+        /* no functions or parameters, build the query strings upfront */
+        RebuildQueryStrings(originalQuery, taskList);
     }
 
-    return modifyTask;
+    job = CreateJob(originalQuery);
+    job->taskList = taskList;
+    job->requiresMasterEvaluation = requiresMasterEvaluation;
+    job->deferredPruning = deferredPruning;
+
+    return job;
 }
 
 
 /*
- * TargetShardIntervalForModify determines the single shard targeted by a provided
- * modify command. If no matching shards exist, it throws an error. Otherwise, it
- * delegates to FindShardForInsert or FindShardForUpdateOrDelete based on the
- * command type.
+ * CreateJob returns a new Job for the given query.
  */
-static ShardInterval *
-TargetShardIntervalForModify(Oid distributedTableId, Query *query,
-                             DeferredErrorMessage **planningError)
+static Job *
+CreateJob(Query *query)
 {
-    ShardInterval *shardInterval = NULL;
+    Job *job = NULL;
+
+    job = CitusMakeNode(Job);
+    job->jobId = INVALID_JOB_ID;
+    job->jobQuery = query;
+    job->taskList = NIL;
+    job->dependedJobList = NIL;
+    job->subqueryPushdown = false;
+    job->requiresMasterEvaluation = false;
+    job->deferredPruning = false;
+
+    return job;
+}
+
+
+/*
+ * CanShardPrune determines whether a query is ready for shard pruning
+ * by checking whether there is a constant value in the partition column.
+ */
+static bool
+CanShardPrune(Oid distributedTableId, Query *query)
+{
+    uint32 rangeTableId = 1;
+    Var *partitionColumn = NULL;
+    Expr *partitionValueExpr = NULL;
+
+    if (query->commandType != CMD_INSERT)
+    {
+        /* we assume UPDATE/DELETE is always prunable */
+        return true;
+    }
+
+    partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
+    if (partitionColumn == NULL)
+    {
+        /* can always do shard pruning for reference tables */
+        return true;
+    }
+
+    partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
+    if (IsA(partitionValueExpr, Const))
+    {
+        /* can do shard pruning if the partition column is constant */
+        return true;
+    }
+
+    return false;
+}
+
+
+/*
+ * RouterModifyTaskList builds a list of tasks for a given query.
+ */
+List *
+RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError)
+{
+    Oid distributedTableId = ExtractFirstDistributedTableId(query);
     DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
     CmdType commandType = query->commandType;
-    int shardCount = 0;
+    List *taskList = NIL;
 
     Assert(commandType != CMD_SELECT);
+    ErrorIfNoShardsExist(cacheEntry);
 
-    /* error out if no shards exist for the table */
-    shardCount = cacheEntry->shardIntervalArrayLength;
+    if (commandType == CMD_INSERT)
+    {
+        taskList = RouterInsertTaskList(query, cacheEntry, planningError);
+    }
+    else
+    {
+        taskList = RouterUpdateOrDeleteTaskList(query, cacheEntry, planningError);
+        if (*planningError)
+        {
+            return NIL;
+        }
+    }
+
+    return taskList;
+}
+
+
+/*
+ * ErrorIfNoShardsExist throws an error if the given table has no shards.
+ */
+static void
+ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
+{
+    int shardCount = cacheEntry->shardIntervalArrayLength;
     if (shardCount == 0)
     {
+        Oid distributedTableId = cacheEntry->relationId;
         char *relationName = get_rel_name(distributedTableId);
 
         ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

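At the SQL level, the task-list shapes produced by RouterModifyTaskList above look like this (articles as in the tests, distributed by author_id; the row values are illustrative and the comments state the expected outcome rather than captured output):

    INSERT INTO articles VALUES (52, 1, 'sketch', 10);
    -- RouterInsertTaskList: always exactly one task

    UPDATE articles SET title = '' WHERE author_id = 10;
    -- RouterUpdateOrDeleteTaskList: one task for the single matching shard

    UPDATE articles SET title = '' WHERE 0 = 1;
    -- prunes to zero shards: empty task list, executes as a no-op
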
@@ -1101,17 +1167,110 @@ TargetShardIntervalForModify(Oid distributedTableId, Query *query,
                         errhint("Run master_create_worker_shards to create shards "
                                 "and try again.")));
     }
+}
 
-    if (commandType == CMD_INSERT)
-    {
-        shardInterval = FindShardForInsert(query, planningError);
-    }
-    else
-    {
-        shardInterval = FindShardForUpdateOrDelete(query, planningError);
-    }
 
-    return shardInterval;
-}
+/*
+ * RouterInsertTaskList generates a list of tasks for performing an INSERT on
+ * a distributed table via the router executor.
+ */
+static List *
+RouterInsertTaskList(Query *query, DistTableCacheEntry *cacheEntry,
+                     DeferredErrorMessage **planningError)
+{
+    ShardInterval *shardInterval = NULL;
+    Task *modifyTask = NULL;
+
+    Assert(query->commandType == CMD_INSERT);
+
+    shardInterval = FindShardForInsert(query, cacheEntry, planningError);
+
+    if (*planningError != NULL)
+    {
+        return NIL;
+    }
+
+    /* an INSERT always routes to exactly one shard */
+    Assert(shardInterval != NULL);
+
+    modifyTask = CreateTask(MODIFY_TASK);
+    modifyTask->anchorShardId = shardInterval->shardId;
+    modifyTask->replicationModel = cacheEntry->replicationModel;
+
+    if (query->onConflict != NULL)
+    {
+        modifyTask->upsertQuery = true;
+    }
+
+    return list_make1(modifyTask);
+}
+
+
+/*
+ * RouterUpdateOrDeleteTaskList returns a list of tasks for executing an UPDATE
+ * or DELETE command on a distributed table via the router executor.
+ */
+static List *
+RouterUpdateOrDeleteTaskList(Query *query, DistTableCacheEntry *cacheEntry,
+                             DeferredErrorMessage **planningError)
+{
+    ShardInterval *shardInterval = NULL;
+    List *taskList = NIL;
+
+    Assert(query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE);
+
+    shardInterval = FindShardForUpdateOrDelete(query, cacheEntry, planningError);
+
+    if (*planningError != NULL)
+    {
+        return NIL;
+    }
+
+    if (shardInterval != NULL)
+    {
+        Task *modifyTask = NULL;
+
+        modifyTask = CreateTask(MODIFY_TASK);
+        modifyTask->anchorShardId = shardInterval->shardId;
+        modifyTask->replicationModel = cacheEntry->replicationModel;
+
+        taskList = lappend(taskList, modifyTask);
+    }
+
+    return taskList;
+}
+
+
+/*
+ * CreateTask returns a new Task with the given type.
+ */
+static Task *
+CreateTask(TaskType taskType)
+{
+    Task *task = NULL;
+
+    task = CitusMakeNode(Task);
+    task->taskType = taskType;
+    task->jobId = INVALID_JOB_ID;
+    task->taskId = INVALID_TASK_ID;
+    task->queryString = NULL;
+    task->anchorShardId = INVALID_SHARD_ID;
+    task->taskPlacementList = NIL;
+    task->dependedTaskList = NIL;
+
+    task->partitionId = 0;
+    task->upstreamTaskId = INVALID_TASK_ID;
+    task->shardInterval = NULL;
+    task->assignmentConstrained = false;
+    task->shardId = INVALID_SHARD_ID;
+    task->taskExecution = NULL;
+    task->upsertQuery = false;
+    task->replicationModel = REPLICATION_MODEL_INVALID;
+
+    task->insertSelectQuery = false;
+    task->relationShardList = NIL;
+
+    return task;
+}

@@ -1121,45 +1280,42 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
  * evaluated. If the partition column value falls within 0 or multiple
  * (overlapping) shards, the planningError is set.
  */
-ShardInterval *
-FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
+static ShardInterval *
+FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry,
+                   DeferredErrorMessage **planningError)
 {
-    Oid distributedTableId = ExtractFirstDistributedTableId(query);
-    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+    Oid distributedTableId = cacheEntry->relationId;
     char partitionMethod = cacheEntry->partitionMethod;
     uint32 rangeTableId = 1;
    Var *partitionColumn = NULL;
     Expr *partitionValueExpr = NULL;
     Const *partitionValueConst = NULL;
-    List *shardIntervalList = NIL;
-    List *prunedShardList = NIL;
     int prunedShardCount = 0;
+    List *prunedShardList = NIL;
 
     Assert(query->commandType == CMD_INSERT);
 
-    /* reference tables can only have one shard */
+    /* reference tables do not have a partition column, but can only have one shard */
     if (partitionMethod == DISTRIBUTE_BY_NONE)
     {
-        int shardCount = 0;
-
-        shardIntervalList = LoadShardIntervalList(distributedTableId);
-        shardCount = list_length(shardIntervalList);
-
+        int shardCount = cacheEntry->shardIntervalArrayLength;
         if (shardCount != 1)
         {
            ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount)));
         }
 
-        return (ShardInterval *) linitial(shardIntervalList);
+        return cacheEntry->sortedShardIntervalArray[0];
     }
 
     partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
     partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
+
+    /* non-constants should have been caught by CanShardPrune */
     if (!IsA(partitionValueExpr, Const))
     {
-        /* shard pruning not possible right now */
-        return NULL;
+        ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                        errmsg("cannot perform an INSERT with a non-constant in the "
                                "partition column")));
     }
 
     partitionValueConst = (Const *) partitionValueExpr;

@@ -1173,7 +1329,6 @@ FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
     if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
     {
         Datum partitionValue = partitionValueConst->constvalue;
-        DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
         ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry);
 
         if (shardInterval != NULL)

@@ -1251,10 +1406,10 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
  * needs to be applied to multiple or no shards.
  */
 static ShardInterval *
-FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
+FindShardForUpdateOrDelete(Query *query, DistTableCacheEntry *cacheEntry,
+                           DeferredErrorMessage **planningError)
 {
-    Oid distributedTableId = ExtractFirstDistributedTableId(query);
-    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+    Oid distributedTableId = cacheEntry->relationId;
     char partitionMethod = cacheEntry->partitionMethod;
     CmdType commandType = query->commandType;
     List *restrictClauseList = NIL;

@@ -1268,7 +1423,7 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
     prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList);
 
     prunedShardCount = list_length(prunedShardList);
-    if (prunedShardCount != 1)
+    if (prunedShardCount > 1)
     {
         char *partitionKeyString = cacheEntry->partitionKeyString;
         char *partitionColumnName = ColumnNameToColumn(distributedTableId,

@@ -1276,7 +1431,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
         StringInfo errorMessage = makeStringInfo();
         StringInfo errorHint = makeStringInfo();
         const char *commandName = NULL;
-        const char *targetCountType = NULL;
 
         if (commandType == CMD_UPDATE)
         {

@@ -1287,15 +1441,6 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
             commandName = "DELETE";
         }
 
-        if (prunedShardCount == 0)
-        {
-            targetCountType = "no";
-        }
-        else
-        {
-            targetCountType = "multiple";
-        }
-
         appendStringInfo(errorHint, "Consider using an equality filter on "
                                     "partition column \"%s\" to target a "
                                     "single shard. If you'd like to run a "

@@ -1310,8 +1455,9 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
                              "all shards satisfying delete criteria.");
         }
 
-        appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
-                         commandName, targetCountType);
+        appendStringInfo(errorMessage,
+                         "cannot run %s command which targets multiple shards",
+                         commandName);
 
         (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
                                          errorMessage->data, NULL,

@@ -1319,6 +1465,10 @@ FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
 
         return NULL;
     }
+    else if (prunedShardCount == 0)
+    {
+        return NULL;
+    }
 
     return (ShardInterval *) linitial(prunedShardList);
 }

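The asymmetry introduced above, sketched in SQL: pruning to more than one shard remains a deferred error, while pruning to zero shards now returns NULL to the caller and ultimately yields an empty task list (predicates illustrative; assumes articles spans multiple shards):

    UPDATE articles SET title = '' WHERE author_id >= 1;
    -- ERROR:  cannot run UPDATE command which targets multiple shards

    UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
    -- succeeds: zero shards match, zero rows are modified
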
@@ -1404,16 +1554,17 @@ ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
 }
 
 
-/* RouterSelectTask builds a Task to represent a single shard select query */
-static Task *
-RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionContext,
-                 List **placementList)
+/* RouterSelectJob builds a Job to represent a single shard select query */
+static Job *
+RouterSelectJob(Query *originalQuery, RelationRestrictionContext *restrictionContext,
+                bool *returnQueryRoutable)
 {
+    Job *job = NULL;
     Task *task = NULL;
     bool queryRoutable = false;
     StringInfo queryString = makeStringInfo();
-    bool upsertQuery = false;
     uint64 shardId = INVALID_SHARD_ID;
+    List *placementList = NIL;
     List *relationShardList = NIL;
     bool replacePrunedQueryWithDummy = false;
 

@@ -1421,29 +1572,31 @@ RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionCo
     replacePrunedQueryWithDummy = true;
 
     queryRoutable = RouterSelectQuery(originalQuery, restrictionContext,
-                                      placementList, &shardId, &relationShardList,
+                                      &placementList, &shardId, &relationShardList,
                                       replacePrunedQueryWithDummy);
 
 
     if (!queryRoutable)
     {
+        *returnQueryRoutable = false;
         return NULL;
     }
 
+    job = CreateJob(originalQuery);
+
     pg_get_query_def(originalQuery, queryString);
 
-    task = CitusMakeNode(Task);
-    task->jobId = INVALID_JOB_ID;
-    task->taskId = INVALID_TASK_ID;
-    task->taskType = ROUTER_TASK;
+    task = CreateTask(ROUTER_TASK);
     task->queryString = queryString->data;
     task->anchorShardId = shardId;
-    task->replicationModel = REPLICATION_MODEL_INVALID;
-    task->dependedTaskList = NIL;
-    task->upsertQuery = upsertQuery;
     task->taskPlacementList = placementList;
     task->relationShardList = relationShardList;
 
-    return task;
+    job->taskList = list_make1(task);
+
+    *returnQueryRoutable = true;
+
+    return job;
 }

@@ -1771,67 +1924,6 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList)
 }
 
 
-/*
- * RouterQueryJob creates a Job for the specified query to execute the
- * provided single shard select task.
- */
-static Job *
-RouterQueryJob(Query *query, Task *task, List *placementList)
-{
-    Job *job = NULL;
-    List *taskList = NIL;
-    TaskType taskType = task->taskType;
-    bool requiresMasterEvaluation = false;
-    bool deferredPruning = false;
-
-    if (taskType == MODIFY_TASK)
-    {
-        if (task->anchorShardId != INVALID_SHARD_ID)
-        {
-            /*
-             * We were able to assign a shard ID. Generate task
-             * placement list using the first-replica assignment
-             * policy (modify placements in placement ID order).
-             */
-            taskList = FirstReplicaAssignTaskList(list_make1(task));
-            requiresMasterEvaluation = RequiresMasterEvaluation(query);
-        }
-        else
-        {
-            /*
-             * We were unable to assign a shard ID yet, meaning
-             * the partition column value is an expression.
-             */
-            taskList = list_make1(task);
-            requiresMasterEvaluation = true;
-            deferredPruning = true;
-        }
-    }
-    else
-    {
-        /*
-         * For selects we get the placement list during shard
-         * pruning.
-         */
-        Assert(placementList != NIL);
-
-        task->taskPlacementList = placementList;
-        taskList = list_make1(task);
-    }
-
-    job = CitusMakeNode(Job);
-    job->dependedJobList = NIL;
-    job->jobId = INVALID_JOB_ID;
-    job->subqueryPushdown = false;
-    job->jobQuery = query;
-    job->taskList = taskList;
-    job->requiresMasterEvaluation = requiresMasterEvaluation;
-    job->deferredPruning = deferredPruning;
-
-    return job;
-}
-
-
 /*
  * MultiRouterPlannableQuery returns true if given query can be router plannable.
  * The query is router plannable if it is a modify query, or if its is a select

@@ -46,8 +46,7 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
 extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
 extern void AddShardIntervalRestrictionToSelect(Query *subqery,
                                                 ShardInterval *shardInterval);
-extern ShardInterval * FindShardForInsert(Query *query,
-                                          DeferredErrorMessage **planningError);
+extern List * RouterModifyTaskList(Query *query, DeferredErrorMessage **planningError);
 
 
 #endif /* MULTI_ROUTER_PLANNER_H */

@@ -334,6 +334,21 @@ Custom Scan (Citus Router)
         ->  Index Scan using lineitem_pkey_290000 on lineitem_290000
               Index Cond: (l_orderkey = 1)
               Filter: (l_partkey = 0)
+-- Test zero-shard update
+EXPLAIN (COSTS FALSE)
+	UPDATE lineitem
+	SET l_suppkey = 12
+	WHERE l_orderkey = 1 AND l_orderkey = 0;
+Custom Scan (Citus Router)
+  Task Count: 0
+  Tasks Shown: All
+-- Test zero-shard delete
+EXPLAIN (COSTS FALSE)
+	DELETE FROM lineitem
+	WHERE l_orderkey = 1 AND l_orderkey = 0;
+Custom Scan (Citus Router)
+  Task Count: 0
+  Tasks Shown: All
 -- Test single-shard SELECT
 EXPLAIN (COSTS FALSE)
 	SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;

@@ -334,6 +334,21 @@ Custom Scan (Citus Router)
         ->  Index Scan using lineitem_pkey_290000 on lineitem_290000
               Index Cond: (l_orderkey = 1)
               Filter: (l_partkey = 0)
+-- Test zero-shard update
+EXPLAIN (COSTS FALSE)
+	UPDATE lineitem
+	SET l_suppkey = 12
+	WHERE l_orderkey = 1 AND l_orderkey = 0;
+Custom Scan (Citus Router)
+  Task Count: 0
+  Tasks Shown: All
+-- Test zero-shard delete
+EXPLAIN (COSTS FALSE)
+	DELETE FROM lineitem
+	WHERE l_orderkey = 1 AND l_orderkey = 0;
+Custom Scan (Citus Router)
+  Task Count: 0
+  Tasks Shown: All
 -- Test single-shard SELECT
 EXPLAIN (COSTS FALSE)
 	SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;

@@ -89,13 +89,10 @@ INSERT INTO articles VALUES (49, 9, 'anyone', 2681);
 INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
 -- insert a single row for the test
 INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
--- zero-shard modifications should fail
+-- zero-shard modifications should succeed
 UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
-ERROR:  cannot run UPDATE command which targets no shards
-HINT:  Consider using an equality filter on partition column "author_id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
+UPDATE articles SET title = '' WHERE 0 = 1;
 DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
-ERROR:  cannot run DELETE command which targets no shards
-HINT:  Consider using an equality filter on partition column "author_id" to target a single shard. If you'd like to run a multi-shard operation, use master_modify_multiple_shards().
 -- single-shard tests
 -- test simple select for a single row
 SELECT * FROM articles WHERE author_id = 10 AND id = 50;

|
|||
DELETE FROM lineitem
|
||||
WHERE l_orderkey = 1 AND l_partkey = 0;
|
||||
|
||||
-- Test zero-shard update
|
||||
EXPLAIN (COSTS FALSE)
|
||||
UPDATE lineitem
|
||||
SET l_suppkey = 12
|
||||
WHERE l_orderkey = 1 AND l_orderkey = 0;
|
||||
|
||||
-- Test zero-shard delete
|
||||
EXPLAIN (COSTS FALSE)
|
||||
DELETE FROM lineitem
|
||||
WHERE l_orderkey = 1 AND l_orderkey = 0;
|
||||
|
||||
-- Test single-shard SELECT
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
|
||||
|
|
|
@@ -80,8 +80,9 @@ INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
 -- insert a single row for the test
 INSERT INTO articles_single_shard VALUES (50, 10, 'anjanette', 19519);
 
--- zero-shard modifications should fail
+-- zero-shard modifications should succeed
 UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
+UPDATE articles SET title = '' WHERE 0 = 1;
 DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
 
 -- single-shard tests