mirror of https://github.com/citusdata/citus.git
Merge pull request #1302 from citusdata/serial_partition_column
Support expressions in the partition column in INSERTs
commit c8fec3be1b
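What this merge enables, in one example: the partition column of a router INSERT may now be an expression (such as the DEFAULT of a serial column) instead of a constant; the master evaluates the expression and shard pruning moves from the planner to the executor. A minimal illustration, using the app_analytics_events table from the regression tests further down in this diff:

    CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
    SELECT create_distributed_table('app_analytics_events', 'id');
    -- id comes from the serial sequence, i.e. an expression, not a constant
    INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;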
@@ -74,6 +74,7 @@ bool EnableDeadlockPrevention = true;

 /* functions needed during run phase */
 static void ReacquireMetadataLocks(List *taskList);
+static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
 static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
                                     bool expectResults);
 static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
@@ -87,7 +88,6 @@ static List * TaskShardIntervalList(List *taskList);
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static void AcquireExecutorMultiShardLocks(List *taskList);
 static bool RequiresConsistentSnapshot(Task *task);
-static void ProcessMasterEvaluableFunctions(Job *workerJob, PlanState *planState);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
                                                Oid **parameterTypes,
                                                const char ***parameterValues);
@@ -406,7 +406,11 @@ RequiresConsistentSnapshot(Task *task)


 /*
- * CitusModifyBeginScan checks the validity of the given custom scan node and
+ * CitusModifyBeginScan first evaluates expressions in the query and then
+ * performs shard pruning in case the partition column in an insert was
+ * defined as a function call.
+ *
+ * The function also checks the validity of the given custom scan node and
  * gets locks on the shards involved in the task list of the distributed plan.
  */
 void
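A sketch of the "partition column defined as a function call" case this comment describes; the sequence name is the PostgreSQL default for the serial column of the test table, an assumption rather than something this diff spells out:

    -- DEFAULT on a serial column expands to a nextval() call, which is
    -- evaluated in CitusModifyBeginScan before shard pruning runs
    INSERT INTO app_analytics_events VALUES (nextval('app_analytics_events_id_seq'), 102, 'Wayz');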
@@ -415,7 +419,23 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
     CitusScanState *scanState = (CitusScanState *) node;
     MultiPlan *multiPlan = scanState->multiPlan;
     Job *workerJob = multiPlan->workerJob;
+    Query *jobQuery = workerJob->jobQuery;
     List *taskList = workerJob->taskList;
+    bool deferredPruning = workerJob->deferredPruning;

+    if (workerJob->requiresMasterEvaluation)
+    {
+        PlanState *planState = &(scanState->customScanState.ss.ps);
+
+        ExecuteMasterEvaluableFunctions(jobQuery, planState);
+
+        if (deferredPruning)
+        {
+            AssignInsertTaskShardId(jobQuery, taskList);
+        }
+
+        RebuildQueryStrings(jobQuery, taskList);
+    }
+
     /*
      * If we are executing a prepared statement, then we may not yet have obtained
@@ -428,6 +448,53 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
      * locks as early as possible.
      */
     ReacquireMetadataLocks(taskList);
+
+    /*
+     * If we deferred shard pruning to the executor, then we still need to assign
+     * shard placements. We do this after acquiring the metadata locks to ensure
+     * we can't get stale metadata. At some point, we may want to load all
+     * placement metadata here.
+     */
+    if (deferredPruning)
+    {
+        /* modify tasks are always assigned using first-replica policy */
+        workerJob->taskList = FirstReplicaAssignTaskList(taskList);
+    }
+}
+
+
+/*
+ * AssignInsertTaskShardId performs shard pruning for an insert and sets
+ * anchorShardId accordingly.
+ */
+static void
+AssignInsertTaskShardId(Query *jobQuery, List *taskList)
+{
+    ShardInterval *shardInterval = NULL;
+    Task *insertTask = NULL;
+    DeferredErrorMessage *planningError = NULL;
+
+    Assert(jobQuery->commandType == CMD_INSERT);
+
+    /*
+     * We skipped shard pruning in the planner because the partition
+     * column contained an expression. Perform shard pruning now.
+     */
+    shardInterval = FindShardForInsert(jobQuery, &planningError);
+    if (planningError != NULL)
+    {
+        RaiseDeferredError(planningError, ERROR);
+    }
+    else if (shardInterval == NULL)
+    {
+        /* expression could not be evaluated */
+        ereport(ERROR, (errmsg("parameters in the partition column are not "
+                               "allowed")));
+    }
+
+    /* assign a shard ID to the task */
+    insertTask = (Task *) linitial(taskList);
+    insertTask->anchorShardId = shardInterval->shardId;
 }

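The else-if branch above fires only when master evaluation still leaves a non-constant in the partition column. A hypothetical way to reach it (sketch; whether a parameter survives to this point depends on how the prepared statement was planned):

    PREPARE bad_insert(integer) AS
        INSERT INTO app_analytics_events (id, app_id) VALUES ($1, 0);
    EXECUTE bad_insert(7);
    -- if $1 is still unresolved when AssignInsertTaskShardId runs:
    -- ERROR:  parameters in the partition column are not allowed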
@@ -443,15 +510,12 @@ RouterSingleModifyExecScan(CustomScanState *node)

     if (!scanState->finishedRemoteScan)
     {
-        PlanState *planState = &(scanState->customScanState.ss.ps);
         MultiPlan *multiPlan = scanState->multiPlan;
         bool hasReturning = multiPlan->hasReturning;
         Job *workerJob = multiPlan->workerJob;
         List *taskList = workerJob->taskList;
         Task *task = (Task *) linitial(taskList);

-        ProcessMasterEvaluableFunctions(workerJob, planState);
-
         ExecuteSingleModifyTask(scanState, task, hasReturning);

         scanState->finishedRemoteScan = true;
@@ -463,24 +527,6 @@ RouterSingleModifyExecScan(CustomScanState *node)
 }


-/*
- * ProcessMasterEvaluableFunctions executes evaluable functions and rebuilds
- * the query strings in task lists.
- */
-static void
-ProcessMasterEvaluableFunctions(Job *workerJob, PlanState *planState)
-{
-    if (workerJob->requiresMasterEvaluation)
-    {
-        Query *jobQuery = workerJob->jobQuery;
-        List *taskList = workerJob->taskList;
-
-        ExecuteMasterEvaluableFunctions(jobQuery, planState);
-        RebuildQueryStrings(jobQuery, taskList);
-    }
-}
-
-
 /*
  * RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves
  * the results and, if RETURNING is used, stores them in custom scan's tuple store.
@@ -494,15 +540,12 @@ RouterMultiModifyExecScan(CustomScanState *node)

     if (!scanState->finishedRemoteScan)
     {
-        PlanState *planState = &(scanState->customScanState.ss.ps);
         MultiPlan *multiPlan = scanState->multiPlan;
         Job *workerJob = multiPlan->workerJob;
         List *taskList = workerJob->taskList;
         bool hasReturning = multiPlan->hasReturning;
         bool isModificationQuery = true;

-        ProcessMasterEvaluableFunctions(workerJob, planState);
-
         ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);

         scanState->finishedRemoteScan = true;
@@ -527,14 +570,11 @@ RouterSelectExecScan(CustomScanState *node)

     if (!scanState->finishedRemoteScan)
     {
-        PlanState *planState = &(scanState->customScanState.ss.ps);
         MultiPlan *multiPlan = scanState->multiPlan;
         Job *workerJob = multiPlan->workerJob;
         List *taskList = workerJob->taskList;
         Task *task = (Task *) linitial(taskList);

-        ProcessMasterEvaluableFunctions(workerJob, planState);
-
         ExecuteSingleSelectTask(scanState, task);

         scanState->finishedRemoteScan = true;
@@ -101,12 +101,14 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
 static char MostPermissiveVolatileFlag(char left, char right);
 static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
                                     FromExpr *joinTree);
-static Task * RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval);
-static ShardInterval * TargetShardIntervalForModify(Query *query,
+static Task * RouterModifyTask(Oid distributedTableId, Query *originalQuery,
+                               ShardInterval *shardInterval);
+static ShardInterval * TargetShardIntervalForModify(Oid distriubtedTableId, Query *query,
                                                     DeferredErrorMessage **planningError);
-static List * QueryRestrictList(Query *query);
-static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
-static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
+static ShardInterval * FindShardForUpdateOrDelete(Query *query,
+                                                  DeferredErrorMessage **planningError);
+static List * QueryRestrictList(Query *query, char partitionMethod);
+static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
 static Task * RouterSelectTask(Query *originalQuery,
                                RelationRestrictionContext *restrictionContext,
                                List **placementList);
@@ -215,6 +217,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,

     if (modifyTask)
     {
+        Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
         ShardInterval *targetShardInterval = NULL;
         DeferredErrorMessage *planningError = NULL;

@@ -226,14 +229,15 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
             return multiPlan;
         }

-        targetShardInterval = TargetShardIntervalForModify(query, &planningError);
+        targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
+                                                           &planningError);
         if (planningError != NULL)
         {
             multiPlan->planningError = planningError;
             return multiPlan;
         }

-        task = RouterModifyTask(originalQuery, targetShardInterval);
+        task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
         Assert(task);
     }
     else
@@ -1373,15 +1377,6 @@ ModifyQuerySupported(Query *queryTree)
                 specifiesPartitionValue = true;
             }

-            if (commandType == CMD_INSERT && targetEntryPartitionColumn &&
-                !IsA(targetEntry->expr, Const))
-            {
-                return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
-                                     "values given for the partition column must be"
-                                     " constants or constant expressions",
-                                     NULL, NULL);
-            }
-
             if (commandType == CMD_UPDATE &&
                 MasterIrreducibleExpression((Node *) targetEntry->expr,
                                             &hasVarArgument, &hasBadCoalesce))
@@ -1826,24 +1821,27 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
  * shard-extended deparsed SQL to be run during execution.
  */
 static Task *
-RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
+RouterModifyTask(Oid distributedTableId, Query *originalQuery,
+                 ShardInterval *shardInterval)
 {
-    uint64 shardId = shardInterval->shardId;
-    Oid distributedTableId = shardInterval->relationId;
-    StringInfo queryString = makeStringInfo();
     Task *modifyTask = NULL;
-    bool upsertQuery = false;
     DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);

-    /* grab shared metadata lock to stop concurrent placement additions */
-    LockShardDistributionMetadata(shardId, ShareLock);
+    modifyTask = CitusMakeNode(Task);
+    modifyTask->jobId = INVALID_JOB_ID;
+    modifyTask->taskId = INVALID_TASK_ID;
+    modifyTask->taskType = MODIFY_TASK;
+    modifyTask->queryString = NULL;
+    modifyTask->anchorShardId = INVALID_SHARD_ID;
+    modifyTask->dependedTaskList = NIL;
+    modifyTask->replicationModel = cacheEntry->replicationModel;

     if (originalQuery->onConflict != NULL)
     {
         RangeTblEntry *rangeTableEntry = NULL;

         /* set the flag */
-        upsertQuery = true;
+        modifyTask->upsertQuery = true;

         /* setting an alias simplifies deparsing of UPSERTs */
         rangeTableEntry = linitial(originalQuery->rtable);
@@ -1854,18 +1852,21 @@ RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
         }
     }

-    deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString);
-    ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
-
-    modifyTask = CitusMakeNode(Task);
-    modifyTask->jobId = INVALID_JOB_ID;
-    modifyTask->taskId = INVALID_TASK_ID;
-    modifyTask->taskType = MODIFY_TASK;
-    modifyTask->queryString = queryString->data;
-    modifyTask->anchorShardId = shardId;
-    modifyTask->dependedTaskList = NIL;
-    modifyTask->upsertQuery = upsertQuery;
-    modifyTask->replicationModel = cacheEntry->replicationModel;
+    if (shardInterval != NULL)
+    {
+        uint64 shardId = shardInterval->shardId;
+        StringInfo queryString = makeStringInfo();
+
+        /* grab shared metadata lock to stop concurrent placement additions */
+        LockShardDistributionMetadata(shardId, ShareLock);
+
+        deparse_shard_query(originalQuery, shardInterval->relationId, shardId,
+                            queryString);
+        ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
+
+        modifyTask->queryString = queryString->data;
+        modifyTask->anchorShardId = shardId;
+    }

     return modifyTask;
 }
@@ -1873,40 +1874,21 @@ RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)

 /*
  * TargetShardIntervalForModify determines the single shard targeted by a provided
- * modify command. If no matching shards exist, it throws an error. If the modification
- * targets more than one shard, this function sets the deferred error and returns NULL,
- * to handle cases in which we cannot prune down to one shard due to a parameter.
+ * modify command. If no matching shards exist, it throws an error. Otherwise, it
+ * delegates to FindShardForInsert or FindShardForUpdateOrDelete based on the
+ * command type.
  */
 static ShardInterval *
-TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
+TargetShardIntervalForModify(Oid distributedTableId, Query *query,
+                             DeferredErrorMessage **planningError)
 {
-    List *prunedShardList = NIL;
-    int prunedShardCount = 0;
-    int shardCount = 0;
-    Oid distributedTableId = ExtractFirstDistributedTableId(query);
+    ShardInterval *shardInterval = NULL;
     DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
-    char partitionMethod = cacheEntry->partitionMethod;
-    bool fastShardPruningPossible = false;
     CmdType commandType = query->commandType;
-    const char *commandName = NULL;
+    int shardCount = 0;

     Assert(commandType != CMD_SELECT);

-    if (commandType == CMD_INSERT)
-    {
-        commandName = "INSERT";
-    }
-    else if (commandType == CMD_UPDATE)
-    {
-        commandName = "UPDATE";
-    }
-    else
-    {
-        Assert(commandType == CMD_DELETE);
-        commandName = "DELETE";
-    }
-
     /* error out if no shards exist for the table */
     shardCount = cacheEntry->shardIntervalArrayLength;
     if (shardCount == 0)
@@ -1921,28 +1903,101 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
                            "and try again.")));
     }

-    fastShardPruningPossible = FastShardPruningPossible(query->commandType,
-                                                        partitionMethod);
-    if (fastShardPruningPossible)
+    if (commandType == CMD_INSERT)
     {
+        shardInterval = FindShardForInsert(query, planningError);
+    }
+    else
+    {
+        shardInterval = FindShardForUpdateOrDelete(query, planningError);
+    }
+
+    return shardInterval;
+}
+
+
+/*
+ * FindShardForInsert returns the shard interval for an INSERT query or NULL if
+ * the partition column value is defined as an expression that still needs to be
+ * evaluated. If the partition column value falls within 0 or multiple
+ * (overlapping) shards, the planningError is set.
+ */
+ShardInterval *
+FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
+{
+    Oid distributedTableId = ExtractFirstDistributedTableId(query);
+    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+    char partitionMethod = cacheEntry->partitionMethod;
     uint32 rangeTableId = 1;
-    Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
-    Const *partitionValueConst = ExtractInsertPartitionValue(query, partitionColumn);
+    Var *partitionColumn = NULL;
+    Expr *partitionValueExpr = NULL;
+    Const *partitionValueConst = NULL;
+    List *shardIntervalList = NIL;
+    List *prunedShardList = NIL;
+    int prunedShardCount = 0;
+
+    Assert(query->commandType == CMD_INSERT);
+
+    /* reference tables can only have one shard */
+    if (partitionMethod == DISTRIBUTE_BY_NONE)
+    {
+        int shardCount = 0;
+
+        shardIntervalList = LoadShardIntervalList(distributedTableId);
+        shardCount = list_length(shardIntervalList);
+
+        if (shardCount != 1)
+        {
+            ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount)));
+        }
+
+        return (ShardInterval *) linitial(shardIntervalList);
+    }
+
+    partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
+
+    partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
+    if (!IsA(partitionValueExpr, Const))
+    {
+        /* shard pruning not possible right now */
+        return NULL;
+    }
+
+    partitionValueConst = (Const *) partitionValueExpr;
+    if (partitionValueConst->constisnull)
+    {
+        ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                        errmsg("cannot perform an INSERT with NULL in the partition "
+                               "column")));
+    }
+
+    if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
+    {
         Datum partitionValue = partitionValueConst->constvalue;
         ShardInterval *shardInterval = FastShardPruning(distributedTableId,
                                                         partitionValue);

         if (shardInterval != NULL)
         {
-            prunedShardList = lappend(prunedShardList, shardInterval);
+            prunedShardList = list_make1(shardInterval);
         }
     }
     else
     {
-        List *restrictClauseList = QueryRestrictList(query);
+        List *restrictClauseList = NIL;
         Index tableId = 1;
-        List *shardIntervalList = LoadShardIntervalList(distributedTableId);
+        OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber);
+        Node *rightOp = get_rightop((Expr *) equalityExpr);
+        Const *rightConst = (Const *) rightOp;
+
+        Assert(IsA(rightOp, Const));
+
+        rightConst->constvalue = partitionValueConst->constvalue;
+        rightConst->constisnull = partitionValueConst->constisnull;
+        rightConst->constbyval = partitionValueConst->constbyval;
+
+        restrictClauseList = list_make1(equalityExpr);
+
+        shardIntervalList = LoadShardIntervalList(distributedTableId);

         prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
                                          shardIntervalList);
     }
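Two error texts introduced around this function are exercised by the updated regression tests: the missing-value case (raised in ExtractInsertPartitionValue) and the NULL case (raised here in FindShardForInsert):

    INSERT INTO limit_orders DEFAULT VALUES;
    -- ERROR:  cannot perform an INSERT without a partition column value
    INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT);
    -- ERROR:  cannot perform an INSERT with NULL in the partition column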
@@ -1950,13 +2005,12 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
     prunedShardCount = list_length(prunedShardList);
     if (prunedShardCount != 1)
     {
-        Oid relationId = cacheEntry->relationId;
         char *partitionKeyString = cacheEntry->partitionKeyString;
-        char *partitionColumnName = ColumnNameToColumn(relationId, partitionKeyString);
+        char *partitionColumnName = ColumnNameToColumn(distributedTableId,
+                                                       partitionKeyString);
         StringInfo errorMessage = makeStringInfo();
         StringInfo errorHint = makeStringInfo();
         const char *targetCountType = NULL;
-        bool showHint = false;

         if (prunedShardCount == 0)
         {
@@ -1967,42 +2021,24 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
             targetCountType = "multiple";
         }

-        if (commandType == CMD_INSERT && prunedShardCount == 0)
+        if (prunedShardCount == 0)
         {
             appendStringInfo(errorHint, "Make sure you have created a shard which "
                                         "can receive this partition column value.");
         }
-        else if (commandType == CMD_INSERT)
+        else
         {
             appendStringInfo(errorHint, "Make sure the value for partition column "
                                         "\"%s\" falls into a single shard.",
                              partitionColumnName);
         }
-        else
-        {
-            appendStringInfo(errorHint, "Consider using an equality filter on "
-                                        "partition column \"%s\" to target a "
-                                        "single shard. If you'd like to run a "
-                                        "multi-shard operation, use "
-                                        "master_modify_multiple_shards().",
-                             partitionColumnName);
-        }
-
-        if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND)
-        {
-            appendStringInfo(errorHint, " You can also use "
-                                        "master_apply_delete_command() to drop "
-                                        "all shards satisfying delete criteria.");
-        }
-
-        showHint = errorHint->len > 0;
-
-        appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
-                         commandName, targetCountType);
+
+        appendStringInfo(errorMessage, "cannot run INSERT command which targets %s "
+                                       "shards", targetCountType);

         (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
                                          errorMessage->data, NULL,
-                                         showHint ? errorHint->data : NULL);
+                                         errorHint->data);

         return NULL;
     }
@@ -2011,29 +2047,6 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
 }


-/*
- * UseFastShardPruning returns true if the commandType is INSERT and partition method
- * is hash or range.
- */
-static bool
-FastShardPruningPossible(CmdType commandType, char partitionMethod)
-{
-    /* we currently only support INSERTs */
-    if (commandType != CMD_INSERT)
-    {
-        return false;
-    }
-
-    /* fast shard pruning is only supported for hash and range partitioned tables */
-    if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
-    {
-        return true;
-    }
-
-    return false;
-}
-
-
 /*
  * FastShardPruning is a higher level API for FindShardInterval function. Given the
  * relationId of the distributed table and partitionValue, FastShardPruning function finds
@@ -2078,6 +2091,89 @@ FastShardPruning(Oid distributedTableId, Datum partitionValue)
 }


+/*
+ * FindShardForUpdateOrDelete finds the shard interval in which an UPDATE or
+ * DELETE command should be applied, or sets planningError when the query
+ * needs to be applied to multiple or no shards.
+ */
+static ShardInterval *
+FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
+{
+    Oid distributedTableId = ExtractFirstDistributedTableId(query);
+    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+    char partitionMethod = cacheEntry->partitionMethod;
+    CmdType commandType = query->commandType;
+    List *shardIntervalList = NIL;
+    List *restrictClauseList = NIL;
+    Index tableId = 1;
+    List *prunedShardList = NIL;
+    int prunedShardCount = 0;
+
+    Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE);
+
+    shardIntervalList = LoadShardIntervalList(distributedTableId);
+
+    restrictClauseList = QueryRestrictList(query, partitionMethod);
+    prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
+                                     shardIntervalList);
+
+    prunedShardCount = list_length(prunedShardList);
+    if (prunedShardCount != 1)
+    {
+        char *partitionKeyString = cacheEntry->partitionKeyString;
+        char *partitionColumnName = ColumnNameToColumn(distributedTableId,
+                                                       partitionKeyString);
+        StringInfo errorMessage = makeStringInfo();
+        StringInfo errorHint = makeStringInfo();
+        const char *commandName = NULL;
+        const char *targetCountType = NULL;
+
+        if (commandType == CMD_UPDATE)
+        {
+            commandName = "UPDATE";
+        }
+        else
+        {
+            commandName = "DELETE";
+        }
+
+        if (prunedShardCount == 0)
+        {
+            targetCountType = "no";
+        }
+        else
+        {
+            targetCountType = "multiple";
+        }
+
+        appendStringInfo(errorHint, "Consider using an equality filter on "
+                                    "partition column \"%s\" to target a "
+                                    "single shard. If you'd like to run a "
+                                    "multi-shard operation, use "
+                                    "master_modify_multiple_shards().",
+                         partitionColumnName);
+
+        if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND)
+        {
+            appendStringInfo(errorHint, " You can also use "
+                                        "master_apply_delete_command() to drop "
+                                        "all shards satisfying delete criteria.");
+        }
+
+        appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
+                         commandName, targetCountType);
+
+        (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
+                                         errorMessage->data, NULL,
+                                         errorHint->data);
+
+        return NULL;
+    }
+
+    return (ShardInterval *) linitial(prunedShardList);
+}
+
+
 /*
  * QueryRestrictList returns the restriction clauses for the query. For a SELECT
  * statement these are the where-clause expressions. For INSERT statements we
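FindShardForUpdateOrDelete above keeps the old multi-shard deferred error. A sketch of a statement that would set it, reusing the test table (the predicate is hypothetical):

    -- no equality filter on the partition column "id", so pruning cannot
    -- narrow the UPDATE down to one shard
    UPDATE app_analytics_events SET name = 'renamed' WHERE app_id = 101;
    -- ERROR:  cannot run UPDATE command which targets multiple shards
    -- HINT:  Consider using an equality filter on partition column "id" to
    --        target a single shard. If you'd like to run a multi-shard
    --        operation, use master_modify_multiple_shards().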
@@ -2088,12 +2184,9 @@ FastShardPruning(Oid distributedTableId, Datum partitionValue)
  * NIL for reference tables.
  */
 static List *
-QueryRestrictList(Query *query)
+QueryRestrictList(Query *query, char partitionMethod)
 {
     List *queryRestrictList = NIL;
-    CmdType commandType = query->commandType;
-    Oid distributedTableId = ExtractFirstDistributedTableId(query);
-    char partitionMethod = PartitionMethod(distributedTableId);

     /*
      * Reference tables do not have the notion of partition column. Thus,
@@ -2104,30 +2197,7 @@ QueryRestrictList(Query *query)
         return queryRestrictList;
     }

-    if (commandType == CMD_INSERT)
-    {
-        /* build equality expression based on partition column value for row */
-        uint32 rangeTableId = 1;
-        Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
-        Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);
-
-        OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber);
-
-        Node *rightOp = get_rightop((Expr *) equalityExpr);
-        Const *rightConst = (Const *) rightOp;
-        Assert(IsA(rightOp, Const));
-
-        rightConst->constvalue = partitionValue->constvalue;
-        rightConst->constisnull = partitionValue->constisnull;
-        rightConst->constbyval = partitionValue->constbyval;
-
-        queryRestrictList = list_make1(equalityExpr);
-    }
-    else if (commandType == CMD_UPDATE || commandType == CMD_DELETE ||
-             commandType == CMD_SELECT)
-    {
-        queryRestrictList = WhereClauseList(query->jointree);
-    }
+    queryRestrictList = WhereClauseList(query->jointree);

     return queryRestrictList;
 }
@@ -2168,27 +2238,19 @@ ExtractFirstDistributedTableId(Query *query)
  * of an INSERT command. If a partition value is missing altogether or is
  * NULL, this function throws an error.
  */
-static Const *
+static Expr *
 ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
 {
-    Const *partitionValue = NULL;
     TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
                                                 partitionColumn->varattno);
-    if (targetEntry != NULL)
-    {
-        Assert(IsA(targetEntry->expr, Const));
-
-        partitionValue = (Const *) targetEntry->expr;
-    }
-
-    if (partitionValue == NULL || partitionValue->constisnull)
+    if (targetEntry == NULL)
     {
         ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
-                        errmsg("cannot plan INSERT using row with NULL value "
-                               "in partition column")));
+                        errmsg("cannot perform an INSERT without a partition column "
+                               "value")));
     }

-    return partitionValue;
+    return targetEntry->expr;
 }

@@ -2580,19 +2642,37 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
     List *taskList = NIL;
     TaskType taskType = task->taskType;
     bool requiresMasterEvaluation = false;
+    bool deferredPruning = false;

-    /*
-     * We send modify task to the first replica, otherwise we choose the target shard
-     * according to task assignment policy. Placement list for select queries are
-     * provided as function parameter.
-     */
     if (taskType == MODIFY_TASK)
     {
-        taskList = FirstReplicaAssignTaskList(list_make1(task));
-        requiresMasterEvaluation = RequiresMasterEvaluation(query);
+        if (task->anchorShardId != INVALID_SHARD_ID)
+        {
+            /*
+             * We were able to assign a shard ID. Generate task
+             * placement list using the first-replica assignment
+             * policy (modify placements in placement ID order).
+             */
+            taskList = FirstReplicaAssignTaskList(list_make1(task));
+            requiresMasterEvaluation = RequiresMasterEvaluation(query);
+        }
+        else
+        {
+            /*
+             * We were unable to assign a shard ID yet, meaning
+             * the partition column value is an expression.
+             */
+            taskList = list_make1(task);
+            requiresMasterEvaluation = true;
+            deferredPruning = true;
+        }
     }
     else
     {
+        /*
+         * For selects we get the placement list during shard
+         * pruning.
+         */
         Assert(placementList != NIL);

         task->taskPlacementList = placementList;
@@ -2606,6 +2686,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
     job->jobQuery = query;
     job->taskList = taskList;
     job->requiresMasterEvaluation = requiresMasterEvaluation;
+    job->deferredPruning = deferredPruning;

     return job;
 }
@@ -298,6 +298,17 @@ EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState)
                                             planState);
     }

+    if (IsA(expression, Param))
+    {
+        Param *param = (Param *) expression;
+
+        return (Node *) citus_evaluate_expr((Expr *) param,
+                                            param->paramtype,
+                                            param->paramtypmod,
+                                            param->paramcollid,
+                                            planState);
+    }
+
     return expression;
 }
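With the Param case added to EvaluateNodeIfReferencesFunction, a bound parameter in the partition column can now be resolved on the master like any other evaluable expression. A sketch, assuming the parameter value is available in the executor's param list at evaluation time:

    PREPARE new_event(integer) AS
        INSERT INTO app_analytics_events (id, app_id) VALUES ($1, 0);
    -- $1 is evaluated via citus_evaluate_expr, after which the executor
    -- can assign the shard as usual
    EXECUTE new_event(42);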
@@ -396,6 +396,7 @@ OutJobFields(StringInfo str, const Job *node)
     WRITE_NODE_FIELD(dependedJobList);
     WRITE_BOOL_FIELD(subqueryPushdown);
     WRITE_BOOL_FIELD(requiresMasterEvaluation);
+    WRITE_BOOL_FIELD(deferredPruning);
 }
@@ -164,6 +164,7 @@ readJobInfo(Job *local_node)
     READ_NODE_FIELD(dependedJobList);
     READ_BOOL_FIELD(subqueryPushdown);
     READ_BOOL_FIELD(requiresMasterEvaluation);
+    READ_BOOL_FIELD(deferredPruning);
 }
@@ -122,6 +122,7 @@ typedef struct Job
     List *dependedJobList;
     bool subqueryPushdown;
     bool requiresMasterEvaluation; /* only applies to modify jobs */
+    bool deferredPruning;
 } Job;
@@ -42,6 +42,8 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
 extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
 extern void AddShardIntervalRestrictionToSelect(Query *subqery,
                                                 ShardInterval *shardInterval);
+extern ShardInterval * FindShardForInsert(Query *query,
+                                          DeferredErrorMessage **planningError);
 extern ShardInterval * FastShardPruning(Oid distributedTableId, Datum partitionValue);
@@ -361,7 +361,7 @@ ERROR: null value in column "name" violates not-null constraint
 DETAIL: Failing row contains (1, null, 5).
 CONTEXT: while executing command on localhost:57638
 INSERT INTO products VALUES(NULL,'product_1', 5);
-ERROR: cannot plan INSERT using row with NULL value in partition column
+ERROR: cannot perform an INSERT with NULL in the partition column
 DROP TABLE products;
 -- Check "NOT NULL" with reference table
 CREATE TABLE products_ref (
@@ -166,12 +166,12 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 430;

 -- INSERT without partition key
 INSERT INTO limit_orders DEFAULT VALUES;
-ERROR: cannot plan INSERT using row with NULL value in partition column
+ERROR: cannot perform an INSERT without a partition column value
 -- squelch WARNINGs that contain worker_port
 SET client_min_messages TO ERROR;
 -- INSERT violating NOT NULL constraint
 INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT);
-ERROR: cannot plan INSERT using row with NULL value in partition column
+ERROR: cannot perform an INSERT with NULL in the partition column
 -- INSERT violating column constraint
 INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
                                  -5.00);
@@ -195,7 +195,6 @@ SET client_min_messages TO DEFAULT;
 -- commands with non-constant partition values are unsupported
 INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
                                  'sell', 0.58);
-ERROR: values given for the partition column must be constants or constant expressions
 -- values for other columns are totally fine
 INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
 -- commands with mutable functions in their quals
@@ -632,3 +631,31 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *
  3 | 103 | Mynt
 (1 row)

+DROP TABLE app_analytics_events;
+-- again with serial in the partition column
+CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
+SELECT create_distributed_table('app_analytics_events', 'id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
+ id
+----
+  1
+(1 row)
+
+INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
+ id
+----
+  2
+(1 row)
+
+INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
+ id | app_id | name
+----+--------+------
+  3 |    103 | Mynt
+(1 row)
+
+DROP TABLE app_analytics_events;
@@ -67,12 +67,12 @@ SELECT * FROM limit_orders_mx WHERE id = 430;

 -- INSERT without partition key
 INSERT INTO limit_orders_mx DEFAULT VALUES;
-ERROR: cannot plan INSERT using row with NULL value in partition column
+ERROR: cannot perform an INSERT without a partition column value
 -- squelch WARNINGs that contain worker_port
 SET client_min_messages TO ERROR;
 -- INSERT violating NOT NULL constraint
 INSERT INTO limit_orders_mx VALUES (NULL, 'T', 975234, DEFAULT);
-ERROR: cannot plan INSERT using row with NULL value in partition column
+ERROR: cannot perform an INSERT with NULL in the partition column
 -- INSERT violating column constraint
 INSERT INTO limit_orders_mx VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
                                     -5.00);
@@ -96,7 +96,6 @@ SET client_min_messages TO DEFAULT;
 -- commands with non-constant partition values are unsupported
 INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
                                     'sell', 0.58);
-ERROR: values given for the partition column must be constants or constant expressions
 -- values for other columns are totally fine
 INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
 -- commands with mutable functions in their quals
@@ -417,3 +417,15 @@ SELECT master_create_worker_shards('app_analytics_events', 4, 1);
 INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
 INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
 INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
+
+DROP TABLE app_analytics_events;
+
+-- again with serial in the partition column
+CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
+SELECT create_distributed_table('app_analytics_events', 'id');
+
+INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
+INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
+INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
+
+DROP TABLE app_analytics_events;