From d5ad8d7db9944cbab4a845cc79d9bf6485833b5a Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jun 2016 14:36:36 -0700 Subject: [PATCH 1/6] Add tests verifying that updates return correct tuple counts. This unfortunately requires adding a new table, triggering renumbering of a number of shard ids. --- .../regress/expected/multi_modifications.out | 63 ++++++++++++++++++- src/test/regress/sql/multi_modifications.sql | 34 ++++++++++ 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 33d64ae68..970d4ff45 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -12,6 +12,10 @@ CREATE TABLE limit_orders ( kind order_side NOT NULL, limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00) ); +CREATE TABLE multiple_hash ( + category text NOT NULL, + data text NOT NULL +); CREATE TABLE insufficient_shards ( LIKE limit_orders ); CREATE TABLE range_partitioned ( LIKE limit_orders ); CREATE TABLE append_partitioned ( LIKE limit_orders ); @@ -21,6 +25,12 @@ SELECT master_create_distributed_table('limit_orders', 'id', 'hash'); (1 row) +SELECT master_create_distributed_table('multiple_hash', 'category', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash'); master_create_distributed_table --------------------------------- @@ -45,6 +55,12 @@ SELECT master_create_worker_shards('limit_orders', 2, 2); (1 row) +SELECT master_create_worker_shards('multiple_hash', 2, 2); + master_create_worker_shards +----------------------------- + +(1 row) + -- make a single shard that covers no partition values SELECT master_create_worker_shards('insufficient_shards', 1, 1); master_create_worker_shards @@ -95,7 +111,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 SET client_min_messages TO 'DEBUG2'; SET citus.task_executor_type TO 'real-time'; SELECT * FROM range_partitioned WHERE id = 32743; -DEBUG: predicate pruning for shardId 750004 +DEBUG: predicate pruning for shardId 750006 DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price -------+--------+-----------+--------------------------+------+------------- @@ -103,7 +119,7 @@ DEBUG: Plan is router executable (1 row) SELECT * FROM append_partitioned WHERE id = 414123; -DEBUG: predicate pruning for shardId 750006 +DEBUG: predicate pruning for shardId 750008 DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price --------+--------+-----------+--------------------------+------+------------- @@ -380,3 +396,46 @@ ERROR: functions used in modification queries on distributed tables must be mar -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard +-- ensure returned row counters are correct +\set QUIET off +INSERT INTO multiple_hash VALUES ('1', '1'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('1', '2'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('1', '3'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '1'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '2'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '3'); +INSERT 0 1 +-- check that update return the right number of rows +-- one row +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; +UPDATE 1 +-- three rows +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; +UPDATE 3 +-- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; + category | data +----------+------- + 1 | 1-1-2 + 1 | 2-2 + 1 | 3-2 +(3 rows) + +-- check that deletes return the right number of rows +-- one row +DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; +DELETE 1 +-- two rows +DELETE FROM multiple_hash WHERE category = '2'; +DELETE 2 +-- check +SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; + category | data +----------+------ +(0 rows) + diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 2adf977bd..f85623cc1 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -18,16 +18,24 @@ CREATE TABLE limit_orders ( limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00) ); + +CREATE TABLE multiple_hash ( + category text NOT NULL, + data text NOT NULL +); + CREATE TABLE insufficient_shards ( LIKE limit_orders ); CREATE TABLE range_partitioned ( LIKE limit_orders ); CREATE TABLE append_partitioned ( LIKE limit_orders ); SELECT master_create_distributed_table('limit_orders', 'id', 'hash'); +SELECT master_create_distributed_table('multiple_hash', 'category', 'hash'); SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash'); SELECT master_create_distributed_table('range_partitioned', 'id', 'range'); SELECT master_create_distributed_table('append_partitioned', 'id', 'append'); SELECT master_create_worker_shards('limit_orders', 2, 2); +SELECT master_create_worker_shards('multiple_hash', 2, 2); -- make a single shard that covers no partition values SELECT master_create_worker_shards('insufficient_shards', 1, 1); @@ -292,3 +300,29 @@ UPDATE limit_orders SET placed_at = now() WHERE id = 246; -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; + + +-- ensure returned row counters are correct +\set QUIET off +INSERT INTO multiple_hash VALUES ('1', '1'); +INSERT INTO multiple_hash VALUES ('1', '2'); +INSERT INTO multiple_hash VALUES ('1', '3'); +INSERT INTO multiple_hash VALUES ('2', '1'); +INSERT INTO multiple_hash VALUES ('2', '2'); +INSERT INTO multiple_hash VALUES ('2', '3'); + +-- check that update return the right number of rows +-- one row +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; +-- three rows +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; +-- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; + +-- check that deletes return the right number of rows +-- one row +DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; +-- two rows +DELETE FROM multiple_hash WHERE category = '2'; +-- check +SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; From f78c135e638bc8c619c905a782858d1e4d93b541 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jun 2016 14:36:36 -0700 Subject: [PATCH 2/6] Fix definition of faux targetlist element inserted to prevent backward scans. The targetlist contains TargetEntrys containing expressions, not expressions directly. That didn't matter so far, but with the upcoming RETURNING support, the targetlist is inspected to build a TupleDesc. ExecCleanTypeFromTL hits an assert when looking at something that's not a TargetEntry. Mark the entry as resjunk, so it's not actually used. --- src/backend/distributed/planner/multi_planner.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 511c6dc8d..e3c899d37 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -25,6 +25,8 @@ #include "executor/executor.h" +#include "nodes/makefuncs.h" + #include "optimizer/planner.h" #include "utils/memutils.h" @@ -196,11 +198,17 @@ MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan) if (!ExecSupportsBackwardScan(result->planTree)) { FuncExpr *funcExpr = makeNode(FuncExpr); + TargetEntry *targetEntry = NULL; + bool resjunkAttribute = true; + funcExpr->funcretset = true; + targetEntry = makeTargetEntry((Expr *) funcExpr, InvalidAttrNumber, NULL, + resjunkAttribute); + fauxFunctionScan->scan.plan.targetlist = lappend(fauxFunctionScan->scan.plan.targetlist, - funcExpr); + targetEntry); } result->planTree = (Plan *) fauxFunctionScan; From e1282b6d708be76e27fdc6f2f193882f42429994 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jun 2016 14:36:36 -0700 Subject: [PATCH 3/6] Remember original targetlist in MultiQueryContainerNode(). The old targetlist wasn't used so far, but the upcoming RETURNING support relies on it. This also allows to get rid of some crufty code in multi_executor.c:multi_ExecutorStart(), which used the worker query's targetlist instead of the main statement's (which didn't have one up to now). --- src/backend/distributed/executor/multi_executor.c | 4 ++-- src/backend/distributed/planner/multi_planner.c | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 181e9f524..e831e4583 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -59,9 +59,9 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags) { Task *task = NULL; List *taskList = workerJob->taskList; + TupleDesc tupleDescriptor = ExecCleanTypeFromTL( + planStatement->planTree->targetlist, false); List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList; - List *workerTargetList = multiPlan->workerJob->jobQuery->targetList; - TupleDesc tupleDescriptor = ExecCleanTypeFromTL(workerTargetList, false); /* router executor can only execute distributed plans with a single task */ Assert(list_length(taskList) == 1); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index e3c899d37..87c6f4185 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -185,6 +185,9 @@ MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan) fauxFunctionScan = makeNode(FunctionScan); fauxFunctionScan->functions = lappend(fauxFunctionScan->functions, fauxFunction); + /* copy original targetlist, accessed for RETURNING queries */ + fauxFunctionScan->scan.plan.targetlist = copyObject(result->planTree->targetlist); + /* * Add set returning function to target list if the original (postgres * created) plan doesn't support backward scans; doing so prevents From c38c1adce1e16561e4e59a2bf174f0094af27813 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jun 2016 14:36:36 -0700 Subject: [PATCH 4/6] Combine router executor paths for select and modify commands. The upcoming RETURNING support would otherwise require too much duplication. This contains most of the pieces required for RETURNING support, except removing the planner checks and adjusting regression test output. --- .../executor/multi_router_executor.c | 519 +++++++++++------- .../distributed/multi_router_executor.h | 2 - 2 files changed, 325 insertions(+), 196 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index e1e3ac205..9e643b00c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -42,14 +42,17 @@ bool AllModificationsCommutative = false; static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); -static uint64 ExecuteDistributedModify(Task *task); -static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, - Task *task, EState *executorState, - TupleDesc tupleDescriptor, - DestReceiver *destination); +static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, + Task *task, + bool isModificationQuery, + bool expectResults); +static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, + DestReceiver *destination, + Tuplestorestate *tupleStore); static bool SendQueryInSingleRowMode(PGconn *connection, char *query); -static bool StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore); +static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, + TupleDesc tupleDescriptor, int64 *rows); +static bool ConsumeQueryResult(PGconn *connection, int64 *rows); /* @@ -181,6 +184,9 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; MemoryContext oldcontext = NULL; + DestReceiver *destination = queryDesc->dest; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; Assert(estate != NULL); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); @@ -201,24 +207,72 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) - { - uint64 affectedRowCount = ExecuteDistributedModify(task); - estate->es_processed = affectedRowCount; - } - else if (operation == CMD_SELECT) - { - DestReceiver *destination = queryDesc->dest; - TupleDesc resultTupleDescriptor = queryDesc->tupDesc; + estate->es_processed = 0; - ExecuteSingleShardSelect(queryDesc, count, task, estate, - resultTupleDescriptor, destination); - } - else + /* startup the tuple receiver */ + if (sendTuples) { - ereport(ERROR, (errmsg("unrecognized operation code: %d", - (int) operation))); + (*destination->rStartup)(destination, operation, queryDesc->tupDesc); + } + + /* + * If query has not yet been executed, do so now. The main reason why the + * query might already have been executed is cursors. + */ + if (!routerState->eof_underlying) + { + bool resultsOK = false; + bool isModificationQuery = false; + + if (operation == CMD_INSERT || operation == CMD_UPDATE || + operation == CMD_DELETE) + { + isModificationQuery = true; + } + else if (operation != CMD_SELECT) + { + ereport(ERROR, (errmsg("unrecognized operation code: %d", + (int) operation))); + } + + resultsOK = ExecuteTaskAndStoreResults(queryDesc, task, + isModificationQuery, + sendTuples); + if (!resultsOK) + { + ereport(ERROR, (errmsg("could not receive query results"))); + } + + /* mark underlying query as having executed */ + routerState->eof_underlying = true; + } + + /* if the underlying query produced output, return it */ + if (routerState->tuplestorestate != NULL) + { + TupleDesc resultTupleDescriptor = queryDesc->tupDesc; + int64 returnedRows = 0; + + /* return rows from the tuplestore */ + returnedRows = ReturnRowsFromTuplestore(count, resultTupleDescriptor, + destination, + routerState->tuplestorestate); + + /* + * Count tuples processed, if this is a SELECT. (For modifications + * it'll already have been increased, as we want the number of + * modified tuples, not the number of RETURNed tuples.) + */ + if (operation == CMD_SELECT) + { + estate->es_processed += returnedRows; + } + } + + /* shutdown tuple receiver, if we started it */ + if (sendTuples) + { + (*destination->rShutdown)(destination); } if (queryDesc->totaltime != NULL) @@ -231,153 +285,159 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas /* - * ExecuteDistributedModify is the main entry point for modifying distributed - * tables. A distributed modification is successful if any placement of the - * distributed table is successful. ExecuteDistributedModify returns the number - * of modified rows in that case and errors in all others. This function will - * also generate warnings for individual placement failures. + * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves + * the results and stores them, if SELECT or RETURNING is used, in a tuple + * store. + * + * If the task fails on one of the placements, the function retries it on + * other placements (SELECT), reraises the remote error (constraint violation + * in DML), marks the affected placement as invalid (DML on some placement + * failed), or errors out (DML failed on all placements). */ -static uint64 -ExecuteDistributedModify(Task *task) +static bool +ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, + bool isModificationQuery, + bool expectResults) { - int64 affectedTupleCount = -1; + TupleDesc tupleDescriptor = queryDesc->tupDesc; + EState *executorState = queryDesc->estate; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + bool resultsOK = false; + List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; List *failedPlacementList = NIL; ListCell *failedPlacementCell = NULL; + int64 affectedTupleCount = -1; + bool gotResults = false; - foreach(taskPlacementCell, task->taskPlacementList) + /* + * Try to run the query to completion on one placement. If the query fails + * attempt the query on the next placement. + */ + foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); char *nodeName = taskPlacement->nodeName; int32 nodePort = taskPlacement->nodePort; + bool queryOK = false; + int64 currentAffectedTupleCount = 0; + PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - PGconn *connection = NULL; - PGresult *result = NULL; - char *currentAffectedTupleString = NULL; - int64 currentAffectedTupleCount = -1; - - Assert(taskPlacement->shardState == FILE_FINALIZED); - - connection = GetOrEstablishConnection(nodeName, nodePort); if (connection == NULL) { failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } - result = PQexec(connection, task->queryString); - if (PQresultStatus(result) != PGRES_COMMAND_OK) + queryOK = SendQueryInSingleRowMode(connection, task->queryString); + if (!queryOK) { - char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); - int category = 0; - bool raiseError = false; - - /* - * If the error code is in constraint violation class, we want to - * fail fast because we must get the same error from all shard - * placements. - */ - category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); - raiseError = SqlStateMatchesCategory(sqlStateString, category); - - if (raiseError) - { - ReraiseRemoteError(connection, result); - } - else - { - WarnRemoteError(connection, result); - } - PQclear(result); - + PurgeConnection(connection); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } - currentAffectedTupleString = PQcmdTuples(result); - - /* could throw error if input > MAX_INT64 */ - scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); - Assert(currentAffectedTupleCount >= 0); - -#if (PG_VERSION_NUM < 90600) - - /* before 9.6, PostgreSQL used a uint32 for this field, so check */ - Assert(currentAffectedTupleCount <= 0xFFFFFFFF); -#endif - - if ((affectedTupleCount == -1) || - (affectedTupleCount == currentAffectedTupleCount)) + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (!gotResults && expectResults) { - affectedTupleCount = currentAffectedTupleCount; + queryOK = StoreQueryResult(routerState, connection, tupleDescriptor, + ¤tAffectedTupleCount); } else { - ereport(WARNING, - (errmsg("modified " INT64_FORMAT " tuples, but expected to modify " - INT64_FORMAT, currentAffectedTupleCount, affectedTupleCount), - errdetail("modified placement on %s:%d", nodeName, nodePort))); + queryOK = ConsumeQueryResult(connection, ¤tAffectedTupleCount); } - PQclear(result); + if (queryOK) + { + if ((affectedTupleCount == -1) || + (affectedTupleCount == currentAffectedTupleCount)) + { + affectedTupleCount = currentAffectedTupleCount; + } + else + { + ereport(WARNING, + (errmsg("modified "INT64_FORMAT " tuples, but expected " + "to modify "INT64_FORMAT, + currentAffectedTupleCount, affectedTupleCount), + errdetail("modified placement on %s:%d", + nodeName, nodePort))); + } + +#if (PG_VERSION_NUM < 90600) + + /* before 9.6, PostgreSQL used a uint32 for this field, so check */ + Assert(currentAffectedTupleCount <= 0xFFFFFFFF); +#endif + + resultsOK = true; + gotResults = true; + + /* + * Modifications have to be executed on all placements, but for + * read queries we can stop here. + */ + if (!isModificationQuery) + { + break; + } + } + else + { + PurgeConnection(connection); + + failedPlacementList = lappend(failedPlacementList, taskPlacement); + + continue; + } } - /* if all placements failed, error out */ - if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) + if (isModificationQuery) { - ereport(ERROR, (errmsg("could not modify any active placements"))); + /* if all placements failed, error out */ + if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) + { + ereport(ERROR, (errmsg("could not modify any active placements"))); + } + + /* otherwise, mark failed placements as inactive: they're stale */ + foreach(failedPlacementCell, failedPlacementList) + { + ShardPlacement *failedPlacement = + (ShardPlacement *) lfirst(failedPlacementCell); + uint64 shardLength = 0; + + DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, + failedPlacement->nodePort); + InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, + failedPlacement->nodeName, failedPlacement->nodePort); + } + + executorState->es_processed = affectedTupleCount; } - /* otherwise, mark failed placements as inactive: they're stale */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 shardLength = 0; - - DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, - failedPlacement->nodePort); - InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, - failedPlacement->nodeName, failedPlacement->nodePort); - } - - return (uint64) affectedTupleCount; + return resultsOK; } /* - * ExecuteSingleShardSelect executes, if not done already, the remote select query and - * sends the resulting tuples to the given destination receiver. If the query fails on a - * given placement, the function attempts it on its replica. + * ReturnRowsFromTuplestore moves rows from a given tuplestore into a + * receiver. It performs the necessary limiting to support cursors. */ -static void -ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, - EState *executorState, TupleDesc tupleDescriptor, - DestReceiver *destination) +static uint64 +ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, + DestReceiver *destination, Tuplestorestate *tupleStore) { - bool resultsOK = false; TupleTableSlot *tupleTableSlot = NULL; - MaterialState *routerState = (MaterialState *) queryDesc->planstate; - Tuplestorestate *tupleStore = routerState->tuplestorestate; uint64 currentTupleCount = 0; - /* initialize tuplestore for the first call */ - if (routerState->tuplestorestate == NULL) - { - routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); - tupleStore = routerState->tuplestorestate; - - resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore); - if (!resultsOK) - { - ereport(ERROR, (errmsg("could not receive query results"))); - } - } - tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor); - /* startup the tuple receiver */ - (*destination->rStartup)(destination, CMD_SELECT, tupleDescriptor); - /* iterate over tuples in tuple store, and send them to destination */ for (;;) { @@ -388,7 +448,6 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, } (*destination->receiveSlot)(tupleTableSlot, destination); - executorState->es_processed++; ExecClearTuple(tupleTableSlot); @@ -404,65 +463,9 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, } } - /* shutdown the tuple receiver */ - (*destination->rShutdown)(destination); - ExecDropSingleTupleTableSlot(tupleTableSlot); -} - -/* - * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves - * the results and stores them in the given tuple store. If the task fails on - * one of the placements, the function retries it on other placements. - */ -bool -ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore) -{ - bool resultsOK = false; - List *taskPlacementList = task->taskPlacementList; - ListCell *taskPlacementCell = NULL; - - /* - * Try to run the query to completion on one placement. If the query fails - * attempt the query on the next placement. - */ - foreach(taskPlacementCell, taskPlacementList) - { - ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); - char *nodeName = taskPlacement->nodeName; - int32 nodePort = taskPlacement->nodePort; - bool queryOK = false; - bool storedOK = false; - - PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - if (connection == NULL) - { - continue; - } - - queryOK = SendQueryInSingleRowMode(connection, task->queryString); - if (!queryOK) - { - PurgeConnection(connection); - continue; - } - - storedOK = StoreQueryResult(connection, tupleDescriptor, tupleStore); - if (storedOK) - { - resultsOK = true; - break; - } - else - { - tuplestore_clear(tupleStore); - PurgeConnection(connection); - } - } - - return resultsOK; + return currentTupleCount; } @@ -497,25 +500,38 @@ SendQueryInSingleRowMode(PGconn *connection, char *query) /* * StoreQueryResult gets the query results from the given connection, builds - * tuples from the results and stores them in the given tuple-store. If the - * function can't receive query results, it returns false. Note that this - * function assumes the query has already been sent on the connection and the - * tuplestore has earlier been initialized. + * tuples from the results, and stores them in the a newly created + * tuple-store. If the function can't receive query results, it returns + * false. Note that this function assumes the query has already been sent on + * the connection. */ static bool -StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore) +StoreQueryResult(MaterialState *routerState, PGconn *connection, + TupleDesc tupleDescriptor, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); + Tuplestorestate *tupleStore = NULL; uint32 expectedColumnCount = tupleDescriptor->natts; char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); + bool commandFailed = false; MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, "StoreQueryResult", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + *rows = 0; - Assert(tupleStore != NULL); + if (routerState->tuplestorestate == NULL) + { + routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); + } + else + { + /* might have failed query execution on another placement before */ + tuplestore_clear(routerState->tuplestorestate); + } + + tupleStore = routerState->tuplestorestate; for (;;) { @@ -534,10 +550,33 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, resultStatus = PQresultStatus(result); if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK)) { - WarnRemoteError(connection, result); + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + int category = 0; + bool raiseError = false; + + /* + * If the error code is in constraint violation class, we want to + * fail fast because we must get the same error from all shard + * placements. + */ + category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); + raiseError = SqlStateMatchesCategory(sqlStateString, category); + + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } + PQclear(result); - return false; + commandFailed = true; + + /* continue, there could be other lingering results due to row mode */ + continue; } rowCount = PQntuples(result); @@ -575,6 +614,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, tuplestore_puttuple(tupleStore, heapTuple); MemoryContextReset(ioContext); + (*rows)++; } PQclear(result); @@ -582,7 +622,98 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, pfree(columnArray); - return true; + return !commandFailed; +} + + +/* + * ConsumeQueryResult gets a query result from a connection, counting the rows + * and checking for errors, but otherwise discarding potentially returned + * rows. Returns true if a non-error result has been returned, false if there + * has been an error. + */ +static bool +ConsumeQueryResult(PGconn *connection, int64 *rows) +{ + bool commandFailed = false; + bool gotResponse = false; + + *rows = 0; + + /* + * Due to single row mode we have to do multiple PQgetResult() to finish + * processing of this query, even without RETURNING. For single-row mode + * we have to loop until all rows are consumed. + */ + while (true) + { + PGresult *result = PQgetResult(connection); + ExecStatusType status = PGRES_COMMAND_OK; + + if (result == NULL) + { + break; + } + + status = PQresultStatus(result); + + if (status != PGRES_COMMAND_OK && + status != PGRES_SINGLE_TUPLE && + status != PGRES_TUPLES_OK) + { + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + int category = 0; + bool raiseError = false; + + /* + * If the error code is in constraint violation class, we want to + * fail fast because we must get the same error from all shard + * placements. + */ + category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); + raiseError = SqlStateMatchesCategory(sqlStateString, category); + + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } + PQclear(result); + + commandFailed = true; + + /* continue, there could be other lingering results due to row mode */ + continue; + } + + if (status == PGRES_COMMAND_OK) + { + char *currentAffectedTupleString = PQcmdTuples(result); + int64 currentAffectedTupleCount = 0; + + scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); + Assert(currentAffectedTupleCount >= 0); + +#if (PG_VERSION_NUM < 90600) + + /* before 9.6, PostgreSQL used a uint32 for this field, so check */ + Assert(currentAffectedTupleCount <= 0xFFFFFFFF); +#endif + *rows += currentAffectedTupleCount; + } + else + { + *rows += PQntuples(result); + } + + PQclear(result); + gotResponse = true; + } + + return gotResponse && !commandFailed; } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b2400071e..f18af7686 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -18,8 +18,6 @@ extern bool AllModificationsCommutative; extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task); -extern bool ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); From cccba66f24a59ff05971d1ea5c5bb328a8accf17 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jun 2016 14:36:36 -0700 Subject: [PATCH 5/6] Support RETURNING for modification commands. Fixes: #242 --- .../master/master_modify_multiple_shards.c | 8 ++++++++ .../distributed/planner/multi_router_planner.c | 15 +++++---------- src/test/regress/expected/multi_modifications.out | 13 ------------- src/test/regress/expected/multi_shard_modify.out | 6 ++---- src/test/regress/sql/multi_modifications.sql | 10 ---------- 5 files changed, 15 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 0ca5afc52..48d82e66e 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -116,6 +116,14 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) ErrorIfModifyQueryNotSupported(modifyQuery); + /* reject queries with a returning list */ + if (list_length(modifyQuery->returningList) > 0) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("master_modify_multiple_shards() does not support RETURNING"))); + } + shardIntervalList = LoadShardIntervalList(relationId); restrictClauseList = WhereClauseList(modifyQuery->jointree); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index cf12fefa0..5f76b3194 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -255,16 +255,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) "supported."))); } - /* reject queries with a returning list */ - if (list_length(queryTree->returningList) > 0) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("RETURNING clauses are not supported in distributed " - "modifications."))); - } - if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { @@ -298,6 +288,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) { hasNonConstQualExprs = true; } + + if (contain_mutable_functions((Node *) queryTree->returningList)) + { + hasNonConstTargetEntryExprs = true; + } } #if (PG_VERSION_NUM >= 90500) diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 970d4ff45..c61b225de 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -195,11 +195,6 @@ DETAIL: Multi-row INSERTs to distributed tables are not supported. INSERT INTO limit_orders SELECT * FROM limit_orders; ERROR: cannot perform distributed planning for the given modifications DETAIL: Subqueries are not supported in distributed modifications. --- commands with a RETURNING clause are unsupported -INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00) - RETURNING *; -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; @@ -244,10 +239,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: cannot plan queries that include both regular and partitioned relations --- commands with a RETURNING clause are unsupported -DELETE FROM limit_orders WHERE id = 246 RETURNING *; -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; @@ -363,10 +354,6 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: cannot plan queries that include both regular and partitioned relations --- commands with a RETURNING clause are unsupported -UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 59a815659..0d4e389d6 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -62,8 +62,7 @@ ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. -- commands with a RETURNING clause are unsupported SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. +ERROR: master_modify_multiple_shards() does not support RETURNING -- commands containing a CTE are unsupported SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test'); ERROR: cannot perform distributed planning for the given modification @@ -205,8 +204,7 @@ ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. -- commands with a RETURNING clause are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. +ERROR: master_modify_multiple_shards() does not support RETURNING -- commands containing a CTE are unsupported SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' '); ERROR: cannot perform distributed planning for the given modification diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index f85623cc1..935a9228a 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -143,10 +143,6 @@ INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); -- INSERT ... SELECT ... FROM commands are unsupported INSERT INTO limit_orders SELECT * FROM limit_orders; --- commands with a RETURNING clause are unsupported -INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00) - RETURNING *; - -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; @@ -174,9 +170,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands with a RETURNING clause are unsupported -DELETE FROM limit_orders WHERE id = 246 RETURNING *; - -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; @@ -275,9 +268,6 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands with a RETURNING clause are unsupported -UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; - -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; From 4549e0688401168832b333239705309efa69e6a8 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 22 Jun 2016 14:36:36 -0700 Subject: [PATCH 6/6] Add regression tests for RETURNING. --- .../regress/expected/multi_modifications.out | 134 +++++++++++++++++- src/test/regress/expected/multi_upsert.out | 17 +++ src/test/regress/expected/multi_upsert_0.out | 13 ++ src/test/regress/sql/multi_modifications.sql | 46 +++++- src/test/regress/sql/multi_upsert.sql | 9 ++ 5 files changed, 211 insertions(+), 8 deletions(-) diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index c61b225de..f3087ed9b 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -97,6 +97,13 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 32743; 1 (1 row) +-- basic single-row INSERT with RETURNING +INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-------+--------+-----------+--------------------------+------+------------- + 32744 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 +(1 row) + -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); @@ -173,6 +180,14 @@ INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" DETAIL: Key (id)=(32743) already exists. CONTEXT: while executing command on localhost:57638 +-- INSERT violating primary key constraint, with RETURNING specified. +INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *; +ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" +DETAIL: Key (id)=(32743) already exists. +CONTEXT: while executing command on localhost:57638 +-- INSERT, with RETURNING specified, failing with a non-constraint error +INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0; +ERROR: could not modify any active placements SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', @@ -215,6 +230,19 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; 0 (1 row) +-- test simple DELETE with RETURNING +DELETE FROM limit_orders WHERE id = 430 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+----------------- + 430 | IBM | 214 | Tue Jan 28 15:31:17 2003 | buy | 1.4142135623731 +(1 row) + +SELECT COUNT(*) FROM limit_orders WHERE id = 430; + count +------- + 0 +(1 row) + -- DELETE with expression in WHERE clause INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; @@ -256,6 +284,13 @@ SELECT symbol FROM limit_orders WHERE id = 246; GM (1 row) +-- simple UPDATE with RETURNING +UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+------------- + 246 | GM | 162 | Mon Jul 02 16:32:15 2007 | sell | 20.69 +(1 row) + -- expression UPDATE UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246; SELECT bidder_id FROM limit_orders WHERE id = 246; @@ -264,6 +299,13 @@ SELECT bidder_id FROM limit_orders WHERE id = 246; 18 (1 row) +-- expression UPDATE with RETURNING +UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+------------- + 246 | GM | 30 | Mon Jul 02 16:32:15 2007 | sell | 20.69 +(1 row) + -- multi-column UPDATE UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246; SELECT kind, limit_price FROM limit_orders WHERE id = 246; @@ -272,6 +314,13 @@ SELECT kind, limit_price FROM limit_orders WHERE id = 246; buy | 0.00 (1 row) +-- multi-column UPDATE with RETURNING +UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+------------- + 246 | GM | 30 | Mon Jul 02 16:32:15 2007 | buy | 999 +(1 row) + -- Test that on unique contraint violations, we fail fast INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); @@ -362,7 +411,7 @@ DETAIL: Common table expressions are not supported in distributed modifications SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; symbol | bidder_id --------+----------- - GM | 18 + GM | 30 (1 row) -- updates referencing just a var are supported @@ -377,12 +426,51 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; gm | 247 (1 row) +-- IMMUTABLE functions are allowed -- even in returning +UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; + id | lower | symbol +-----+-------+-------- + 246 | gm | GM +(1 row) + -- updates referencing non-IMMUTABLE functions are unsupported UPDATE limit_orders SET placed_at = now() WHERE id = 246; ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +-- even in RETURNING +UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard +-- check that multi-row UPDATE/DELETEs with RETURNING work +INSERT INTO multiple_hash VALUES ('0', '1'); +INSERT INTO multiple_hash VALUES ('0', '2'); +INSERT INTO multiple_hash VALUES ('0', '3'); +INSERT INTO multiple_hash VALUES ('0', '4'); +INSERT INTO multiple_hash VALUES ('0', '5'); +INSERT INTO multiple_hash VALUES ('0', '6'); +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *; + category | data +----------+------ + 0 | 1-1 + 0 | 2-1 + 0 | 3-1 + 0 | 4-1 + 0 | 5-1 + 0 | 6-1 +(6 rows) + +DELETE FROM multiple_hash WHERE category = '0' RETURNING *; + category | data +----------+------ + 0 | 1-1 + 0 | 2-1 + 0 | 3-1 + 0 | 4-1 + 0 | 5-1 + 0 | 6-1 +(6 rows) + -- ensure returned row counters are correct \set QUIET off INSERT INTO multiple_hash VALUES ('1', '1'); @@ -396,6 +484,13 @@ INSERT 0 1 INSERT INTO multiple_hash VALUES ('2', '2'); INSERT 0 1 INSERT INTO multiple_hash VALUES ('2', '3'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *; + category | data +----------+------ + 2 | 3 +(1 row) + INSERT 0 1 -- check that update return the right number of rows -- one row @@ -403,14 +498,24 @@ UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; UPDATE 1 -- three rows UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; +UPDATE 3 +-- three rows, with RETURNING +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category; + category +---------- + 1 + 1 + 1 +(3 rows) + UPDATE 3 -- check SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; - category | data -----------+------- - 1 | 1-1-2 - 1 | 2-2 - 1 | 3-2 + category | data +----------+--------- + 1 | 1-1-2-2 + 1 | 2-2-2 + 1 | 3-2-2 (3 rows) -- check that deletes return the right number of rows @@ -419,8 +524,23 @@ DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; DELETE 1 -- two rows DELETE FROM multiple_hash WHERE category = '2'; -DELETE 2 +DELETE 3 +-- three rows, with RETURNING +DELETE FROM multiple_hash WHERE category = '1' RETURNING category; + category +---------- + 1 + 1 + 1 +(3 rows) + +DELETE 3 -- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; + category | data +----------+------ +(0 rows) + SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; category | data ----------+------ diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index ec9597171..3b3ff22c1 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -105,6 +105,23 @@ SELECT * FROM upsert_test; 1 | 5 | 872 (1 row) +-- Test upsert, with returning: +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + part_key | other_col | third_col +----------+-----------+----------- + 2 | 2 | +(1 row) + +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + part_key | other_col | third_col +----------+-----------+----------- + 2 | 3 | +(1 row) + -- create another table CREATE TABLE upsert_test_2 ( diff --git a/src/test/regress/expected/multi_upsert_0.out b/src/test/regress/expected/multi_upsert_0.out index 09b052c1c..eb653cf3d 100644 --- a/src/test/regress/expected/multi_upsert_0.out +++ b/src/test/regress/expected/multi_upsert_0.out @@ -138,6 +138,19 @@ SELECT * FROM upsert_test; 1 | 1 | (1 row) +-- Test upsert, with returning: +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; +ERROR: syntax error at or near "ON" +LINE 2: ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + ^ +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; +ERROR: syntax error at or near "ON" +LINE 2: ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + ^ -- create another table CREATE TABLE upsert_test_2 ( diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 935a9228a..0ff747cb7 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -69,6 +69,9 @@ INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 32743; +-- basic single-row INSERT with RETURNING +INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *; + -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); @@ -118,10 +121,16 @@ INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT); -- INSERT violating column constraint INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', -5.00); - -- INSERT violating primary key constraint INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58); +-- INSERT violating primary key constraint, with RETURNING specified. +INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *; + +-- INSERT, with RETURNING specified, failing with a non-constraint error +INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0; + + SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported @@ -154,6 +163,10 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; DELETE FROM limit_orders WHERE id = 246; SELECT COUNT(*) FROM limit_orders WHERE id = 246; +-- test simple DELETE with RETURNING +DELETE FROM limit_orders WHERE id = 430 RETURNING *; +SELECT COUNT(*) FROM limit_orders WHERE id = 430; + -- DELETE with expression in WHERE clause INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; @@ -183,14 +196,23 @@ INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell' UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; SELECT symbol FROM limit_orders WHERE id = 246; +-- simple UPDATE with RETURNING +UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; + -- expression UPDATE UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246; SELECT bidder_id FROM limit_orders WHERE id = 246; +-- expression UPDATE with RETURNING +UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *; + -- multi-column UPDATE UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246; SELECT kind, limit_price FROM limit_orders WHERE id = 246; +-- multi-column UPDATE with RETURNING +UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *; + -- Test that on unique contraint violations, we fail fast INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); @@ -285,12 +307,28 @@ UPDATE limit_orders SET symbol = LOWER(symbol) WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; +-- IMMUTABLE functions are allowed -- even in returning +UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; + -- updates referencing non-IMMUTABLE functions are unsupported UPDATE limit_orders SET placed_at = now() WHERE id = 246; +-- even in RETURNING +UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); + -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; +-- check that multi-row UPDATE/DELETEs with RETURNING work +INSERT INTO multiple_hash VALUES ('0', '1'); +INSERT INTO multiple_hash VALUES ('0', '2'); +INSERT INTO multiple_hash VALUES ('0', '3'); +INSERT INTO multiple_hash VALUES ('0', '4'); +INSERT INTO multiple_hash VALUES ('0', '5'); +INSERT INTO multiple_hash VALUES ('0', '6'); + +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *; +DELETE FROM multiple_hash WHERE category = '0' RETURNING *; -- ensure returned row counters are correct \set QUIET off @@ -300,12 +338,15 @@ INSERT INTO multiple_hash VALUES ('1', '3'); INSERT INTO multiple_hash VALUES ('2', '1'); INSERT INTO multiple_hash VALUES ('2', '2'); INSERT INTO multiple_hash VALUES ('2', '3'); +INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *; -- check that update return the right number of rows -- one row UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; -- three rows UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; +-- three rows, with RETURNING +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category; -- check SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; @@ -314,5 +355,8 @@ SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; -- two rows DELETE FROM multiple_hash WHERE category = '2'; +-- three rows, with RETURNING +DELETE FROM multiple_hash WHERE category = '1' RETURNING category; -- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; diff --git a/src/test/regress/sql/multi_upsert.sql b/src/test/regress/sql/multi_upsert.sql index 394ef0bbc..0d8de95f6 100644 --- a/src/test/regress/sql/multi_upsert.sql +++ b/src/test/regress/sql/multi_upsert.sql @@ -85,6 +85,15 @@ INSERT INTO upsert_test as ups_test (part_key, other_col) VALUES (1, 1) ON CONFL -- see the results SELECT * FROM upsert_test; +-- Test upsert, with returning: +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + -- create another table CREATE TABLE upsert_test_2 (