From 6a35c2937cd06f2604d9c59d701f520788ade9fb Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Tue, 11 Jul 2017 13:00:47 -0600 Subject: [PATCH 1/4] Enable multi-row INSERTs This is a pretty substantial refactoring of the existing modify path within the router executor and planner. In particular, we now hunt for all VALUES range table entries in INSERT statements and group the rows contained therein by shard identifier. These rows are stashed away for later in "ModifyRoute" elements. During deparse, the appropriate RTE is extracted from the Query and its values list is replaced by these rows before any SQL is generated. In this way, we can create multiple Tasks, but only one per shard, to piecemeal execute a multi-row INSERT. The execution of jobs containing such tasks now exclusively go through the "multi-router executor" which was previously used for e.g. INSERT INTO ... SELECT. By piggybacking onto that executor, we participate in ongoing trans- actions, get rollback-ability, etc. In short order, the only remaining use of the "single modify" router executor will be for bare single- row INSERT statements (i.e. those not in a transaction). This change appropriately handles deferred pruning as well as master- evaluated functions. --- .../distributed/executor/multi_executor.c | 3 +- .../executor/multi_router_executor.c | 7 +- .../distributed/planner/deparse_shard_query.c | 124 +++- .../planner/multi_router_planner.c | 561 ++++++++++++------ src/backend/distributed/utils/citus_clauses.c | 55 +- .../distributed/utils/citus_copyfuncs.c | 1 + .../distributed/utils/citus_outfuncs.c | 1 + .../distributed/utils/citus_readfuncs.c | 1 + src/include/distributed/multi_executor.h | 2 + .../distributed/multi_physical_planner.h | 2 + .../regress/expected/multi_insert_select.out | 6 + .../regress/expected/multi_modifications.out | 10 +- .../expected/multi_mx_modifications.out | 8 +- src/test/regress/sql/multi_insert_select.sql | 9 + src/test/regress/sql/multi_modifications.sql | 8 +- .../regress/sql/multi_mx_modifications.sql | 6 +- 16 files changed, 568 insertions(+), 236 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index d24d59008..62c35d769 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -66,7 +66,8 @@ static CustomExecMethods RouterSingleModifyCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; -static CustomExecMethods RouterMultiModifyCustomExecMethods = { +/* not static to enable swapping in multi-modify logic during router execution */ +CustomExecMethods RouterMultiModifyCustomExecMethods = { .CustomName = "RouterMultiModifyScan", .BeginCustomScan = CitusModifyBeginScan, .ExecCustomScan = RouterMultiModifyExecScan, diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 7ba5b864d..3eada7eb7 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -419,6 +419,11 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) RaiseDeferredError(planningError, ERROR); } + if (list_length(taskList) > 1) + { + node->methods = &RouterMultiModifyCustomExecMethods; + } + workerJob->taskList = taskList; } @@ -428,7 +433,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) /* prevent concurrent placement changes */ AcquireMetadataLocks(taskList); - /* assign task placements */ + /* modify tasks are always assigned using first-replica policy */ workerJob->taskList = FirstReplicaAssignTaskList(taskList); } diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 9ddcc0be1..07aa37729 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -26,11 +26,15 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "parser/parsetree.h" #include "storage/lock.h" #include "utils/lsyscache.h" #include "utils/rel.h" - +static RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query, + Oid distributedTableId); +static void UpdateTaskQueryString(Query *query, Oid distributedTableId, + RangeTblEntry *valuesRTE, Task *task); static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); @@ -43,11 +47,12 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) { ListCell *taskCell = NULL; Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid; + RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery, + relationId); foreach(taskCell, taskList) { Task *task = (Task *) lfirst(taskCell); - StringInfo newQueryString = makeStringInfo(); Query *query = originalQuery; if (task->insertSelectQuery) @@ -90,31 +95,108 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) } } - /* - * For INSERT queries, we only have one relation to update, so we can - * use deparse_shard_query(). For UPDATE and DELETE queries, we may have - * subqueries and joins, so we use relation shard list to update shard - * names and call pg_get_query_def() directly. - */ - if (query->commandType == CMD_INSERT) - { - deparse_shard_query(query, relationId, task->anchorShardId, newQueryString); - } - else - { - List *relationShardList = task->relationShardList; - UpdateRelationToShardNames((Node *) query, relationShardList); + ereport(DEBUG4, (errmsg("query before rebuilding: %s", task->queryString))); - pg_get_query_def(query, newQueryString); - } + UpdateTaskQueryString(query, relationId, valuesRTE, task); - ereport(DEBUG4, (errmsg("distributed statement: %s", newQueryString->data))); - - task->queryString = newQueryString->data; + ereport(DEBUG4, (errmsg("query after rebuilding: %s", task->queryString))); } } +/* + * ExtractDistributedInsertValuesRTE does precisely that. If the provided + * query is not an INSERT, or if the table is a reference table, or if the + * INSERT does not have a VALUES RTE (i.e. it is not a multi-row INSERT), this + * function returns NULL. If all those conditions are met, an RTE representing + * the multiple values of a multi-row INSERT is returned. + */ +static RangeTblEntry * +ExtractDistributedInsertValuesRTE(Query *query, Oid distributedTableId) +{ + RangeTblEntry *valuesRTE = NULL; + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + TargetEntry *targetEntry = NULL; + + if (query->commandType != CMD_INSERT) + { + return NULL; + } + + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + if (partitionColumn == NULL) + { + return NULL; + } + + targetEntry = get_tle_by_resno(query->targetList, partitionColumn->varattno); + Assert(targetEntry != NULL); + + if (IsA(targetEntry->expr, Var)) + { + Var *partitionVar = (Var *) targetEntry->expr; + + valuesRTE = rt_fetch(partitionVar->varno, query->rtable); + if (valuesRTE->rtekind != RTE_VALUES) + { + return NULL; + } + } + + return valuesRTE; +} + + +/* + * UpdateTaskQueryString updates the query string stored within the provided + * Task. If the Task has row values from a multi-row INSERT, those are injected + * into the provided query (using the provided valuesRTE, which must belong to + * the query) before deparse occurs (the query's full VALUES list will be + * restored before this function returns). + */ +static void +UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, + Task *task) +{ + StringInfo queryString = makeStringInfo(); + List *oldValuesLists = NIL; + + if (valuesRTE != NULL) + { + Assert(valuesRTE->rtekind == RTE_VALUES); + + oldValuesLists = valuesRTE->values_lists; + valuesRTE->values_lists = task->rowValuesLists; + } + + /* + * For INSERT queries, we only have one relation to update, so we can + * use deparse_shard_query(). For UPDATE and DELETE queries, we may have + * subqueries and joins, so we use relation shard list to update shard + * names and call pg_get_query_def() directly. + */ + if (query->commandType == CMD_INSERT) + { + deparse_shard_query(query, distributedTableId, task->anchorShardId, queryString); + } + else + { + List *relationShardList = task->relationShardList; + UpdateRelationToShardNames((Node *) query, relationShardList); + + pg_get_query_def(query, queryString); + } + + if (valuesRTE != NULL) + { + valuesRTE->values_lists = oldValuesLists; + } + + task->queryString = queryString->data; +} + + /* * UpdateRelationToShardNames walks over the query tree and appends shard ids to * relations. It uses unique identity value to establish connection between a diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 2021379c3..c9854e1b2 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -71,6 +71,28 @@ #include "catalog/pg_proc.h" #include "optimizer/planmain.h" +/* intermediate value for INSERT processing */ +typedef struct InsertValues +{ + Expr *partitionValueExpr; /* partition value provided in INSERT row */ + List *rowValues; /* full values list of INSERT row, possibly NIL */ + int64 shardId; /* target shard for this row, possibly invalid */ +} InsertValues; + + +/* + * A ModifyRoute encapsulates the the information needed to route modifications + * to the appropriate shard. For a single-shard modification, only one route + * is needed, but in the case of e.g. a multi-row INSERT, lists of these values + * will help divide the rows by their destination shards, permitting later + * shard-and-row-specific extension of the original SQL. + */ +typedef struct ModifyRoute +{ + int64 shardId; /* identifier of target shard */ + List *rowValuesLists; /* for multi-row INSERTs, list of rows to be inserted */ +} ModifyRoute; + typedef struct WalkerState { @@ -99,9 +121,6 @@ static void ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry); static bool CanShardPrune(Oid distributedTableId, Query *query); static Job * CreateJob(Query *query); static Task * CreateTask(TaskType taskType); -static ShardInterval * FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError); -static Expr * ExtractInsertPartitionValue(Query *query, Var *partitionColumn); static Job * RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, DeferredErrorMessage **planningError); @@ -110,6 +129,9 @@ static List * TargetShardIntervalsForRouter(Query *query, RelationRestrictionContext *restrictionContext, bool *multiShardQuery); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); +static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); +static List * GroupInsertValuesByShardId(List *insertValuesList); +static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -120,6 +142,8 @@ static bool SelectsFromDistributedTable(List *rangeTableList); #if (PG_VERSION_NUM >= 100000) static List * get_all_actual_clauses(List *restrictinfo_list); #endif +static int CompareInsertValuesByShardId(const void *leftElement, + const void *rightElement); /* @@ -471,7 +495,6 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) bool isCoordinator = IsCoordinator(); List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; - bool hasValuesScan = false; uint32 queryTableCount = 0; bool specifiesPartitionValue = false; ListCell *setTargetCell = NULL; @@ -555,7 +578,7 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) } else if (rangeTableEntry->rtekind == RTE_VALUES) { - hasValuesScan = true; + /* do nothing, this type is supported */ } else { @@ -626,24 +649,6 @@ ModifyQuerySupported(Query *queryTree, bool multiShardQuery) } } - /* reject queries which involve multi-row inserts */ - if (hasValuesScan) - { - /* - * NB: If you remove this check you must also change the checks further in this - * method and ensure that VOLATILE function calls aren't allowed in INSERT - * statements. Currently they're allowed but the function call is replaced - * with a constant, and if you're inserting multiple rows at once the function - * should return a different value for each row. - */ - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot perform distributed planning for the given" - " modification", - "Multi-row INSERTs to distributed tables are not " - "supported.", - NULL); - } - if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { @@ -1143,7 +1148,8 @@ CanShardPrune(Oid distributedTableId, Query *query) { uint32 rangeTableId = 1; Var *partitionColumn = NULL; - Expr *partitionValueExpr = NULL; + List *insertValuesList = NIL; + ListCell *insertValuesCell = NULL; if (query->commandType != CMD_INSERT) { @@ -1158,14 +1164,19 @@ CanShardPrune(Oid distributedTableId, Query *query) return true; } - partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn); - if (IsA(partitionValueExpr, Const)) + /* get full list of partition values and ensure they are all Consts */ + insertValuesList = ExtractInsertValuesList(query, partitionColumn); + foreach(insertValuesCell, insertValuesList) { - /* can do shard pruning if the partition column is constant */ - return true; + InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell); + if (!IsA(insertValues->partitionValueExpr, Const)) + { + /* can't do shard pruning if the partition column is not constant */ + return false; + } } - return false; + return true; } @@ -1198,8 +1209,9 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry) List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError) { - ShardInterval *shardInterval = NULL; - Task *modifyTask = NULL; + List *insertTaskList = NIL; + List *modifyRouteList = NIL; + ListCell *modifyRouteCell = NULL; Oid distributedTableId = ExtractFirstDistributedTableId(query); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); @@ -1208,26 +1220,30 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError) Assert(query->commandType == CMD_INSERT); - shardInterval = FindShardForInsert(query, cacheEntry, planningError); - + modifyRouteList = BuildRoutesForInsert(query, planningError); if (*planningError != NULL) { return NIL; } - /* an INSERT always routes to exactly one shard */ - Assert(shardInterval != NULL); - - modifyTask = CreateTask(MODIFY_TASK); - modifyTask->anchorShardId = shardInterval->shardId; - modifyTask->replicationModel = cacheEntry->replicationModel; - - if (query->onConflict != NULL) + foreach(modifyRouteCell, modifyRouteList) { - modifyTask->upsertQuery = true; + ModifyRoute *modifyRoute = (ModifyRoute *) lfirst(modifyRouteCell); + + Task *modifyTask = CreateTask(MODIFY_TASK); + modifyTask->anchorShardId = modifyRoute->shardId; + modifyTask->replicationModel = cacheEntry->replicationModel; + modifyTask->rowValuesLists = modifyRoute->rowValuesLists; + + if (query->onConflict != NULL) + { + modifyTask->upsertQuery = true; + } + + insertTaskList = lappend(insertTaskList, modifyTask); } - return list_make1(modifyTask); + return insertTaskList; } @@ -1264,132 +1280,6 @@ CreateTask(TaskType taskType) } -/* - * FindShardForInsert returns the shard interval for an INSERT query or NULL if - * the partition column value is defined as an expression that still needs to be - * evaluated. If the partition column value falls within 0 or multiple - * (overlapping) shards, the planningError is set. - */ -static ShardInterval * -FindShardForInsert(Query *query, DistTableCacheEntry *cacheEntry, - DeferredErrorMessage **planningError) -{ - Oid distributedTableId = cacheEntry->relationId; - char partitionMethod = cacheEntry->partitionMethod; - uint32 rangeTableId = 1; - Var *partitionColumn = NULL; - Expr *partitionValueExpr = NULL; - Const *partitionValueConst = NULL; - int prunedShardCount = 0; - List *prunedShardList = NIL; - - Assert(query->commandType == CMD_INSERT); - - /* reference tables do not have a partition column, but can only have one shard */ - if (partitionMethod == DISTRIBUTE_BY_NONE) - { - int shardCount = cacheEntry->shardIntervalArrayLength; - if (shardCount != 1) - { - ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount))); - } - - return cacheEntry->sortedShardIntervalArray[0]; - } - - partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - partitionValueExpr = ExtractInsertPartitionValue(query, partitionColumn); - - /* non-constants should have been caught by CanShardPrune */ - if (!IsA(partitionValueExpr, Const)) - { - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("cannot perform an INSERT with a non-constant in the " - "partition column"))); - } - - partitionValueConst = (Const *) partitionValueExpr; - if (partitionValueConst->constisnull) - { - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("cannot perform an INSERT with NULL in the partition " - "column"))); - } - - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) - { - Datum partitionValue = partitionValueConst->constvalue; - ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry); - - if (shardInterval != NULL) - { - prunedShardList = list_make1(shardInterval); - } - } - else - { - List *restrictClauseList = NIL; - Index tableId = 1; - OpExpr *equalityExpr = MakeOpExpression(partitionColumn, BTEqualStrategyNumber); - Node *rightOp = get_rightop((Expr *) equalityExpr); - Const *rightConst = (Const *) rightOp; - - Assert(IsA(rightOp, Const)); - - rightConst->constvalue = partitionValueConst->constvalue; - rightConst->constisnull = partitionValueConst->constisnull; - rightConst->constbyval = partitionValueConst->constbyval; - - restrictClauseList = list_make1(equalityExpr); - - prunedShardList = PruneShards(distributedTableId, tableId, restrictClauseList); - } - - prunedShardCount = list_length(prunedShardList); - if (prunedShardCount != 1) - { - char *partitionKeyString = cacheEntry->partitionKeyString; - char *partitionColumnName = ColumnNameToColumn(distributedTableId, - partitionKeyString); - StringInfo errorMessage = makeStringInfo(); - StringInfo errorHint = makeStringInfo(); - const char *targetCountType = NULL; - - if (prunedShardCount == 0) - { - targetCountType = "no"; - } - else - { - targetCountType = "multiple"; - } - - if (prunedShardCount == 0) - { - appendStringInfo(errorHint, "Make sure you have created a shard which " - "can receive this partition column value."); - } - else - { - appendStringInfo(errorHint, "Make sure the value for partition column " - "\"%s\" falls into a single shard.", - partitionColumnName); - } - - appendStringInfo(errorMessage, "cannot run INSERT command which targets %s " - "shards", targetCountType); - - (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - errorMessage->data, NULL, - errorHint->data); - - return NULL; - } - - return (ShardInterval *) linitial(prunedShardList); -} - - /* * ExtractFirstDistributedTableId takes a given query, and finds the relationId * for the first distributed table in that query. If the function cannot find a @@ -1420,27 +1310,6 @@ ExtractFirstDistributedTableId(Query *query) } -/* - * ExtractPartitionValue extracts the partition column value from a the target - * of an INSERT command. If a partition value is missing altogether or is - * NULL, this function throws an error. - */ -static Expr * -ExtractInsertPartitionValue(Query *query, Var *partitionColumn) -{ - TargetEntry *targetEntry = get_tle_by_resno(query->targetList, - partitionColumn->varattno); - if (targetEntry == NULL) - { - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("cannot perform an INSERT without a partition column " - "value"))); - } - - return targetEntry->expr; -} - - /* RouterJob builds a Job to represent a single shard select/update/delete query */ static Job * RouterJob(Query *originalQuery, RelationRestrictionContext *restrictionContext, @@ -1957,6 +1826,167 @@ WorkersContainingAllShards(List *prunedShardIntervalsList) } +/* + * BuildRoutesForInsert returns a list of ModifyRoute objects for an INSERT + * query or an empty list if the partition column value is defined as an ex- + * pression that still needs to be evaluated. If any partition column value + * falls within 0 or multiple (overlapping) shards, the planning error is set. + * + * Multi-row INSERTs are handled by grouping their rows by target shard. These + * groups are returned in ascending order by shard id, ready for later deparse + * to shard-specific SQL. + */ +static List * +BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) +{ + Oid distributedTableId = ExtractFirstDistributedTableId(query); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + char partitionMethod = cacheEntry->partitionMethod; + uint32 rangeTableId = 1; + Var *partitionColumn = NULL; + List *insertValuesList = NIL; + List *modifyRouteList = NIL; + ListCell *insertValuesCell = NULL; + + Assert(query->commandType == CMD_INSERT); + + /* reference tables can only have one shard */ + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + int shardCount = 0; + List *shardIntervalList = LoadShardIntervalList(distributedTableId); + ShardInterval *shardInterval = NULL; + ModifyRoute *modifyRoute = NULL; + + shardCount = list_length(shardIntervalList); + if (shardCount != 1) + { + ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount))); + } + + shardInterval = linitial(shardIntervalList); + modifyRoute = palloc(sizeof(ModifyRoute)); + + modifyRoute->shardId = shardInterval->shardId; + modifyRoute->rowValuesLists = NIL; + + modifyRouteList = lappend(modifyRouteList, modifyRoute); + + return modifyRouteList; + } + + partitionColumn = PartitionColumn(distributedTableId, rangeTableId); + + /* get full list of insert values and iterate over them to prune */ + insertValuesList = ExtractInsertValuesList(query, partitionColumn); + + foreach(insertValuesCell, insertValuesList) + { + InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell); + Const *partitionValueConst = NULL; + List *prunedShardList = NIL; + int prunedShardCount = 0; + ShardInterval *targetShard = NULL; + + if (!IsA(insertValues->partitionValueExpr, Const)) + { + /* shard pruning not possible right now */ + return NIL; + } + + partitionValueConst = (Const *) insertValues->partitionValueExpr; + if (partitionValueConst->constisnull) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("cannot perform an INSERT with NULL in the partition " + "column"))); + } + + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == + DISTRIBUTE_BY_RANGE) + { + Datum partitionValue = partitionValueConst->constvalue; + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry( + distributedTableId); + ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry); + + if (shardInterval != NULL) + { + prunedShardList = list_make1(shardInterval); + } + } + else + { + List *restrictClauseList = NIL; + Index tableId = 1; + OpExpr *equalityExpr = MakeOpExpression(partitionColumn, + BTEqualStrategyNumber); + Node *rightOp = get_rightop((Expr *) equalityExpr); + Const *rightConst = (Const *) rightOp; + + Assert(IsA(rightOp, Const)); + + rightConst->constvalue = partitionValueConst->constvalue; + rightConst->constisnull = partitionValueConst->constisnull; + rightConst->constbyval = partitionValueConst->constbyval; + + restrictClauseList = list_make1(equalityExpr); + + prunedShardList = PruneShards(distributedTableId, tableId, + restrictClauseList); + } + + prunedShardCount = list_length(prunedShardList); + if (prunedShardCount != 1) + { + char *partitionKeyString = cacheEntry->partitionKeyString; + char *partitionColumnName = ColumnNameToColumn(distributedTableId, + partitionKeyString); + StringInfo errorMessage = makeStringInfo(); + StringInfo errorHint = makeStringInfo(); + const char *targetCountType = NULL; + + if (prunedShardCount == 0) + { + targetCountType = "no"; + } + else + { + targetCountType = "multiple"; + } + + if (prunedShardCount == 0) + { + appendStringInfo(errorHint, "Make sure you have created a shard which " + "can receive this partition column value."); + } + else + { + appendStringInfo(errorHint, "Make sure the value for partition column " + "\"%s\" falls into a single shard.", + partitionColumnName); + } + + appendStringInfo(errorMessage, "cannot run INSERT command which targets %s " + "shards", targetCountType); + + (*planningError) = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, + errorHint->data); + + return NIL; + } + + targetShard = (ShardInterval *) linitial(prunedShardList); + insertValues->shardId = targetShard->shardId; + } + + modifyRouteList = GroupInsertValuesByShardId(insertValuesList); + + return modifyRouteList; +} + + /* * IntersectPlacementList performs placement pruning based on matching on * nodeName:nodePort fields of shard placement data. We start pruning from all @@ -1993,6 +2023,129 @@ IntersectPlacementList(List *lhsPlacementList, List *rhsPlacementList) } +/* + * GroupInsertValuesByShardId takes care of grouping the rows from a multi-row + * INSERT by target shard. At this point, all pruning has taken place and we + * need only to build sets of rows for each destination. This is done by a + * simple sort (by shard identifier) and gather step. The sort has the side- + * effect of getting things in ascending order to avoid unnecessary deadlocks + * during Task execution. + */ +static List * +GroupInsertValuesByShardId(List *insertValuesList) +{ + ModifyRoute *route = NULL; + ListCell *insertValuesCell = NULL; + List *modifyRouteList = NIL; + + insertValuesList = SortList(insertValuesList, CompareInsertValuesByShardId); + foreach(insertValuesCell, insertValuesList) + { + InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell); + int64 shardId = insertValues->shardId; + bool foundSameShardId = false; + + if (route != NULL) + { + if (route->shardId == shardId) + { + foundSameShardId = true; + } + else + { + /* new shard id seen; current aggregation done; add to list */ + modifyRouteList = lappend(modifyRouteList, route); + } + } + + if (foundSameShardId) + { + /* + * Our current value has the same shard id as our aggregate object, + * so append the rowValues. + */ + route->rowValuesLists = lappend(route->rowValuesLists, + insertValues->rowValues); + } + else + { + /* we encountered a new shard id; build a new aggregate object */ + route = (ModifyRoute *) palloc(sizeof(ModifyRoute)); + route->shardId = insertValues->shardId; + route->rowValuesLists = list_make1(insertValues->rowValues); + } + } + + /* left holding one final aggregate object; add to list */ + modifyRouteList = lappend(modifyRouteList, route); + + return modifyRouteList; +} + + +/* + * ExtractInsertValuesList extracts the partition column value for an INSERT + * command and returns it within an InsertValues struct. For single-row INSERTs + * this is simply a value extracted from the target list, but multi-row INSERTs + * will generate a List of InsertValues, each with full row values in addition + * to the partition value. If a partition value is NULL or missing altogether, + * this function errors. + */ +static List * +ExtractInsertValuesList(Query *query, Var *partitionColumn) +{ + List *insertValuesList = NIL; + TargetEntry *targetEntry = get_tle_by_resno(query->targetList, + partitionColumn->varattno); + + if (targetEntry == NULL) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("cannot perform an INSERT without a partition column " + "value"))); + } + + /* + * We've got a multi-row INSERT. PostgreSQL internally represents such + * commands by linking Vars in the target list to lists of values within + * a special VALUES range table entry. By extracting the right positional + * expression from each list within that RTE, we will extract the partition + * values for each row within the multi-row INSERT. + */ + if (IsA(targetEntry->expr, Var)) + { + Var *partitionVar = (Var *) targetEntry->expr; + RangeTblEntry *referencedRTE = NULL; + ListCell *valuesListCell = NULL; + + referencedRTE = rt_fetch(partitionVar->varno, query->rtable); + foreach(valuesListCell, referencedRTE->values_lists) + { + InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); + insertValues->rowValues = (List *) lfirst(valuesListCell); + insertValues->partitionValueExpr = list_nth(insertValues->rowValues, + (partitionVar->varattno - 1)); + insertValues->shardId = INVALID_SHARD_ID; + + insertValuesList = lappend(insertValuesList, insertValues); + } + } + + /* nothing's been found yet; this is a simple single-row INSERT */ + if (insertValuesList == NIL) + { + InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); + insertValues->rowValues = NIL; + insertValues->partitionValueExpr = targetEntry->expr; + insertValues->shardId = INVALID_SHARD_ID; + + insertValuesList = lappend(insertValuesList, insertValues); + } + + return insertValuesList; +} + + /* * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a modify query, or if its is a select @@ -2170,3 +2323,31 @@ get_all_actual_clauses(List *restrictinfo_list) #endif + + +/* + * CompareInsertValuesByShardId does what it says in the name. Used for sorting + * InsertValues objects by their shard. + */ +static int +CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) +{ + InsertValues *leftValue = *((InsertValues **) leftElement); + InsertValues *rightValue = *((InsertValues **) rightElement); + int64 leftShardId = leftValue->shardId; + int64 rightShardId = rightValue->shardId; + + /* we compare 64-bit integers, instead of casting their difference to int */ + if (leftShardId > rightShardId) + { + return 1; + } + else if (leftShardId < rightShardId) + { + return -1; + } + else + { + return 0; + } +} diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 979666b5c..79e5268b1 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -32,6 +32,7 @@ typedef struct FunctionEvaluationContext /* private function declarations */ +static void EvaluateValuesListsItems(List *valuesLists, PlanState *planState); static Node * EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState); static Node * PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext *context); @@ -63,14 +64,19 @@ RequiresMasterEvaluation(Query *query) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); - if (rte->rtekind != RTE_SUBQUERY) + if (rte->rtekind == RTE_SUBQUERY) { - continue; + if (RequiresMasterEvaluation(rte->subquery)) + { + return true; + } } - - if (RequiresMasterEvaluation(rte->subquery)) + else if (rte->rtekind == RTE_VALUES) { - return true; + if (contain_mutable_functions((Node *) rte->values_lists)) + { + return true; + } } } @@ -131,12 +137,14 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); - if (rte->rtekind != RTE_SUBQUERY) + if (rte->rtekind == RTE_SUBQUERY) { - continue; + ExecuteMasterEvaluableFunctions(rte->subquery, planState); + } + else if (rte->rtekind == RTE_VALUES) + { + EvaluateValuesListsItems(rte->values_lists, planState); } - - ExecuteMasterEvaluableFunctions(rte->subquery, planState); } foreach(cteCell, query->cteList) @@ -148,6 +156,35 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) } +/* + * EvaluateValuesListsItems siply does the work of walking over each expression + * in each value list contained in a multi-row INSERT's VALUES RTE. Basically + * a nested for loop to perform an in-place replacement of expressions with + * their ultimate values, should evaluation be necessary. + */ +static void +EvaluateValuesListsItems(List *valuesLists, PlanState *planState) +{ + ListCell *exprListCell = NULL; + + foreach(exprListCell, valuesLists) + { + List *exprList = (List *) lfirst(exprListCell); + ListCell *exprCell = NULL; + + foreach(exprCell, exprList) + { + Expr *expr = (Expr *) lfirst(exprCell); + Node *modifiedNode = NULL; + + modifiedNode = PartiallyEvaluateExpression((Node *) expr, planState); + + exprCell->data.ptr_value = (void *) modifiedNode; + } + } +} + + /* * Walks the expression evaluating any node which invokes a function as long as a Var * doesn't show up in the parameter list. diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index b93f6b434..9a3ab06af 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -232,6 +232,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(replicationModel); COPY_SCALAR_FIELD(insertSelectQuery); COPY_NODE_FIELD(relationShardList); + COPY_NODE_FIELD(rowValuesLists); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index b4bb5bb30..e5fb43fab 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -441,6 +441,7 @@ OutTask(OUTFUNC_ARGS) WRITE_CHAR_FIELD(replicationModel); WRITE_BOOL_FIELD(insertSelectQuery); WRITE_NODE_FIELD(relationShardList); + WRITE_NODE_FIELD(rowValuesLists); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 849c54ba1..6e597cb6e 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -351,6 +351,7 @@ ReadTask(READFUNC_ARGS) READ_CHAR_FIELD(replicationModel); READ_BOOL_FIELD(insertSelectQuery); READ_NODE_FIELD(relationShardList); + READ_NODE_FIELD(rowValuesLists); READ_DONE(); } diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index eac04ab92..a710a9bc7 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -28,6 +28,8 @@ typedef struct CitusScanState } CitusScanState; +extern CustomExecMethods RouterMultiModifyCustomExecMethods; + extern Node * RealTimeCreateScan(CustomScan *scan); extern Node * TaskTrackerCreateScan(CustomScan *scan); extern Node * RouterCreateScan(CustomScan *scan); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1f43851f3..bbff3a83e 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -187,6 +187,8 @@ typedef struct Task bool insertSelectQuery; List *relationShardList; + + List *rowValuesLists; /* rows to use when building multi-row INSERT */ } Task; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 91494ee9b..16d3c22e7 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1657,6 +1657,12 @@ BEGIN; INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; ROLLBACK; +-- Similarly, multi-row INSERTs will take part in transactions and reuse connections... +BEGIN; +INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100; +COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; +INSERT INTO raw_events_first (user_id, value_1) VALUES (105, 105), (106, 106); +ROLLBACK; -- selecting from views works CREATE VIEW test_view AS SELECT * FROM raw_events_first; INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index c53e090fa..608c23609 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -190,7 +190,7 @@ CONTEXT: while executing command on localhost:57638 INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0; ERROR: could not modify any active placements SET client_min_messages TO DEFAULT; --- commands with non-constant partition values are unsupported +-- commands with non-constant partition values are supported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); -- values for other columns are totally fine @@ -201,10 +201,10 @@ ERROR: functions used in the WHERE clause of modification queries on distribute -- commands with mutable but non-volatile functions(ie: stable func.) in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- commands with multiple rows are unsupported -INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Multi-row INSERTs to distributed tables are not supported. +-- commands with multiple rows are supported +INSERT INTO limit_orders VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), + (2038, 'GOOG', 5634, now(), 'buy', random()), + (2039, 'GOOG', 5634, now(), 'buy', random()); -- Who says that? :) -- INSERT ... SELECT ... FROM commands are unsupported -- INSERT INTO limit_orders SELECT * FROM limit_orders; diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 56e7cd59d..b9f0b09b6 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -104,10 +104,10 @@ ERROR: functions used in the WHERE clause of modification queries on distribute -- commands with mutable but non-volatile functions(ie: stable func.) in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- commands with multiple rows are unsupported -INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT); -ERROR: cannot perform distributed planning for the given modification -DETAIL: Multi-row INSERTs to distributed tables are not supported. +-- commands with multiple rows are supported +INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), + (2038, 'GOOG', 5634, now(), 'buy', random()), + (2039, 'GOOG', 5634, now(), 'buy', random()); -- connect back to the other node \c - - - :worker_1_port -- commands containing a CTE are unsupported diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index dfc70455a..d1d54d739 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1375,6 +1375,15 @@ COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; \. ROLLBACK; +-- Similarly, multi-row INSERTs will take part in transactions and reuse connections... +BEGIN; +INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100; +COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; +104,104 +\. +INSERT INTO raw_events_first (user_id, value_1) VALUES (105, 105), (106, 106); +ROLLBACK; + -- selecting from views works CREATE VIEW test_view AS SELECT * FROM raw_events_first; INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4) VALUES diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 518c4f37c..c3c1becf7 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -132,7 +132,7 @@ INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy SET client_min_messages TO DEFAULT; --- commands with non-constant partition values are unsupported +-- commands with non-constant partition values are supported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); @@ -146,8 +146,10 @@ DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- commands with multiple rows are unsupported -INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); +-- commands with multiple rows are supported +INSERT INTO limit_orders VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), + (2038, 'GOOG', 5634, now(), 'buy', random()), + (2039, 'GOOG', 5634, now(), 'buy', random()); -- Who says that? :) -- INSERT ... SELECT ... FROM commands are unsupported diff --git a/src/test/regress/sql/multi_mx_modifications.sql b/src/test/regress/sql/multi_mx_modifications.sql index c10e8fe38..fd3874c02 100644 --- a/src/test/regress/sql/multi_mx_modifications.sql +++ b/src/test/regress/sql/multi_mx_modifications.sql @@ -76,8 +76,10 @@ DELETE FROM limit_orders_mx WHERE id = 246 AND bidder_id = (random() * 1000); -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders_mx WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- commands with multiple rows are unsupported -INSERT INTO limit_orders_mx VALUES (DEFAULT), (DEFAULT); +-- commands with multiple rows are supported +INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), + (2038, 'GOOG', 5634, now(), 'buy', random()), + (2039, 'GOOG', 5634, now(), 'buy', random()); -- connect back to the other node \c - - - :worker_1_port From addde54464e2523a8b28c6d513e3c5d589c59ebb Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 9 Aug 2017 23:45:48 -0700 Subject: [PATCH 2/4] Add some tests --- src/test/regress/expected/multi_explain.out | 4 +-- src/test/regress/expected/multi_explain_0.out | 4 +-- .../regress/expected/multi_modifications.out | 34 ++++++++++++++++--- .../expected/multi_modifying_xacts.out | 29 ++++++++++++---- .../regress/expected/multi_prepare_sql.out | 29 +++++++++++++++- src/test/regress/expected/multi_upsert.out | 23 +++++++++++-- src/test/regress/sql/multi_explain.sql | 2 +- src/test/regress/sql/multi_modifications.sql | 24 ++++++++++--- .../regress/sql/multi_modifying_xacts.sql | 24 +++++++++++-- src/test/regress/sql/multi_prepare_sql.sql | 13 +++++++ src/test/regress/sql/multi_upsert.sql | 17 +++++++++- 11 files changed, 177 insertions(+), 26 deletions(-) diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 9670abbf4..e78b1b1eb 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -299,14 +299,14 @@ Limit Filter: (l_quantity < 5.0) -- Test insert EXPLAIN (COSTS FALSE) - INSERT INTO lineitem VALUES(1,0); + INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0); Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=57638 dbname=regression -> Insert on lineitem_290000 - -> Result + -> Values Scan on "*VALUES*" -- Test update EXPLAIN (COSTS FALSE) UPDATE lineitem diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 7e0d6e490..998b97578 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -299,14 +299,14 @@ Limit Filter: (l_quantity < 5.0) -- Test insert EXPLAIN (COSTS FALSE) - INSERT INTO lineitem VALUES(1,0); + INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0); Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task Node: host=localhost port=57638 dbname=regression -> Insert on lineitem_290000 - -> Result + -> Values Scan on "*VALUES*" -- Test update EXPLAIN (COSTS FALSE) UPDATE lineitem diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 608c23609..606d329da 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -201,10 +201,36 @@ ERROR: functions used in the WHERE clause of modification queries on distribute -- commands with mutable but non-volatile functions(ie: stable func.) in their quals -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- commands with multiple rows are supported -INSERT INTO limit_orders VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), - (2038, 'GOOG', 5634, now(), 'buy', random()), - (2039, 'GOOG', 5634, now(), 'buy', random()); +-- multi-row inserts are supported +INSERT INTO limit_orders VALUES (12037, 'GOOG', 5634, '2001-04-16 03:37:28', 'buy', 0.50), + (12038, 'GOOG', 5634, '2001-04-17 03:37:28', 'buy', 2.50), + (12039, 'GOOG', 5634, '2001-04-18 03:37:28', 'buy', 1.50); +SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 12037 AND 12039; + count +------- + 3 +(1 row) + +-- even those with functions +INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50), + (22038, 'GOOG', 5634, now(), 'buy', 2.50), + (22039, 'GOOG', 5634, now(), 'buy', 1.50); +SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039; + count +------- + 3 +(1 row) + +-- even those with functions in their partition columns +INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'buy', 0.50), + (random() * 10 + 80000, 'GOOG', 5634, now(), 'buy', 2.50), + (random() * 10 + 80090, 'GOOG', 5634, now(), 'buy', 1.50); +SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000; + count +------- + 3 +(1 row) + -- Who says that? :) -- INSERT ... SELECT ... FROM commands are unsupported -- INSERT INTO limit_orders SELECT * FROM limit_orders; diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 940aeae55..1262e0ec8 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -45,16 +45,28 @@ INSERT INTO researchers VALUES (1, 1, 'Donald Knuth'); INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); INSERT INTO researchers VALUES (3, 2, 'Tony Hoare'); INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson'); --- replace a researcher, reusing their id +-- replace a researcher, reusing their id in a multi-row INSERT BEGIN; DELETE FROM researchers WHERE lab_id = 1 AND id = 2; -INSERT INTO researchers VALUES (2, 1, 'John Backus'); +INSERT INTO researchers VALUES (2, 1, 'John Backus'), (12, 1, 'Frances E. Allen'); COMMIT; -SELECT name FROM researchers WHERE lab_id = 1 AND id = 2; - name -------------- +SELECT name FROM researchers WHERE lab_id = 1 AND id % 10 = 2; + name +------------------ John Backus -(1 row) + Frances E. Allen +(2 rows) + +-- and the other way around +BEGIN; +INSERT INTO researchers VALUES (14, 2, 'Alan Kay'), (15, 2, 'Barbara Liskov'); +DELETE FROM researchers WHERE id = 14 AND lab_id = 2; +ROLLBACK; +-- should have rolled everything back +SELECT * FROM researchers WHERE id = 15 AND lab_id = 2; + id | lab_id | name +----+--------+------ +(0 rows) -- abort a modification BEGIN; @@ -209,6 +221,11 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie" ROLLBACK; +-- but it is allowed after a multi-row insert +BEGIN; +INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); +\copy researchers from stdin delimiter ',' +ROLLBACK; -- after a COPY you can modify multiple shards, since they'll use different connections BEGIN; \copy researchers from stdin delimiter ',' diff --git a/src/test/regress/expected/multi_prepare_sql.out b/src/test/regress/expected/multi_prepare_sql.out index fbc951413..4da5dfa45 100644 --- a/src/test/regress/expected/multi_prepare_sql.out +++ b/src/test/regress/expected/multi_prepare_sql.out @@ -383,6 +383,15 @@ EXECUTE prepared_double_parameter_insert(3, 30); EXECUTE prepared_double_parameter_insert(4, 40); EXECUTE prepared_double_parameter_insert(5, 50); EXECUTE prepared_double_parameter_insert(6, 60); +PREPARE prepared_multi_insert(int, int) AS + INSERT INTO prepare_table (key, value) VALUES ($1, $2), ($1 + 1, $2 + 10); +-- execute 6 times to trigger prepared statement usage +EXECUTE prepared_multi_insert( 7, 70); +EXECUTE prepared_multi_insert( 9, 90); +EXECUTE prepared_multi_insert(11, 110); +EXECUTE prepared_multi_insert(13, 130); +EXECUTE prepared_multi_insert(15, 150); +EXECUTE prepared_multi_insert(17, 170); PREPARE prepared_non_partition_parameter_insert(int) AS INSERT INTO prepare_table (key, value) VALUES (0, $1); -- execute 6 times to trigger prepared statement usage @@ -420,7 +429,25 @@ SELECT * FROM prepare_table ORDER BY key, value; 5 | 6 | 60 6 | -(24 rows) + 7 | 70 + 8 | 80 + 9 | 90 + 10 | 100 + 11 | 110 + 12 | 120 + 13 | 130 + 14 | 140 + 15 | 150 + 16 | 160 + 17 | 170 + 18 | 180 +(36 rows) + +SELECT master_modify_multiple_shards('DELETE FROM prepare_table WHERE value >= 70'); + master_modify_multiple_shards +------------------------------- + 12 +(1 row) -- check router executor select PREPARE prepared_router_partition_column_select(int) AS diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index 62cd90bd2..d41abe94a 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -20,7 +20,7 @@ SELECT master_create_worker_shards('upsert_test', '4', '2'); (1 row) -- do a regular insert -INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1); +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2); -- observe that there is a conflict and the following query does nothing INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING; -- same as the above with different syntax @@ -35,8 +35,27 @@ SELECT * FROM upsert_test; part_key | other_col | third_col ----------+-----------+----------- 1 | 2 | 4 -(1 row) + 2 | 2 | +(2 rows) +-- do a multi-row DO NOTHING insert +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) +ON CONFLICT DO NOTHING; +-- do a multi-row DO UPDATE insert +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 10), (2, 20), (3, 30) +ON CONFLICT (part_key) DO +UPDATE SET other_col = EXCLUDED.other_col WHERE upsert_test.part_key != 1; +-- see the results +SELECT * FROM upsert_test ORDER BY part_key ASC; + part_key | other_col | third_col +----------+-----------+----------- + 1 | 2 | 4 + 2 | 20 | + 3 | 30 | +(3 rows) + +DELETE FROM upsert_test WHERE part_key = 2; +DELETE FROM upsert_test WHERE part_key = 3; -- use a WHERE clause, so that SET doesn't have an affect INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = 30 WHERE upsert_test.other_col = 3; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 9f3035d0d..07aefa97b 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -87,7 +87,7 @@ EXPLAIN (COSTS FALSE) -- Test insert EXPLAIN (COSTS FALSE) - INSERT INTO lineitem VALUES(1,0); + INSERT INTO lineitem VALUES (1,0), (2, 0), (3, 0), (4, 0); -- Test update EXPLAIN (COSTS FALSE) diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index c3c1becf7..cfe2df393 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -146,10 +146,26 @@ DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- commands with multiple rows are supported -INSERT INTO limit_orders VALUES (2037, 'GOOG', 5634, now(), 'buy', random()), - (2038, 'GOOG', 5634, now(), 'buy', random()), - (2039, 'GOOG', 5634, now(), 'buy', random()); +-- multi-row inserts are supported +INSERT INTO limit_orders VALUES (12037, 'GOOG', 5634, '2001-04-16 03:37:28', 'buy', 0.50), + (12038, 'GOOG', 5634, '2001-04-17 03:37:28', 'buy', 2.50), + (12039, 'GOOG', 5634, '2001-04-18 03:37:28', 'buy', 1.50); + +SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 12037 AND 12039; + +-- even those with functions +INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50), + (22038, 'GOOG', 5634, now(), 'buy', 2.50), + (22039, 'GOOG', 5634, now(), 'buy', 1.50); + +SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039; + +-- even those with functions in their partition columns +INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'buy', 0.50), + (random() * 10 + 80000, 'GOOG', 5634, now(), 'buy', 2.50), + (random() * 10 + 80090, 'GOOG', 5634, now(), 'buy', 1.50); + +SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000; -- Who says that? :) -- INSERT ... SELECT ... FROM commands are unsupported diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 883b6feda..395a1ed98 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -32,13 +32,22 @@ INSERT INTO researchers VALUES (2, 1, 'Niklaus Wirth'); INSERT INTO researchers VALUES (3, 2, 'Tony Hoare'); INSERT INTO researchers VALUES (4, 2, 'Kenneth Iverson'); --- replace a researcher, reusing their id +-- replace a researcher, reusing their id in a multi-row INSERT BEGIN; DELETE FROM researchers WHERE lab_id = 1 AND id = 2; -INSERT INTO researchers VALUES (2, 1, 'John Backus'); +INSERT INTO researchers VALUES (2, 1, 'John Backus'), (12, 1, 'Frances E. Allen'); COMMIT; -SELECT name FROM researchers WHERE lab_id = 1 AND id = 2; +SELECT name FROM researchers WHERE lab_id = 1 AND id % 10 = 2; + +-- and the other way around +BEGIN; +INSERT INTO researchers VALUES (14, 2, 'Alan Kay'), (15, 2, 'Barbara Liskov'); +DELETE FROM researchers WHERE id = 14 AND lab_id = 2; +ROLLBACK; + +-- should have rolled everything back +SELECT * FROM researchers WHERE id = 15 AND lab_id = 2; -- abort a modification BEGIN; @@ -172,6 +181,15 @@ INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie'); \. ROLLBACK; +-- but it is allowed after a multi-row insert +BEGIN; +INSERT INTO researchers VALUES (2, 1, 'Knuth Donald'), (10, 6, 'Lamport Leslie'); +\copy researchers from stdin delimiter ',' +3,1,Duth Knonald +10,6,Lesport Lampie +\. +ROLLBACK; + -- after a COPY you can modify multiple shards, since they'll use different connections BEGIN; \copy researchers from stdin delimiter ',' diff --git a/src/test/regress/sql/multi_prepare_sql.sql b/src/test/regress/sql/multi_prepare_sql.sql index bde59efbf..e67d783a9 100644 --- a/src/test/regress/sql/multi_prepare_sql.sql +++ b/src/test/regress/sql/multi_prepare_sql.sql @@ -245,6 +245,17 @@ EXECUTE prepared_double_parameter_insert(4, 40); EXECUTE prepared_double_parameter_insert(5, 50); EXECUTE prepared_double_parameter_insert(6, 60); +PREPARE prepared_multi_insert(int, int) AS + INSERT INTO prepare_table (key, value) VALUES ($1, $2), ($1 + 1, $2 + 10); + +-- execute 6 times to trigger prepared statement usage +EXECUTE prepared_multi_insert( 7, 70); +EXECUTE prepared_multi_insert( 9, 90); +EXECUTE prepared_multi_insert(11, 110); +EXECUTE prepared_multi_insert(13, 130); +EXECUTE prepared_multi_insert(15, 150); +EXECUTE prepared_multi_insert(17, 170); + PREPARE prepared_non_partition_parameter_insert(int) AS INSERT INTO prepare_table (key, value) VALUES (0, $1); @@ -259,6 +270,8 @@ EXECUTE prepared_non_partition_parameter_insert(60); -- check inserted values SELECT * FROM prepare_table ORDER BY key, value; +SELECT master_modify_multiple_shards('DELETE FROM prepare_table WHERE value >= 70'); + -- check router executor select PREPARE prepared_router_partition_column_select(int) AS SELECT diff --git a/src/test/regress/sql/multi_upsert.sql b/src/test/regress/sql/multi_upsert.sql index 72e0b9a92..9795fc116 100644 --- a/src/test/regress/sql/multi_upsert.sql +++ b/src/test/regress/sql/multi_upsert.sql @@ -16,7 +16,7 @@ SELECT master_create_distributed_table('upsert_test', 'part_key', 'hash'); SELECT master_create_worker_shards('upsert_test', '4', '2'); -- do a regular insert -INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1); +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2); -- observe that there is a conflict and the following query does nothing INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING; @@ -34,6 +34,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) -- see the results SELECT * FROM upsert_test; +-- do a multi-row DO NOTHING insert +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) +ON CONFLICT DO NOTHING; + +-- do a multi-row DO UPDATE insert +INSERT INTO upsert_test (part_key, other_col) VALUES (1, 10), (2, 20), (3, 30) +ON CONFLICT (part_key) DO +UPDATE SET other_col = EXCLUDED.other_col WHERE upsert_test.part_key != 1; + +-- see the results +SELECT * FROM upsert_test ORDER BY part_key ASC; + +DELETE FROM upsert_test WHERE part_key = 2; +DELETE FROM upsert_test WHERE part_key = 3; + -- use a WHERE clause, so that SET doesn't have an affect INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO UPDATE SET other_col = 30 WHERE upsert_test.other_col = 3; From a578506718458e3bda7f24a483c94977e9a38b2d Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Thu, 10 Aug 2017 00:01:50 -0700 Subject: [PATCH 3/4] Add multi-row isolation tests --- .../expected/isolation_concurrent_dml.out | 59 +++++++++++++++++++ .../isolation_distributed_transaction_id.out | 4 +- .../isolation_dump_global_wait_edges.out | 18 +++--- .../isolation_replace_wait_function.out | 2 +- .../specs/isolation_concurrent_dml.spec | 35 +++++++++++ 5 files changed, 106 insertions(+), 12 deletions(-) diff --git a/src/test/regress/expected/isolation_concurrent_dml.out b/src/test/regress/expected/isolation_concurrent_dml.out index f7f8fc9ee..8fab5e91b 100644 --- a/src/test/regress/expected/isolation_concurrent_dml.out +++ b/src/test/regress/expected/isolation_concurrent_dml.out @@ -28,3 +28,62 @@ step s1-insert: step s2-update: UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; + +starting permutation: s1-begin s1-multi-insert s2-update s1-commit +master_create_worker_shards + + +step s1-begin: + BEGIN; + +step s1-multi-insert: + INSERT INTO test_concurrent_dml VALUES (1), (2); + +step s2-update: + UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; + +step s1-commit: + COMMIT; + +step s2-update: <... completed> + +starting permutation: s1-begin s1-multi-insert s2-multi-insert-overlap s1-commit +master_create_worker_shards + + +step s1-begin: + BEGIN; + +step s1-multi-insert: + INSERT INTO test_concurrent_dml VALUES (1), (2); + +step s2-multi-insert-overlap: + INSERT INTO test_concurrent_dml VALUES (1), (4); + +step s1-commit: + COMMIT; + +step s2-multi-insert-overlap: <... completed> + +starting permutation: s1-begin s2-begin s1-multi-insert s2-multi-insert s1-commit s2-commit +master_create_worker_shards + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-multi-insert: + INSERT INTO test_concurrent_dml VALUES (1), (2); + +step s2-multi-insert: + INSERT INTO test_concurrent_dml VALUES (3), (4); + +step s1-commit: + COMMIT; + +step s2-commit: + COMMIT; + diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index 41806ad2e..56dd8ed15 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -98,7 +98,7 @@ step s1-get-current-transaction-id: row -(0,290) +(0,301) step s2-get-first-worker-active-transactions: SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) FROM @@ -109,4 +109,4 @@ step s2-get-first-worker-active-transactions: nodename nodeport success result -localhost 57637 t (0,290) +localhost 57637 t (0,301) diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 6365b7719..e0621fa88 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -293 292 f +304 303 f transactionnumberwaitingtransactionnumbers -292 -293 292 +303 +304 303 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -297 296 f -298 296 f -298 297 t +308 307 f +309 307 f +309 308 t transactionnumberwaitingtransactionnumbers -296 -297 296 -298 296,297 +307 +308 307 +309 307,308 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_replace_wait_function.out b/src/test/regress/expected/isolation_replace_wait_function.out index 2b0cbd87c..4a6673c3c 100644 --- a/src/test/regress/expected/isolation_replace_wait_function.out +++ b/src/test/regress/expected/isolation_replace_wait_function.out @@ -16,7 +16,7 @@ step s1-finish: COMMIT; step s2-insert: <... completed> -error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102781" +error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102793" step s2-finish: COMMIT; diff --git a/src/test/regress/specs/isolation_concurrent_dml.spec b/src/test/regress/specs/isolation_concurrent_dml.spec index 74c86583f..642dd2a22 100644 --- a/src/test/regress/specs/isolation_concurrent_dml.spec +++ b/src/test/regress/specs/isolation_concurrent_dml.spec @@ -22,6 +22,11 @@ step "s1-insert" INSERT INTO test_concurrent_dml VALUES(1); } +step "s1-multi-insert" +{ + INSERT INTO test_concurrent_dml VALUES (1), (2); +} + step "s1-commit" { COMMIT; @@ -29,12 +34,42 @@ step "s1-commit" session "s2" +step "s2-begin" +{ + BEGIN; +} + step "s2-update" { UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; } +step "s2-multi-insert-overlap" +{ + INSERT INTO test_concurrent_dml VALUES (1), (4); +} + +step "s2-multi-insert" +{ + INSERT INTO test_concurrent_dml VALUES (3), (4); +} + +step "s2-commit" +{ + COMMIT; +} + # verify that an in-progress insert blocks concurrent updates permutation "s1-begin" "s1-insert" "s2-update" "s1-commit" + # but an insert without xact will not block permutation "s1-insert" "s2-update" + +# verify that an in-progress multi-row insert blocks concurrent updates +permutation "s1-begin" "s1-multi-insert" "s2-update" "s1-commit" + +# two multi-row inserts that hit same shards will block +permutation "s1-begin" "s1-multi-insert" "s2-multi-insert-overlap" "s1-commit" + +# but concurrent multi-row inserts don't block unless shards overlap +permutation "s1-begin" "s2-begin" "s1-multi-insert" "s2-multi-insert" "s1-commit" "s2-commit" From dee66e395985d03d1f287f9499d1aa46e1ec682b Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Thu, 10 Aug 2017 00:32:07 -0700 Subject: [PATCH 4/4] Final review feedback --- .../distributed/executor/multi_executor.c | 2 +- .../planner/multi_router_planner.c | 21 +++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 62c35d769..2a210ee5f 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -66,7 +66,7 @@ static CustomExecMethods RouterSingleModifyCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; -/* not static to enable swapping in multi-modify logic during router execution */ +/* not static to enable reference by multi-modify logic in router execution */ CustomExecMethods RouterMultiModifyCustomExecMethods = { .CustomName = "RouterMultiModifyScan", .BeginCustomScan = CitusModifyBeginScan, diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c9854e1b2..1f4a2b7bf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -77,6 +77,7 @@ typedef struct InsertValues Expr *partitionValueExpr; /* partition value provided in INSERT row */ List *rowValues; /* full values list of INSERT row, possibly NIL */ int64 shardId; /* target shard for this row, possibly invalid */ + Index listIndex; /* index to make our sorting stable */ } InsertValues; @@ -2117,6 +2118,7 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) Var *partitionVar = (Var *) targetEntry->expr; RangeTblEntry *referencedRTE = NULL; ListCell *valuesListCell = NULL; + Index ivIndex = 0; referencedRTE = rt_fetch(partitionVar->varno, query->rtable); foreach(valuesListCell, referencedRTE->values_lists) @@ -2126,8 +2128,10 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) insertValues->partitionValueExpr = list_nth(insertValues->rowValues, (partitionVar->varattno - 1)); insertValues->shardId = INVALID_SHARD_ID; + insertValues->listIndex = ivIndex; insertValuesList = lappend(insertValuesList, insertValues); + ivIndex++; } } @@ -2336,8 +2340,9 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) InsertValues *rightValue = *((InsertValues **) rightElement); int64 leftShardId = leftValue->shardId; int64 rightShardId = rightValue->shardId; + Index leftIndex = leftValue->listIndex; + Index rightIndex = rightValue->listIndex; - /* we compare 64-bit integers, instead of casting their difference to int */ if (leftShardId > rightShardId) { return 1; @@ -2348,6 +2353,18 @@ CompareInsertValuesByShardId(const void *leftElement, const void *rightElement) } else { - return 0; + /* shard identifiers are the same, list index is secondary sort key */ + if (leftIndex > rightIndex) + { + return 1; + } + else if (leftIndex < rightIndex) + { + return -1; + } + else + { + return 0; + } } }