diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 931b785a7..e951a5c9e 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -74,6 +74,7 @@ bool EnableDeadlockPrevention = true; /* functions needed during run phase */ static void ReacquireMetadataLocks(List *taskList); +static void AssignInsertTaskShardId(Query *jobQuery, List *taskList); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); @@ -87,7 +88,6 @@ static List * TaskShardIntervalList(List *taskList); static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorMultiShardLocks(List *taskList); static bool RequiresConsistentSnapshot(Task *task); -static void ProcessMasterEvaluableFunctions(Job *workerJob, PlanState *planState); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -406,7 +406,11 @@ RequiresConsistentSnapshot(Task *task) /* - * CitusModifyBeginScan checks the validity of the given custom scan node and + * CitusModifyBeginScan first evaluates expressions in the query and then + * performs shard pruning in case the partition column in an insert was + * defined as a function call. + * + * The function also checks the validity of the given custom scan node and * gets locks on the shards involved in the task list of the distributed plan. */ void @@ -415,7 +419,23 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) CitusScanState *scanState = (CitusScanState *) node; MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; + Query *jobQuery = workerJob->jobQuery; List *taskList = workerJob->taskList; + bool deferredPruning = workerJob->deferredPruning; + + if (workerJob->requiresMasterEvaluation) + { + PlanState *planState = &(scanState->customScanState.ss.ps); + + ExecuteMasterEvaluableFunctions(jobQuery, planState); + + if (deferredPruning) + { + AssignInsertTaskShardId(jobQuery, taskList); + } + + RebuildQueryStrings(jobQuery, taskList); + } /* * If we are executing a prepared statement, then we may not yet have obtained @@ -428,6 +448,53 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) * locks as early as possible. */ ReacquireMetadataLocks(taskList); + + /* + * If we deferred shard pruning to the executor, then we still need to assign + * shard placements. We do this after acquiring the metadata locks to ensure + * we can't get stale metadata. At some point, we may want to load all + * placement metadata here. + */ + if (deferredPruning) + { + /* modify tasks are always assigned using first-replica policy */ + workerJob->taskList = FirstReplicaAssignTaskList(taskList); + } +} + + +/* + * AssignInsertTaskShardId performs shard pruning for an insert and sets + * anchorShardId accordingly. + */ +static void +AssignInsertTaskShardId(Query *jobQuery, List *taskList) +{ + ShardInterval *shardInterval = NULL; + Task *insertTask = NULL; + DeferredErrorMessage *planningError = NULL; + + Assert(jobQuery->commandType == CMD_INSERT); + + /* + * We skipped shard pruning in the planner because the partition + * column contained an expression. Perform shard pruning now. 
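+	 * For instance, the partition column value may come from a serial
+	 * sequence or a volatile function such as random(), which can only be
+	 * evaluated on the master immediately before execution.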
+ */ + shardInterval = FindShardForInsert(jobQuery, &planningError); + if (planningError != NULL) + { + RaiseDeferredError(planningError, ERROR); + } + else if (shardInterval == NULL) + { + /* expression could not be evaluated */ + ereport(ERROR, (errmsg("parameters in the partition column are not " + "allowed"))); + } + + /* assign a shard ID to the task */ + insertTask = (Task *) linitial(taskList); + insertTask->anchorShardId = shardInterval->shardId; } @@ -443,15 +510,12 @@ RouterSingleModifyExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - PlanState *planState = &(scanState->customScanState.ss.ps); MultiPlan *multiPlan = scanState->multiPlan; bool hasReturning = multiPlan->hasReturning; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; Task *task = (Task *) linitial(taskList); - ProcessMasterEvaluableFunctions(workerJob, planState); - ExecuteSingleModifyTask(scanState, task, hasReturning); scanState->finishedRemoteScan = true; @@ -463,24 +527,6 @@ RouterSingleModifyExecScan(CustomScanState *node) } -/* - * ProcessMasterEvaluableFunctions executes evaluable functions and rebuilds - * the query strings in task lists. - */ -static void -ProcessMasterEvaluableFunctions(Job *workerJob, PlanState *planState) -{ - if (workerJob->requiresMasterEvaluation) - { - Query *jobQuery = workerJob->jobQuery; - List *taskList = workerJob->taskList; - - ExecuteMasterEvaluableFunctions(jobQuery, planState); - RebuildQueryStrings(jobQuery, taskList); - } -} - - /* * RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves * the results and, if RETURNING is used, stores them in custom scan's tuple store. @@ -494,15 +540,12 @@ RouterMultiModifyExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - PlanState *planState = &(scanState->customScanState.ss.ps); MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; bool hasReturning = multiPlan->hasReturning; bool isModificationQuery = true; - ProcessMasterEvaluableFunctions(workerJob, planState); - ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); scanState->finishedRemoteScan = true; @@ -527,14 +570,11 @@ RouterSelectExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { - PlanState *planState = &(scanState->customScanState.ss.ps); MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; Task *task = (Task *) linitial(taskList); - ProcessMasterEvaluableFunctions(workerJob, planState); - ExecuteSingleSelectTask(scanState, task); scanState->finishedRemoteScan = true; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 52bb3d1b5..8d0ded0f4 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -101,12 +101,14 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta static char MostPermissiveVolatileFlag(char left, char right); static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree); -static Task * RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval); -static ShardInterval * TargetShardIntervalForModify(Query *query, +static Task * RouterModifyTask(Oid distributedTableId, Query *originalQuery, + ShardInterval *shardInterval); +static ShardInterval * 
TargetShardIntervalForModify(Oid distributedTableId, Query *query, DeferredErrorMessage **planningError); -static List * QueryRestrictList(Query *query); -static bool FastShardPruningPossible(CmdType commandType, char partitionMethod); -static Const * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); +static ShardInterval * FindShardForUpdateOrDelete(Query *query, + DeferredErrorMessage **planningError); +static List * QueryRestrictList(Query *query, char partitionMethod); +static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Task * RouterSelectTask(Query *originalQuery, RelationRestrictionContext *restrictionContext, List **placementList); @@ -215,6 +217,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, if (modifyTask) { + Oid distributedTableId = ExtractFirstDistributedTableId(originalQuery); ShardInterval *targetShardInterval = NULL; DeferredErrorMessage *planningError = NULL; @@ -226,14 +229,15 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, return multiPlan; } - targetShardInterval = TargetShardIntervalForModify(query, &planningError); + targetShardInterval = TargetShardIntervalForModify(distributedTableId, query, + &planningError); if (planningError != NULL) { multiPlan->planningError = planningError; return multiPlan; } - task = RouterModifyTask(originalQuery, targetShardInterval); + task = RouterModifyTask(distributedTableId, originalQuery, targetShardInterval); Assert(task); } else @@ -1373,15 +1377,6 @@ ModifyQuerySupported(Query *queryTree) specifiesPartitionValue = true; } - if (commandType == CMD_INSERT && targetEntryPartitionColumn && - !IsA(targetEntry->expr, Const)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "values given for the partition column must be" - " constants or constant expressions", - NULL, NULL); - } - if (commandType == CMD_UPDATE && MasterIrreducibleExpression((Node *) targetEntry->expr, &hasVarArgument, &hasBadCoalesce)) @@ -1826,24 +1821,27 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre * shard-extended deparsed SQL to be run during execution.
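+ * When shardInterval is NULL, shard pruning was deferred to the executor:
+ * the query string and anchor shard id are left unset here and are filled
+ * in by CitusModifyBeginScan once the partition column expression has been
+ * evaluated.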
*/ static Task * -RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval) +RouterModifyTask(Oid distributedTableId, Query *originalQuery, + ShardInterval *shardInterval) { - uint64 shardId = shardInterval->shardId; - Oid distributedTableId = shardInterval->relationId; - StringInfo queryString = makeStringInfo(); Task *modifyTask = NULL; - bool upsertQuery = false; DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - /* grab shared metadata lock to stop concurrent placement additions */ - LockShardDistributionMetadata(shardId, ShareLock); + modifyTask = CitusMakeNode(Task); + modifyTask->jobId = INVALID_JOB_ID; + modifyTask->taskId = INVALID_TASK_ID; + modifyTask->taskType = MODIFY_TASK; + modifyTask->queryString = NULL; + modifyTask->anchorShardId = INVALID_SHARD_ID; + modifyTask->dependedTaskList = NIL; + modifyTask->replicationModel = cacheEntry->replicationModel; if (originalQuery->onConflict != NULL) { RangeTblEntry *rangeTableEntry = NULL; /* set the flag */ - upsertQuery = true; + modifyTask->upsertQuery = true; /* setting an alias simplifies deparsing of UPSERTs */ rangeTableEntry = linitial(originalQuery->rtable); @@ -1854,18 +1852,21 @@ RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval) } } - deparse_shard_query(originalQuery, shardInterval->relationId, shardId, queryString); - ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + if (shardInterval != NULL) + { + uint64 shardId = shardInterval->shardId; + StringInfo queryString = makeStringInfo(); - modifyTask = CitusMakeNode(Task); - modifyTask->jobId = INVALID_JOB_ID; - modifyTask->taskId = INVALID_TASK_ID; - modifyTask->taskType = MODIFY_TASK; - modifyTask->queryString = queryString->data; - modifyTask->anchorShardId = shardId; - modifyTask->dependedTaskList = NIL; - modifyTask->upsertQuery = upsertQuery; - modifyTask->replicationModel = cacheEntry->replicationModel; + /* grab shared metadata lock to stop concurrent placement additions */ + LockShardDistributionMetadata(shardId, ShareLock); + + deparse_shard_query(originalQuery, shardInterval->relationId, shardId, + queryString); + ereport(DEBUG4, (errmsg("distributed statement: %s", queryString->data))); + + modifyTask->queryString = queryString->data; + modifyTask->anchorShardId = shardId; + } return modifyTask; } @@ -1873,40 +1874,21 @@ RouterModifyTask(Query *originalQuery, ShardInterval *shardInterval) /* * TargetShardIntervalForModify determines the single shard targeted by a provided - * modify command. If no matching shards exist, it throws an error. If the modification - * targets more than one shard, this function sets the deferred error and returns NULL, - * to handle cases in which we cannot prune down to one shard due to a parameter. + * modify command. If no matching shards exist, it throws an error. Otherwise, it + * delegates to FindShardForInsert or FindShardForUpdateOrDelete based on the + * command type. 
*/ static ShardInterval * -TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError) +TargetShardIntervalForModify(Oid distributedTableId, Query *query, + DeferredErrorMessage **planningError) { - List *prunedShardList = NIL; - int prunedShardCount = 0; - int shardCount = 0; - Oid distributedTableId = ExtractFirstDistributedTableId(query); + ShardInterval *shardInterval = NULL; DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; - bool fastShardPruningPossible = false; CmdType commandType = query->commandType; - const char *commandName = NULL; + int shardCount = 0; Assert(commandType != CMD_SELECT); - - if (commandType == CMD_INSERT) - { - commandName = "INSERT"; - } - else if (commandType == CMD_UPDATE) - { - commandName = "UPDATE"; - } - else - { - Assert(commandType == CMD_DELETE); - commandName = "DELETE"; - } - /* error out if no shards exist for the table */ shardCount = cacheEntry->shardIntervalArrayLength; if (shardCount == 0) @@ -1921,28 +1903,101 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError) "and try again."))); } - fastShardPruningPossible = FastShardPruningPossible(query->commandType, - partitionMethod); - if (fastShardPruningPossible) + if (commandType == CMD_INSERT) + { + shardInterval = FindShardForInsert(query, planningError); + } + else + { + shardInterval = FindShardForUpdateOrDelete(query, planningError); + } + + return shardInterval; +} + + +/* + * FindShardForInsert returns the shard interval for an INSERT query or NULL if + * the partition column value is defined as an expression that still needs to be + * evaluated. If the partition column value falls within 0 or multiple + * (overlapping) shards, the planningError is set. 
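+ * A NULL return value with no planningError tells the caller to defer
+ * shard pruning to the executor, which calls this function again after
+ * evaluating the partition column expression.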
+ */ +ShardInterval * +FindShardForInsert(Query *query, DeferredErrorMessage **planningError) +{ + Oid distributedTableId = ExtractFirstDistributedTableId(query); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + char partitionMethod = cacheEntry->partitionMethod; + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + Expr *partitionValueExpr = NULL; + Const *partitionValueConst = NULL; + List *shardIntervalList = NIL; + List *prunedShardList = NIL; + int prunedShardCount = 0; + + Assert(query->commandType == CMD_INSERT); + + /* reference tables can only have one shard */ + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + int shardCount = 0; + + shardIntervalList = LoadShardIntervalList(distributedTableId); + shardCount = list_length(shardIntervalList); + + if (shardCount != 1) + { + ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount))); + } + + return (ShardInterval *) linitial(shardIntervalList); + } + + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + + partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn); + if (!IsA(partitionValueExpr, Const)) + { + /* shard pruning not possible right now */ + return NULL; + } + + partitionValueConst = (Const *) partitionValueExpr; + if (partitionValueConst->constisnull) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("cannot perform an INSERT with NULL in the partition " + "column"))); + } + + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) { - uint32 rangeTableId = 1; - Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - Const *partitionValueConst = ExtractInsertPartitionValue(query, partitionColumn); Datum partitionValue = partitionValueConst->constvalue; ShardInterval *shardInterval = FastShardPruning(distributedTableId, partitionValue); - if (shardInterval != NULL) { - prunedShardList = lappend(prunedShardList, shardInterval); + prunedShardList = list_make1(shardInterval); } } else { - List *restrictClauseList = QueryRestrictList(query); + List *restrictClauseList = NIL; Index tableId = 1; - List *shardIntervalList = LoadShardIntervalList(distributedTableId); + OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber); + Node *rightOp = get_rightop((Expr *) equalityExpr); + Const *rightConst = (Const *) rightOp; + Assert(IsA(rightOp, Const)); + + rightConst->constvalue = partitionValueConst->constvalue; + rightConst->constisnull = partitionValueConst->constisnull; + rightConst->constbyval = partitionValueConst->constbyval; + + restrictClauseList = list_make1(equalityExpr); + + shardIntervalList = LoadShardIntervalList(distributedTableId); prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList, shardIntervalList); } @@ -1950,13 +2005,12 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError) prunedShardCount = list_length(prunedShardList); if (prunedShardCount != 1) { - Oid relationId = cacheEntry->relationId; char *partitionKeyString = cacheEntry->partitionKeyString; - char *partitionColumnName = ColumnNameToColumn(relationId, partitionKeyString); + char *partitionColumnName = ColumnNameToColumn(distributedTableId, + partitionKeyString); StringInfo errorMessage = makeStringInfo(); StringInfo errorHint = makeStringInfo(); const char *targetCountType = NULL; - bool showHint = false; if (prunedShardCount == 0) { @@ -1967,42 +2021,24 @@ TargetShardIntervalForModify(Query *query, 
DeferredErrorMessage **planningError) targetCountType = "multiple"; } - if (commandType == CMD_INSERT && prunedShardCount == 0) + if (prunedShardCount == 0) { appendStringInfo(errorHint, "Make sure you have created a shard which " "can receive this partition column value."); } - else if (commandType == CMD_INSERT) + else { appendStringInfo(errorHint, "Make sure the value for partition column " "\"%s\" falls into a single shard.", partitionColumnName); } - else - { - appendStringInfo(errorHint, "Consider using an equality filter on " - "partition column \"%s\" to target a " - "single shard. If you'd like to run a " - "multi-shard operation, use " - "master_modify_multiple_shards().", - partitionColumnName); - } - if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND) - { - appendStringInfo(errorHint, " You can also use " - "master_apply_delete_command() to drop " - "all shards satisfying delete criteria."); - } - - showHint = errorHint->len > 0; - - appendStringInfo(errorMessage, "cannot run %s command which targets %s shards", - commandName, targetCountType); + appendStringInfo(errorMessage, "cannot run INSERT command which targets %s " + "shards", targetCountType); (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorMessage->data, NULL, - showHint ? errorHint->data : NULL); + errorHint->data); return NULL; } @@ -2011,29 +2047,6 @@ TargetShardIntervalForModify(Query *query, DeferredErrorMessage **planningError) } -/* - * UseFastShardPruning returns true if the commandType is INSERT and partition method - * is hash or range. - */ -static bool -FastShardPruningPossible(CmdType commandType, char partitionMethod) -{ - /* we currently only support INSERTs */ - if (commandType != CMD_INSERT) - { - return false; - } - - /* fast shard pruning is only supported for hash and range partitioned tables */ - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) - { - return true; - } - - return false; -} - - /* * FastShardPruning is a higher level API for FindShardInterval function. Given the * relationId of the distributed table and partitionValue, FastShardPruning function finds @@ -2078,6 +2091,89 @@ FastShardPruning(Oid distributedTableId, Datum partitionValue) } +/* + * FindShardForUpdateOrDelete finds the shard interval in which an UPDATE or + * DELETE command should be applied, or sets planningError when the query + * needs to be applied to multiple or no shards. 
+ */ +static ShardInterval * +FindShardForUpdateOrDelete(Query *query, DeferredErrorMessage **planningError) +{ + Oid distributedTableId = ExtractFirstDistributedTableId(query); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + char partitionMethod = cacheEntry->partitionMethod; + CmdType commandType = query->commandType; + List *shardIntervalList = NIL; + List *restrictClauseList = NIL; + Index tableId = 1; + List *prunedShardList = NIL; + int prunedShardCount = 0; + + Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE); + + shardIntervalList = LoadShardIntervalList(distributedTableId); + + restrictClauseList = QueryRestrictList(query, partitionMethod); + prunedShardList = PruneShardList(distributedTableId, tableId, restrictClauseList, + shardIntervalList); + + prunedShardCount = list_length(prunedShardList); + if (prunedShardCount != 1) + { + char *partitionKeyString = cacheEntry->partitionKeyString; + char *partitionColumnName = ColumnNameToColumn(distributedTableId, + partitionKeyString); + StringInfo errorMessage = makeStringInfo(); + StringInfo errorHint = makeStringInfo(); + const char *commandName = NULL; + const char *targetCountType = NULL; + + if (commandType == CMD_UPDATE) + { + commandName = "UPDATE"; + } + else + { + commandName = "DELETE"; + } + + if (prunedShardCount == 0) + { + targetCountType = "no"; + } + else + { + targetCountType = "multiple"; + } + + appendStringInfo(errorHint, "Consider using an equality filter on " + "partition column \"%s\" to target a " + "single shard. If you'd like to run a " + "multi-shard operation, use " + "master_modify_multiple_shards().", + partitionColumnName); + + if (commandType == CMD_DELETE && partitionMethod == DISTRIBUTE_BY_APPEND) + { + appendStringInfo(errorHint, " You can also use " + "master_apply_delete_command() to drop " + "all shards satisfying delete criteria."); + } + + appendStringInfo(errorMessage, "cannot run %s command which targets %s shards", + commandName, targetCountType); + + (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, + errorHint->data); + + return NULL; + } + + return (ShardInterval *) linitial(prunedShardList); +} + + /* * QueryRestrictList returns the restriction clauses for the query. For a SELECT * statement these are the where-clause expressions. For INSERT statements we @@ -2088,12 +2184,9 @@ FastShardPruning(Oid distributedTableId, Datum partitionValue) * NIL for reference tables. */ static List * -QueryRestrictList(Query *query) +QueryRestrictList(Query *query, char partitionMethod) { List *queryRestrictList = NIL; - CmdType commandType = query->commandType; - Oid distributedTableId = ExtractFirstDistributedTableId(query); - char partitionMethod = PartitionMethod(distributedTableId); /* * Reference tables do not have the notion of partition column. 
Thus, @@ -2104,30 +2197,7 @@ QueryRestrictList(Query *query) return queryRestrictList; } - if (commandType == CMD_INSERT) - { - /* build equality expression based on partition column value for row */ - uint32 rangeTableId = 1; - Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn); - - OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber); - - Node *rightOp = get_rightop((Expr *) equalityExpr); - Const *rightConst = (Const *) rightOp; - Assert(IsA(rightOp, Const)); - - rightConst->constvalue = partitionValue->constvalue; - rightConst->constisnull = partitionValue->constisnull; - rightConst->constbyval = partitionValue->constbyval; - - queryRestrictList = list_make1(equalityExpr); - } - else if (commandType == CMD_UPDATE || commandType == CMD_DELETE || - commandType == CMD_SELECT) - { - queryRestrictList = WhereClauseList(query->jointree); - } + queryRestrictList = WhereClauseList(query->jointree); return queryRestrictList; } @@ -2168,27 +2238,19 @@ ExtractFirstDistributedTableId(Query *query) * of an INSERT command. If a partition value is missing altogether or is * NULL, this function throws an error. */ -static Const * +static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn) { - Const *partitionValue = NULL; TargetEntry *targetEntry = get_tle_by_resno(query->targetList, partitionColumn->varattno); - if (targetEntry != NULL) - { - Assert(IsA(targetEntry->expr, Const)); - - partitionValue = (Const *) targetEntry->expr; - } - - if (partitionValue == NULL || partitionValue->constisnull) + if (targetEntry == NULL) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("cannot plan INSERT using row with NULL value " - "in partition column"))); + errmsg("cannot perform an INSERT without a partition column " + "value"))); } - return partitionValue; + return targetEntry->expr; } @@ -2580,19 +2642,37 @@ RouterQueryJob(Query *query, Task *task, List *placementList) List *taskList = NIL; TaskType taskType = task->taskType; bool requiresMasterEvaluation = false; + bool deferredPruning = false; - /* - * We send modify task to the first replica, otherwise we choose the target shard - * according to task assignment policy. Placement list for select queries are - * provided as function parameter. - */ if (taskType == MODIFY_TASK) { - taskList = FirstReplicaAssignTaskList(list_make1(task)); - requiresMasterEvaluation = RequiresMasterEvaluation(query); + if (task->anchorShardId != INVALID_SHARD_ID) + { + /* + * We were able to assign a shard ID. Generate task + * placement list using the first-replica assignment + * policy (modify placements in placement ID order). + */ + taskList = FirstReplicaAssignTaskList(list_make1(task)); + requiresMasterEvaluation = RequiresMasterEvaluation(query); + } + else + { + /* + * We were unable to assign a shard ID yet, meaning + * the partition column value is an expression. + */ + taskList = list_make1(task); + requiresMasterEvaluation = true; + deferredPruning = true; + } } else { + /* + * For selects we get the placement list during shard + * pruning. 
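+		 * The caller passes it in through the placementList argument.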
+ */ Assert(placementList != NIL); task->taskPlacementList = placementList; @@ -2606,6 +2686,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList) job->jobQuery = query; job->taskList = taskList; job->requiresMasterEvaluation = requiresMasterEvaluation; + job->deferredPruning = deferredPruning; return job; } diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 61268b9ea..91f2a0db2 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -298,6 +298,17 @@ EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState) planState); } + if (IsA(expression, Param)) + { + Param *param = (Param *) expression; + + return (Node *) citus_evaluate_expr((Expr *) param, + param->paramtype, + param->paramtypmod, + param->paramcollid, + planState); + } + return expression; } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 8aa485857..460de189b 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -396,6 +396,7 @@ OutJobFields(StringInfo str, const Job *node) WRITE_NODE_FIELD(dependedJobList); WRITE_BOOL_FIELD(subqueryPushdown); WRITE_BOOL_FIELD(requiresMasterEvaluation); + WRITE_BOOL_FIELD(deferredPruning); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 37c9245fa..9bcf61efc 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -164,6 +164,7 @@ readJobInfo(Job *local_node) READ_NODE_FIELD(dependedJobList); READ_BOOL_FIELD(subqueryPushdown); READ_BOOL_FIELD(requiresMasterEvaluation); + READ_BOOL_FIELD(deferredPruning); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 96d74561c..20512bc3e 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -122,6 +122,7 @@ typedef struct Job List *dependedJobList; bool subqueryPushdown; bool requiresMasterEvaluation; /* only applies to modify jobs */ + bool deferredPruning; } Job; diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 91c4c963f..322cbf791 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -42,6 +42,8 @@ extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); +extern ShardInterval * FindShardForInsert(Query *query, + DeferredErrorMessage **planningError); extern ShardInterval * FastShardPruning(Oid distributedTableId, Datum partitionValue); diff --git a/src/test/regress/expected/multi_alter_table_add_constraints.out b/src/test/regress/expected/multi_alter_table_add_constraints.out index 9b56819cc..93ce385ec 100644 --- a/src/test/regress/expected/multi_alter_table_add_constraints.out +++ b/src/test/regress/expected/multi_alter_table_add_constraints.out @@ -361,7 +361,7 @@ ERROR: null value in column "name" violates not-null constraint DETAIL: Failing row contains (1, null, 5). 
CONTEXT: while executing command on localhost:57638 INSERT INTO products VALUES(NULL,'product_1', 5); -ERROR: cannot plan INSERT using row with NULL value in partition column +ERROR: cannot perform an INSERT with NULL in the partition column DROP TABLE products; -- Check "NOT NULL" with reference table CREATE TABLE products_ref ( diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 929e3e965..68950caea 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -166,12 +166,12 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 430; -- INSERT without partition key INSERT INTO limit_orders DEFAULT VALUES; -ERROR: cannot plan INSERT using row with NULL value in partition column +ERROR: cannot perform an INSERT without a partition column value -- squelch WARNINGs that contain worker_port SET client_min_messages TO ERROR; -- INSERT violating NOT NULL constraint INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT); -ERROR: cannot plan INSERT using row with NULL value in partition column +ERROR: cannot perform an INSERT with NULL in the partition column -- INSERT violating column constraint INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', -5.00); @@ -195,7 +195,6 @@ SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); -ERROR: values given for the partition column must be constants or constant expressions -- values for other columns are totally fine INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals @@ -632,3 +631,31 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING * 3 | 103 | Mynt (1 row) +DROP TABLE app_analytics_events; +-- again with serial in the partition column +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; + id +---- + 1 +(1 row) + +INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; + id +---- + 2 +(1 row) + +INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 3 | 103 | Mynt +(1 row) + +DROP TABLE app_analytics_events; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 89d4b9ec6..e960dce3f 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -67,12 +67,12 @@ SELECT * FROM limit_orders_mx WHERE id = 430; -- INSERT without partition key INSERT INTO limit_orders_mx DEFAULT VALUES; -ERROR: cannot plan INSERT using row with NULL value in partition column +ERROR: cannot perform an INSERT without a partition column value -- squelch WARNINGs that contain worker_port SET client_min_messages TO ERROR; -- INSERT violating NOT NULL constraint INSERT INTO limit_orders_mx VALUES (NULL, 'T', 975234, DEFAULT); -ERROR: cannot plan INSERT using row with NULL value in partition column +ERROR: cannot perform an INSERT with NULL in the partition column -- INSERT violating column constraint 
INSERT INTO limit_orders_mx VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', -5.00); @@ -96,7 +96,6 @@ SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders_mx VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); -ERROR: values given for the partition column must be constants or constant expressions -- values for other columns are totally fine INSERT INTO limit_orders_mx VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); -- commands with mutable functions in their quals diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 43f2c6b7e..583301fe8 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -417,3 +417,15 @@ SELECT master_create_worker_shards('app_analytics_events', 4, 1); INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; + +DROP TABLE app_analytics_events; + +-- again with serial in the partition column +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + +INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; +INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; +INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; + +DROP TABLE app_analytics_events;
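
-- Illustration, not part of the patch: with pruning deferred to the executor,
-- non-constant partition column values such as the ones below (both taken from
-- the regression tests above) now succeed instead of erroring out with
-- "values given for the partition column must be constants or constant expressions".
--
--   INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58);
--   INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;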