Support expressions in the partition column in INSERTs

pull/1302/head
Marco Slot 2017-03-30 11:47:54 +02:00
parent 701aaccd9c
commit 4ed093970a
11 changed files with 379 additions and 204 deletions
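With this change, the partition column of an INSERT no longer has to be a plain constant: expressions such as DEFAULT on a serial column or a volatile function call are evaluated on the master before shard pruning, which is deferred to the executor when necessary. A minimal illustration, taken from the regression tests updated in this commit (both tables are distributed on their first column):

    -- serial partition column: the value comes from the sequence
    CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
    SELECT create_distributed_table('app_analytics_events', 'id');
    INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;

    -- expression in the partition column, previously rejected at planning time
    INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
                                     'sell', 0.58);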

View File

@ -74,6 +74,7 @@ bool EnableDeadlockPrevention = true;
/* functions needed during run phase */
static void ReacquireMetadataLocks(List *taskList);
static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
@ -87,7 +88,6 @@ static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *taskList);
static bool RequiresConsistentSnapshot(Task *task);
static void ProcessMasterEvaluableFunctions(Job *workerJob, PlanState *planState);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
@ -406,7 +406,11 @@ RequiresConsistentSnapshot(Task *task)
/*
* CitusModifyBeginScan checks the validity of the given custom scan node and
* CitusModifyBeginScan first evaluates expressions in the query and then
* performs shard pruning in case the partition column in an insert was
* defined as a function call.
*
* The function also checks the validity of the given custom scan node and
* gets locks on the shards involved in the task list of the distributed plan.
*/
void
@ -415,7 +419,23 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusScanState *scanState = (CitusScanState *) node;
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
List *taskList = workerJob->taskList;
bool deferredPruning = workerJob->deferredPruning;
if (workerJob->requiresMasterEvaluation)
{
PlanState *planState = &(scanState->customScanState.ss.ps);
ExecuteMasterEvaluableFunctions(jobQuery, planState);
if (deferredPruning)
{
AssignInsertTaskShardId(jobQuery, taskList);
}
RebuildQueryStrings(jobQuery, taskList);
}
/*
* If we are executing a prepared statement, then we may not yet have obtained
@ -428,6 +448,53 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
* locks as early as possible.
*/
ReacquireMetadataLocks(taskList);
/*
* If we deferred shard pruning to the executor, then we still need to assign
* shard placements. We do this after acquiring the metadata locks to ensure
* we can't get stale metadata. At some point, we may want to load all
* placement metadata here.
*/
if (deferredPruning)
{
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
}
}
/*
* AssignInsertTaskShardId performs shard pruning for an insert and sets
* anchorShardId accordingly.
*/
static void
AssignInsertTaskShardId(Query *jobQuery, List *taskList)
{
ShardInterval *shardInterval = NULL;
Task *insertTask = NULL;
DeferredErrorMessage *planningError = NULL;
Assert(jobQuery->commandType == CMD_INSERT);
/*
* We skipped shard pruning in the planner because the partition
* column contained an expression. Perform shard pruning now.
*/
shardInterval = FindShardForInsert(jobQuery, &planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
else if (shardInterval == NULL)
{
/* expression could not be evaluated */
ereport(ERROR, (errmsg("parameters in the partition column are not "
"allowed")));
}
/* assign a shard ID to the task */
insertTask = (Task *) linitial(taskList);
insertTask->anchorShardId = shardInterval->shardId;
}
@ -443,15 +510,12 @@ RouterSingleModifyExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan)
{
PlanState *planState = &(scanState->customScanState.ss.ps);
MultiPlan *multiPlan = scanState->multiPlan;
bool hasReturning = multiPlan->hasReturning;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
Task *task = (Task *) linitial(taskList);
ProcessMasterEvaluableFunctions(workerJob, planState);
ExecuteSingleModifyTask(scanState, task, hasReturning);
scanState->finishedRemoteScan = true;
@ -463,24 +527,6 @@ RouterSingleModifyExecScan(CustomScanState *node)
}
/*
* ProcessMasterEvaluableFunctions executes evaluable functions and rebuilds
* the query strings in task lists.
*/
static void
ProcessMasterEvaluableFunctions(Job *workerJob, PlanState *planState)
{
if (workerJob->requiresMasterEvaluation)
{
Query *jobQuery = workerJob->jobQuery;
List *taskList = workerJob->taskList;
ExecuteMasterEvaluableFunctions(jobQuery, planState);
RebuildQueryStrings(jobQuery, taskList);
}
}
/*
* RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves
* the results and, if RETURNING is used, stores them in custom scan's tuple store.
@ -494,15 +540,12 @@ RouterMultiModifyExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan)
{
PlanState *planState = &(scanState->customScanState.ss.ps);
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
bool hasReturning = multiPlan->hasReturning;
bool isModificationQuery = true;
ProcessMasterEvaluableFunctions(workerJob, planState);
ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);
scanState->finishedRemoteScan = true;
@ -527,14 +570,11 @@ RouterSelectExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan)
{
PlanState *planState = &(scanState->customScanState.ss.ps);
MultiPlan *multiPlan = scanState->multiPlan;
Job *workerJob = multiPlan->workerJob;
List *taskList = workerJob->taskList;
Task *task = (Task *) linitial(taskList);
ProcessMasterEvaluableFunctions(workerJob, planState);
ExecuteSingleSelectTask(scanState, task);
scanState->finishedRemoteScan = true;

View File

@ -101,12 +101,14 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
static char MostPermissiveVolatileFlag(char left, char right);
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
FromExpr *joinTree);
static Task * RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval);
static ShardInterval * TargetShardIntervalForModify(Query *query,
static Task * RouterModifyTask(Oid distributedTableId, Query *originalQuery,
ShardInterval *shardInterval);
static ShardInterval * TargetShardIntervalForModify(Oid distributedTableId, Query *query,
DeferredErrorMessage **planningError);
static List * QueryRestrictList(Query *query);
static bool FastShardPruningPossible(CmdType commandType, char partitionMethod);
static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static ShardInterval * FindShardForUpdateOrDelete(Query *query,
DeferredErrorMessage **planningError);
static List * QueryRestrictList(Query *query, char partitionMethod);
static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn);
static Task * RouterSelectTask(Query *originalQuery,
RelationRestrictionContext *restrictionContext,
List **placementList);
@ -215,6 +217,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
if (modifyTask)
{
Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery);
ShardInterval *targetShardInterval = NULL;
DeferredErrorMessage *planningError = NULL;
@ -226,14 +229,15 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
return multiPlan;
}
targetShardInterval = TargetShardIntervalForModify(query, &planningError);
targetShardInterval = TargetShardIntervalForModify(distributedTableId, query,
&planningError);
if (planningError != NULL)
{
multiPlan->planningError = planningError;
return multiPlan;
}
task = RouterModifyTask(originalQuery, targetShardInterval);
task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval);
Assert(task);
}
else
@ -1373,15 +1377,6 @@ ModifyQuerySupported(Query *queryTree)
specifiesPartitionValue = true;
}
if (commandType == CMD_INSERT && targetEntryPartitionColumn &&
!IsA(targetEntry->expr, Const))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"values given for the partition column must be"
" constants or constant expressions",
NULL, NULL);
}
if (commandType == CMD_UPDATE &&
MasterIrreducibleExpression((Node *) targetEntry->expr,
&hasVarArgument, &hasBadCoalesce))
@ -1826,24 +1821,27 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
* shard-extended deparsed SQL to be run during execution.
*/
static Task *
RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
RouterModifyTask(Oid distributedTableId, Query *originalQuery,
ShardInterval *shardInterval)
{
uint64 shardId = shardInterval->shardId;
Oid distributedTableId = shardInterval->relationId;
StringInfo queryString = makeStringInfo();
Task *modifyTask = NULL;
bool upsertQuery = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
modifyTask = CitusMakeNode(Task);
modifyTask->jobId = INVALID_JOB_ID;
modifyTask->taskId = INVALID_TASK_ID;
modifyTask->taskType = MODIFY_TASK;
modifyTask->queryString = NULL;
modifyTask->anchorShardId = INVALID_SHARD_ID;
modifyTask->dependedTaskList = NIL;
modifyTask->replicationModel = cacheEntry->replicationModel;
if (originalQuery->onConflict != NULL)
{
RangeTblEntry *rangeTableEntry = NULL;
/* set the flag */
upsertQuery = true;
modifyTask->upsertQuery = true;
/* setting an alias simplifies deparsing of UPSERTs */
rangeTableEntry = linitial(originalQuery->rtable);
@ -1854,18 +1852,21 @@ RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
}
}
deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString);
if (shardInterval != NULL)
{
uint64 shardId = shardInterval->shardId;
StringInfo queryString = makeStringInfo();
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
deparse_shard_query(originalQuery, shardInterval->relationId, shardId,
queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data)));
modifyTask = CitusMakeNode(Task);
modifyTask->jobId = INVALID_JOB_ID;
modifyTask->taskId = INVALID_TASK_ID;
modifyTask->taskType = MODIFY_TASK;
modifyTask->queryString = queryString->data;
modifyTask->anchorShardId = shardId;
modifyTask->dependedTaskList = NIL;
modifyTask->upsertQuery = upsertQuery;
modifyTask->replicationModel = cacheEntry->replicationModel;
}
return modifyTask;
}
@ -1873,40 +1874,21 @@ RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval)
/*
* TargetShardIntervalForModify determines the single shard targeted by a provided
* modify command. If no matching shards exist, it throws an error. If the modification
* targets more than one shard, this function sets the deferred error and returns NULL,
* to handle cases in which we cannot prune down to one shard due to a parameter.
* modify command. If no matching shards exist, it throws an error. Otherwise, it
* delegates to FindShardForInsert or FindShardForUpdateOrDelete based on the
* command type.
*/
static ShardInterval *
TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
TargetShardIntervalForModify(Oid distributedTableId, Query *query,
DeferredErrorMessage **planningError)
{
List *prunedShardList = NIL;
int prunedShardCount = 0;
int shardCount = 0;
Oid distributedTableId = ExtractFirstDistributedTableId(query);
ShardInterval *shardInterval = NULL;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
char partitionMethod = cacheEntry->partitionMethod;
bool fastShardPruningPossible = false;
CmdType commandType = query->commandType;
const char *commandName = NULL;
int shardCount = 0;
Assert(commandType != CMD_SELECT);
if (commandType == CMD_INSERT)
{
commandName = "INSERT";
}
else if (commandType == CMD_UPDATE)
{
commandName = "UPDATE";
}
else
{
Assert(commandType == CMD_DELETE);
commandName = "DELETE";
}
/* error out if no shards exist for the table */
shardCount = cacheEntry->shardIntervalArrayLength;
if (shardCount == 0)
@ -1921,28 +1903,101 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
"and try again.")));
}
fastShardPruningPossible = FastShardPruningPossible(query->commandType,
partitionMethod);
if (fastShardPruningPossible)
if (commandType == CMD_INSERT)
{
shardInterval = FindShardForInsert(query, planningError);
}
else
{
shardInterval = FindShardForUpdateOrDelete(query, planningError);
}
return shardInterval;
}
/*
* FindShardForInsert returns the shard interval for an INSERT query or NULL if
* the partition column value is defined as an expression that still needs to be
* evaluated. If the partition column value falls within 0 or multiple
* (overlapping) shards, the planningError is set.
*/
ShardInterval *
FindShardForInsert(Query *query, DeferredErrorMessage **planningError)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
char partitionMethod = cacheEntry->partitionMethod;
uint32 rangeTableId = 1;
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
Const *partitionValueConst = ExtractInsertPartitionValue(query, partitionColumn);
Var *partitionColumn = NULL;
Expr *partitionValueExpr = NULL;
Const *partitionValueConst = NULL;
List *shardIntervalList = NIL;
List *prunedShardList = NIL;
int prunedShardCount = 0;
Assert(query->commandType == CMD_INSERT);
/* reference tables can only have one shard */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
int shardCount = 0;
shardIntervalList = LoadShardIntervalList(distributedTableId);
shardCount = list_length(shardIntervalList);
if (shardCount != 1)
{
ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount)));
}
return (ShardInterval *) linitial(shardIntervalList);
}
partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn);
if (!IsA(partitionValueExpr, Const))
{
/* shard pruning not possible right now */
return NULL;
}
partitionValueConst = (Const *) partitionValueExpr;
if (partitionValueConst->constisnull)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot perform an INSERT with NULL in the partition "
"column")));
}
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
{
Datum partitionValue = partitionValueConst->constvalue;
ShardInterval *shardInterval = FastShardPruning(distributedTableId,
partitionValue);
if (shardInterval != NULL)
{
prunedShardList = lappend(prunedShardList, shardInterval);
prunedShardList = list_make1(shardInterval);
}
}
else
{
List *restrictClauseList = QueryRestrictList(query);
List *restrictClauseList = NIL;
Index tableId = 1;
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber);
Node *rightOp = get_rightop((Expr *) equalityExpr);
Const *rightConst = (Const *) rightOp;
Assert(IsA(rightOp, Const));
rightConst->constvalue = partitionValueConst->constvalue;
rightConst->constisnull = partitionValueConst->constisnull;
rightConst->constbyval = partitionValueConst->constbyval;
restrictClauseList = list_make1(equalityExpr);
shardIntervalList = LoadShardIntervalList(distributedTableId);
prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
shardIntervalList);
}
@ -1950,13 +2005,12 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1)
{
Oid relationId = cacheEntry->relationId;
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(relationId, partitionKeyString);
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
partitionKeyString);
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
const char *targetCountType = NULL;
bool showHint = false;
if (prunedShardCount == 0)
{
@ -1967,42 +2021,24 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
targetCountType = "multiple";
}
if (commandType == CMD_INSERT && prunedShardCount == 0)
if (prunedShardCount == 0)
{
appendStringInfo(errorHint, "Make sure you have created a shard which "
"can receive this partition column value.");
}
else if (commandType == CMD_INSERT)
else
{
appendStringInfo(errorHint, "Make sure the value for partition column "
"\"%s\" falls into a single shard.",
partitionColumnName);
}
else
{
appendStringInfo(errorHint, "Consider using an equality filter on "
"partition column \"%s\" to target a "
"single shard. If you'd like to run a "
"multi-shard operation, use "
"master_modify_multiple_shards().",
partitionColumnName);
}
if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND)
{
appendStringInfo(errorHint, " You can also use "
"master_apply_delete_command() to drop "
"all shards satisfying delete criteria.");
}
showHint = errorHint->len > 0;
appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
commandName, targetCountType);
appendStringInfo(errorMessage, "cannot run INSERT command which targets %s "
"shards", targetCountType);
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL,
showHint ? errorHint->data : NULL);
errorHint->data);
return NULL;
}
@ -2011,29 +2047,6 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError)
}
/*
* UseFastShardPruning returns true if the commandType is INSERT and partition method
* is hash or range.
*/
static bool
FastShardPruningPossible(CmdType commandType, char partitionMethod)
{
/* we currently only support INSERTs */
if (commandType != CMD_INSERT)
{
return false;
}
/* fast shard pruning is only supported for hash and range partitioned tables */
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
{
return true;
}
return false;
}
/*
* FastShardPruning is a higher level API for FindShardInterval function. Given the
* relationId of the distributed table and partitionValue, FastShardPruning function finds
@ -2078,6 +2091,89 @@ FastShardPruning(Oid distributedTableId, Datum partitionValue)
}
/*
* FindShardForUpdateOrDelete finds the shard interval in which an UPDATE or
* DELETE command should be applied, or sets planningError when the query
* needs to be applied to multiple or no shards.
*/
static ShardInterval *
FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError)
{
Oid distributedTableId = ExtractFirstDistributedTableId(query);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
char partitionMethod = cacheEntry->partitionMethod;
CmdType commandType = query->commandType;
List *shardIntervalList = NIL;
List *restrictClauseList = NIL;
Index tableId = 1;
List *prunedShardList = NIL;
int prunedShardCount = 0;
Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE);
shardIntervalList = LoadShardIntervalList(distributedTableId);
restrictClauseList = QueryRestrictList(query, partitionMethod);
prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList,
shardIntervalList);
prunedShardCount = list_length(prunedShardList);
if (prunedShardCount != 1)
{
char *partitionKeyString = cacheEntry->partitionKeyString;
char *partitionColumnName = ColumnNameToColumn(distributedTableId,
partitionKeyString);
StringInfo errorMessage = makeStringInfo();
StringInfo errorHint = makeStringInfo();
const char *commandName = NULL;
const char *targetCountType = NULL;
if (commandType == CMD_UPDATE)
{
commandName = "UPDATE";
}
else
{
commandName = "DELETE";
}
if (prunedShardCount == 0)
{
targetCountType = "no";
}
else
{
targetCountType = "multiple";
}
appendStringInfo(errorHint, "Consider using an equality filter on "
"partition column \"%s\" to target a "
"single shard. If you'd like to run a "
"multi-shard operation, use "
"master_modify_multiple_shards().",
partitionColumnName);
if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND)
{
appendStringInfo(errorHint, " You can also use "
"master_apply_delete_command() to drop "
"all shards satisfying delete criteria.");
}
appendStringInfo(errorMessage, "cannot run %s command which targets %s shards",
commandName, targetCountType);
(*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL,
errorHint->data);
return NULL;
}
return (ShardInterval *) linitial(prunedShardList);
}
/*
* QueryRestrictList returns the restriction clauses for the query. For a SELECT
* statement these are the where-clause expressions. For INSERT statements we
@ -2088,12 +2184,9 @@ FastShardPruning(Oid distributedTableId, Datum partitionValue)
* NIL for reference tables.
*/
static List *
QueryRestrictList(Query *query)
QueryRestrictList(Query *query, char partitionMethod)
{
List *queryRestrictList = NIL;
CmdType commandType = query->commandType;
Oid distributedTableId = ExtractFirstDistributedTableId(query);
char partitionMethod = PartitionMethod(distributedTableId);
/*
* Reference tables do not have the notion of partition column. Thus,
@ -2104,30 +2197,7 @@ QueryRestrictList(Query *query)
return queryRestrictList;
}
if (commandType == CMD_INSERT)
{
/* build equality expression based on partition column value for row */
uint32 rangeTableId = 1;
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);
OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber);
Node *rightOp = get_rightop((Expr *) equalityExpr);
Const *rightConst = (Const *) rightOp;
Assert(IsA(rightOp, Const));
rightConst->constvalue = partitionValue->constvalue;
rightConst->constisnull = partitionValue->constisnull;
rightConst->constbyval = partitionValue->constbyval;
queryRestrictList = list_make1(equalityExpr);
}
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE ||
commandType == CMD_SELECT)
{
queryRestrictList = WhereClauseList(query->jointree);
}
return queryRestrictList;
}
@ -2168,27 +2238,19 @@ ExtractFirstDistributedTableId(Query *query)
* of an INSERT command. If a partition value is missing altogether or is
* NULL, this function throws an error.
*/
static Const *
static Expr *
ExtractInsertPartitionValue(Query *query, Var *partitionColumn)
{
Const *partitionValue = NULL;
TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
partitionColumn->varattno);
if (targetEntry != NULL)
{
Assert(IsA(targetEntry->expr, Const));
partitionValue = (Const *) targetEntry->expr;
}
if (partitionValue == NULL || partitionValue->constisnull)
if (targetEntry == NULL)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot plan INSERT using row with NULL value "
"in partition column")));
errmsg("cannot perform an INSERT without a partition column "
"value")));
}
return partitionValue;
return targetEntry->expr;
}
@ -2580,19 +2642,37 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
List *taskList = NIL;
TaskType taskType = task->taskType;
bool requiresMasterEvaluation = false;
bool deferredPruning = false;
/*
* We send modify task to the first replica, otherwise we choose the target shard
* according to task assignment policy. Placement list for select queries are
* provided as function parameter.
*/
if (taskType == MODIFY_TASK)
{
if (task->anchorShardId != INVALID_SHARD_ID)
{
/*
* We were able to assign a shard ID. Generate task
* placement list using the first-replica assignment
* policy (modify placements in placement ID order).
*/
taskList = FirstReplicaAssignTaskList(list_make1(task));
requiresMasterEvaluation = RequiresMasterEvaluation(query);
}
else
{
/*
* We were unable to assign a shard ID yet, meaning
* the partition column value is an expression.
*/
taskList = list_make1(task);
requiresMasterEvaluation = true;
deferredPruning = true;
}
}
else
{
/*
* For selects we get the placement list during shard
* pruning.
*/
Assert(placementList != NIL);
task->taskPlacementList = placementList;
@ -2606,6 +2686,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
job->jobQuery = query;
job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning;
return job;
}

View File

@ -298,6 +298,17 @@ EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState)
planState);
}
if (IsA(expression, Param))
{
Param *param = (Param *) expression;
return (Node *) citus_evaluate_expr((Expr *) param,
param->paramtype,
param->paramtypmod,
param->paramcollid,
planState);
}
return expression;
}

View File

@ -396,6 +396,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_NODE_FIELD(dependedJobList);
WRITE_BOOL_FIELD(subqueryPushdown);
WRITE_BOOL_FIELD(requiresMasterEvaluation);
WRITE_BOOL_FIELD(deferredPruning);
}

View File

@ -164,6 +164,7 @@ readJobInfo(Job *local_node)
READ_NODE_FIELD(dependedJobList);
READ_BOOL_FIELD(subqueryPushdown);
READ_BOOL_FIELD(requiresMasterEvaluation);
READ_BOOL_FIELD(deferredPruning);
}

View File

@ -122,6 +122,7 @@ typedef struct Job
List *dependedJobList;
bool subqueryPushdown;
bool requiresMasterEvaluation; /* only applies to modify jobs */
bool deferredPruning;
} Job;

View File

@ -42,6 +42,8 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subquery,
ShardInterval *shardInterval);
extern ShardInterval * FindShardForInsert(Query *query,
DeferredErrorMessage **planningError);
extern ShardInterval * FastShardPruning(Oid distributedTableId, Datum partitionValue);

View File

@ -361,7 +361,7 @@ ERROR: null value in column "name" violates not-null constraint
DETAIL: Failing row contains (1, null, 5).
CONTEXT: while executing command on localhost:57638
INSERT INTO products VALUES(NULL,'product_1', 5);
ERROR: cannot plan INSERT using row with NULL value in partition column
ERROR: cannot perform an INSERT with NULL in the partition column
DROP TABLE products;
-- Check "NOT NULL" with reference table
CREATE TABLE products_ref (

View File

@ -166,12 +166,12 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 430;
-- INSERT without partition key
INSERT INTO limit_orders DEFAULT VALUES;
ERROR: cannot plan INSERT using row with NULL value in partition column
ERROR: cannot perform an INSERT without a partition column value
-- squelch WARNINGs that contain worker_port
SET client_min_messages TO ERROR;
-- INSERT violating NOT NULL constraint
INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT);
ERROR: cannot plan INSERT using row with NULL value in partition column
ERROR: cannot perform an INSERT with NULL in the partition column
-- INSERT violating column constraint
INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
-5.00);
@ -195,7 +195,6 @@ SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
'sell', 0.58);
ERROR: values given for the partition column must be constants or constant expressions
-- values for other columns are totally fine
INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
-- commands with mutable functions in their quals
@ -632,3 +631,31 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *
3 | 103 | Mynt
(1 row)
DROP TABLE app_analytics_events;
-- again with serial in the partition column
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
SELECT create_distributed_table('app_analytics_events', 'id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
id
----
1
(1 row)
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
id
----
2
(1 row)
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
id | app_id | name
----+--------+------
3 | 103 | Mynt
(1 row)
DROP TABLE app_analytics_events;

View File

@ -67,12 +67,12 @@ SELECT * FROM limit_orders_mx WHERE id = 430;
-- INSERT without partition key
INSERT INTO limit_orders_mx DEFAULT VALUES;
ERROR: cannot plan INSERT using row with NULL value in partition column
ERROR: cannot perform an INSERT without a partition column value
-- squelch WARNINGs that contain worker_port
SET client_min_messages TO ERROR;
-- INSERT violating NOT NULL constraint
INSERT INTO limit_orders_mx VALUES (NULL, 'T', 975234, DEFAULT);
ERROR: cannot plan INSERT using row with NULL value in partition column
ERROR: cannot perform an INSERT with NULL in the partition column
-- INSERT violating column constraint
INSERT INTO limit_orders_mx VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
-5.00);
@ -96,7 +96,6 @@ SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
'sell', 0.58);
ERROR: values given for the partition column must be constants or constant expressions
-- values for other columns are totally fine
INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random());
-- commands with mutable functions in their quals

View File

@ -417,3 +417,15 @@ SELECT master_create_worker_shards('app_analytics_events', 4, 1);
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
DROP TABLE app_analytics_events;
-- again with serial in the partition column
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
SELECT create_distributed_table('app_analytics_events', 'id');
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
DROP TABLE app_analytics_events;