diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 181e9f524..e831e4583 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -59,9 +59,9 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags) { Task *task = NULL; List *taskList = workerJob->taskList; + TupleDesc tupleDescriptor = ExecCleanTypeFromTL( + planStatement->planTree->targetlist, false); List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList; - List *workerTargetList = multiPlan->workerJob->jobQuery->targetList; - TupleDesc tupleDescriptor = ExecCleanTypeFromTL(workerTargetList, false); /* router executor can only execute distributed plans with a single task */ Assert(list_length(taskList) == 1); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index e1e3ac205..9e643b00c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -42,14 +42,17 @@ bool AllModificationsCommutative = false; static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); -static uint64 ExecuteDistributedModify(Task *task); -static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, - Task *task, EState *executorState, - TupleDesc tupleDescriptor, - DestReceiver *destination); +static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc, + Task *task, + bool isModificationQuery, + bool expectResults); +static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, + DestReceiver *destination, + Tuplestorestate *tupleStore); static bool SendQueryInSingleRowMode(PGconn *connection, char *query); -static bool StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore); +static bool StoreQueryResult(MaterialState *routerState, PGconn *connection, + TupleDesc tupleDescriptor, int64 *rows); +static bool ConsumeQueryResult(PGconn *connection, int64 *rows); /* @@ -181,6 +184,9 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas EState *estate = queryDesc->estate; CmdType operation = queryDesc->operation; MemoryContext oldcontext = NULL; + DestReceiver *destination = queryDesc->dest; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning; Assert(estate != NULL); Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY)); @@ -201,24 +207,72 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) - { - uint64 affectedRowCount = ExecuteDistributedModify(task); - estate->es_processed = affectedRowCount; - } - else if (operation == CMD_SELECT) - { - DestReceiver *destination = queryDesc->dest; - TupleDesc resultTupleDescriptor = queryDesc->tupDesc; + estate->es_processed = 0; - ExecuteSingleShardSelect(queryDesc, count, task, estate, - resultTupleDescriptor, destination); - } - else + /* startup the tuple receiver */ + if (sendTuples) { - ereport(ERROR, (errmsg("unrecognized operation code: %d", - (int) operation))); + (*destination->rStartup)(destination, operation, queryDesc->tupDesc); + } + + /* + * If query has not yet been executed, do so now. The main reason why the + * query might already have been executed is cursors. + */ + if (!routerState->eof_underlying) + { + bool resultsOK = false; + bool isModificationQuery = false; + + if (operation == CMD_INSERT || operation == CMD_UPDATE || + operation == CMD_DELETE) + { + isModificationQuery = true; + } + else if (operation != CMD_SELECT) + { + ereport(ERROR, (errmsg("unrecognized operation code: %d", + (int) operation))); + } + + resultsOK = ExecuteTaskAndStoreResults(queryDesc, task, + isModificationQuery, + sendTuples); + if (!resultsOK) + { + ereport(ERROR, (errmsg("could not receive query results"))); + } + + /* mark underlying query as having executed */ + routerState->eof_underlying = true; + } + + /* if the underlying query produced output, return it */ + if (routerState->tuplestorestate != NULL) + { + TupleDesc resultTupleDescriptor = queryDesc->tupDesc; + int64 returnedRows = 0; + + /* return rows from the tuplestore */ + returnedRows = ReturnRowsFromTuplestore(count, resultTupleDescriptor, + destination, + routerState->tuplestorestate); + + /* + * Count tuples processed, if this is a SELECT. (For modifications + * it'll already have been increased, as we want the number of + * modified tuples, not the number of RETURNed tuples.) + */ + if (operation == CMD_SELECT) + { + estate->es_processed += returnedRows; + } + } + + /* shutdown tuple receiver, if we started it */ + if (sendTuples) + { + (*destination->rShutdown)(destination); } if (queryDesc->totaltime != NULL) @@ -231,153 +285,159 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas /* - * ExecuteDistributedModify is the main entry point for modifying distributed - * tables. A distributed modification is successful if any placement of the - * distributed table is successful. ExecuteDistributedModify returns the number - * of modified rows in that case and errors in all others. This function will - * also generate warnings for individual placement failures. + * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves + * the results and stores them, if SELECT or RETURNING is used, in a tuple + * store. + * + * If the task fails on one of the placements, the function retries it on + * other placements (SELECT), reraises the remote error (constraint violation + * in DML), marks the affected placement as invalid (DML on some placement + * failed), or errors out (DML failed on all placements). */ -static uint64 -ExecuteDistributedModify(Task *task) +static bool +ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, + bool isModificationQuery, + bool expectResults) { - int64 affectedTupleCount = -1; + TupleDesc tupleDescriptor = queryDesc->tupDesc; + EState *executorState = queryDesc->estate; + MaterialState *routerState = (MaterialState *) queryDesc->planstate; + bool resultsOK = false; + List *taskPlacementList = task->taskPlacementList; ListCell *taskPlacementCell = NULL; List *failedPlacementList = NIL; ListCell *failedPlacementCell = NULL; + int64 affectedTupleCount = -1; + bool gotResults = false; - foreach(taskPlacementCell, task->taskPlacementList) + /* + * Try to run the query to completion on one placement. If the query fails + * attempt the query on the next placement. + */ + foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); char *nodeName = taskPlacement->nodeName; int32 nodePort = taskPlacement->nodePort; + bool queryOK = false; + int64 currentAffectedTupleCount = 0; + PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - PGconn *connection = NULL; - PGresult *result = NULL; - char *currentAffectedTupleString = NULL; - int64 currentAffectedTupleCount = -1; - - Assert(taskPlacement->shardState == FILE_FINALIZED); - - connection = GetOrEstablishConnection(nodeName, nodePort); if (connection == NULL) { failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } - result = PQexec(connection, task->queryString); - if (PQresultStatus(result) != PGRES_COMMAND_OK) + queryOK = SendQueryInSingleRowMode(connection, task->queryString); + if (!queryOK) { - char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); - int category = 0; - bool raiseError = false; - - /* - * If the error code is in constraint violation class, we want to - * fail fast because we must get the same error from all shard - * placements. - */ - category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); - raiseError = SqlStateMatchesCategory(sqlStateString, category); - - if (raiseError) - { - ReraiseRemoteError(connection, result); - } - else - { - WarnRemoteError(connection, result); - } - PQclear(result); - + PurgeConnection(connection); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } - currentAffectedTupleString = PQcmdTuples(result); - - /* could throw error if input > MAX_INT64 */ - scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); - Assert(currentAffectedTupleCount >= 0); - -#if (PG_VERSION_NUM < 90600) - - /* before 9.6, PostgreSQL used a uint32 for this field, so check */ - Assert(currentAffectedTupleCount <= 0xFFFFFFFF); -#endif - - if ((affectedTupleCount == -1) || - (affectedTupleCount == currentAffectedTupleCount)) + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (!gotResults && expectResults) { - affectedTupleCount = currentAffectedTupleCount; + queryOK = StoreQueryResult(routerState, connection, tupleDescriptor, + ¤tAffectedTupleCount); } else { - ereport(WARNING, - (errmsg("modified " INT64_FORMAT " tuples, but expected to modify " - INT64_FORMAT, currentAffectedTupleCount, affectedTupleCount), - errdetail("modified placement on %s:%d", nodeName, nodePort))); + queryOK = ConsumeQueryResult(connection, ¤tAffectedTupleCount); } - PQclear(result); + if (queryOK) + { + if ((affectedTupleCount == -1) || + (affectedTupleCount == currentAffectedTupleCount)) + { + affectedTupleCount = currentAffectedTupleCount; + } + else + { + ereport(WARNING, + (errmsg("modified "INT64_FORMAT " tuples, but expected " + "to modify "INT64_FORMAT, + currentAffectedTupleCount, affectedTupleCount), + errdetail("modified placement on %s:%d", + nodeName, nodePort))); + } + +#if (PG_VERSION_NUM < 90600) + + /* before 9.6, PostgreSQL used a uint32 for this field, so check */ + Assert(currentAffectedTupleCount <= 0xFFFFFFFF); +#endif + + resultsOK = true; + gotResults = true; + + /* + * Modifications have to be executed on all placements, but for + * read queries we can stop here. + */ + if (!isModificationQuery) + { + break; + } + } + else + { + PurgeConnection(connection); + + failedPlacementList = lappend(failedPlacementList, taskPlacement); + + continue; + } } - /* if all placements failed, error out */ - if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) + if (isModificationQuery) { - ereport(ERROR, (errmsg("could not modify any active placements"))); + /* if all placements failed, error out */ + if (list_length(failedPlacementList) == list_length(task->taskPlacementList)) + { + ereport(ERROR, (errmsg("could not modify any active placements"))); + } + + /* otherwise, mark failed placements as inactive: they're stale */ + foreach(failedPlacementCell, failedPlacementList) + { + ShardPlacement *failedPlacement = + (ShardPlacement *) lfirst(failedPlacementCell); + uint64 shardLength = 0; + + DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, + failedPlacement->nodePort); + InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, + failedPlacement->nodeName, failedPlacement->nodePort); + } + + executorState->es_processed = affectedTupleCount; } - /* otherwise, mark failed placements as inactive: they're stale */ - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 shardLength = 0; - - DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, - failedPlacement->nodePort); - InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, - failedPlacement->nodeName, failedPlacement->nodePort); - } - - return (uint64) affectedTupleCount; + return resultsOK; } /* - * ExecuteSingleShardSelect executes, if not done already, the remote select query and - * sends the resulting tuples to the given destination receiver. If the query fails on a - * given placement, the function attempts it on its replica. + * ReturnRowsFromTuplestore moves rows from a given tuplestore into a + * receiver. It performs the necessary limiting to support cursors. */ -static void -ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, - EState *executorState, TupleDesc tupleDescriptor, - DestReceiver *destination) +static uint64 +ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor, + DestReceiver *destination, Tuplestorestate *tupleStore) { - bool resultsOK = false; TupleTableSlot *tupleTableSlot = NULL; - MaterialState *routerState = (MaterialState *) queryDesc->planstate; - Tuplestorestate *tupleStore = routerState->tuplestorestate; uint64 currentTupleCount = 0; - /* initialize tuplestore for the first call */ - if (routerState->tuplestorestate == NULL) - { - routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); - tupleStore = routerState->tuplestorestate; - - resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore); - if (!resultsOK) - { - ereport(ERROR, (errmsg("could not receive query results"))); - } - } - tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor); - /* startup the tuple receiver */ - (*destination->rStartup)(destination, CMD_SELECT, tupleDescriptor); - /* iterate over tuples in tuple store, and send them to destination */ for (;;) { @@ -388,7 +448,6 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, } (*destination->receiveSlot)(tupleTableSlot, destination); - executorState->es_processed++; ExecClearTuple(tupleTableSlot); @@ -404,65 +463,9 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task, } } - /* shutdown the tuple receiver */ - (*destination->rShutdown)(destination); - ExecDropSingleTupleTableSlot(tupleTableSlot); -} - -/* - * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves - * the results and stores them in the given tuple store. If the task fails on - * one of the placements, the function retries it on other placements. - */ -bool -ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore) -{ - bool resultsOK = false; - List *taskPlacementList = task->taskPlacementList; - ListCell *taskPlacementCell = NULL; - - /* - * Try to run the query to completion on one placement. If the query fails - * attempt the query on the next placement. - */ - foreach(taskPlacementCell, taskPlacementList) - { - ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); - char *nodeName = taskPlacement->nodeName; - int32 nodePort = taskPlacement->nodePort; - bool queryOK = false; - bool storedOK = false; - - PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); - if (connection == NULL) - { - continue; - } - - queryOK = SendQueryInSingleRowMode(connection, task->queryString); - if (!queryOK) - { - PurgeConnection(connection); - continue; - } - - storedOK = StoreQueryResult(connection, tupleDescriptor, tupleStore); - if (storedOK) - { - resultsOK = true; - break; - } - else - { - tuplestore_clear(tupleStore); - PurgeConnection(connection); - } - } - - return resultsOK; + return currentTupleCount; } @@ -497,25 +500,38 @@ SendQueryInSingleRowMode(PGconn *connection, char *query) /* * StoreQueryResult gets the query results from the given connection, builds - * tuples from the results and stores them in the given tuple-store. If the - * function can't receive query results, it returns false. Note that this - * function assumes the query has already been sent on the connection and the - * tuplestore has earlier been initialized. + * tuples from the results, and stores them in the a newly created + * tuple-store. If the function can't receive query results, it returns + * false. Note that this function assumes the query has already been sent on + * the connection. */ static bool -StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore) +StoreQueryResult(MaterialState *routerState, PGconn *connection, + TupleDesc tupleDescriptor, int64 *rows) { AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); + Tuplestorestate *tupleStore = NULL; uint32 expectedColumnCount = tupleDescriptor->natts; char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); + bool commandFailed = false; MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, "StoreQueryResult", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + *rows = 0; - Assert(tupleStore != NULL); + if (routerState->tuplestorestate == NULL) + { + routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); + } + else + { + /* might have failed query execution on another placement before */ + tuplestore_clear(routerState->tuplestorestate); + } + + tupleStore = routerState->tuplestorestate; for (;;) { @@ -534,10 +550,33 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, resultStatus = PQresultStatus(result); if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK)) { - WarnRemoteError(connection, result); + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + int category = 0; + bool raiseError = false; + + /* + * If the error code is in constraint violation class, we want to + * fail fast because we must get the same error from all shard + * placements. + */ + category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); + raiseError = SqlStateMatchesCategory(sqlStateString, category); + + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } + PQclear(result); - return false; + commandFailed = true; + + /* continue, there could be other lingering results due to row mode */ + continue; } rowCount = PQntuples(result); @@ -575,6 +614,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, tuplestore_puttuple(tupleStore, heapTuple); MemoryContextReset(ioContext); + (*rows)++; } PQclear(result); @@ -582,7 +622,98 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor, pfree(columnArray); - return true; + return !commandFailed; +} + + +/* + * ConsumeQueryResult gets a query result from a connection, counting the rows + * and checking for errors, but otherwise discarding potentially returned + * rows. Returns true if a non-error result has been returned, false if there + * has been an error. + */ +static bool +ConsumeQueryResult(PGconn *connection, int64 *rows) +{ + bool commandFailed = false; + bool gotResponse = false; + + *rows = 0; + + /* + * Due to single row mode we have to do multiple PQgetResult() to finish + * processing of this query, even without RETURNING. For single-row mode + * we have to loop until all rows are consumed. + */ + while (true) + { + PGresult *result = PQgetResult(connection); + ExecStatusType status = PGRES_COMMAND_OK; + + if (result == NULL) + { + break; + } + + status = PQresultStatus(result); + + if (status != PGRES_COMMAND_OK && + status != PGRES_SINGLE_TUPLE && + status != PGRES_TUPLES_OK) + { + char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE); + int category = 0; + bool raiseError = false; + + /* + * If the error code is in constraint violation class, we want to + * fail fast because we must get the same error from all shard + * placements. + */ + category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); + raiseError = SqlStateMatchesCategory(sqlStateString, category); + + if (raiseError) + { + ReraiseRemoteError(connection, result); + } + else + { + WarnRemoteError(connection, result); + } + PQclear(result); + + commandFailed = true; + + /* continue, there could be other lingering results due to row mode */ + continue; + } + + if (status == PGRES_COMMAND_OK) + { + char *currentAffectedTupleString = PQcmdTuples(result); + int64 currentAffectedTupleCount = 0; + + scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); + Assert(currentAffectedTupleCount >= 0); + +#if (PG_VERSION_NUM < 90600) + + /* before 9.6, PostgreSQL used a uint32 for this field, so check */ + Assert(currentAffectedTupleCount <= 0xFFFFFFFF); +#endif + *rows += currentAffectedTupleCount; + } + else + { + *rows += PQntuples(result); + } + + PQclear(result); + gotResponse = true; + } + + return gotResponse && !commandFailed; } diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 0ca5afc52..48d82e66e 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -116,6 +116,14 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) ErrorIfModifyQueryNotSupported(modifyQuery); + /* reject queries with a returning list */ + if (list_length(modifyQuery->returningList) > 0) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("master_modify_multiple_shards() does not support RETURNING"))); + } + shardIntervalList = LoadShardIntervalList(relationId); restrictClauseList = WhereClauseList(modifyQuery->jointree); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 511c6dc8d..87c6f4185 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -25,6 +25,8 @@ #include "executor/executor.h" +#include "nodes/makefuncs.h" + #include "optimizer/planner.h" #include "utils/memutils.h" @@ -183,6 +185,9 @@ MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan) fauxFunctionScan = makeNode(FunctionScan); fauxFunctionScan->functions = lappend(fauxFunctionScan->functions, fauxFunction); + /* copy original targetlist, accessed for RETURNING queries */ + fauxFunctionScan->scan.plan.targetlist = copyObject(result->planTree->targetlist); + /* * Add set returning function to target list if the original (postgres * created) plan doesn't support backward scans; doing so prevents @@ -196,11 +201,17 @@ MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan) if (!ExecSupportsBackwardScan(result->planTree)) { FuncExpr *funcExpr = makeNode(FuncExpr); + TargetEntry *targetEntry = NULL; + bool resjunkAttribute = true; + funcExpr->funcretset = true; + targetEntry = makeTargetEntry((Expr *) funcExpr, InvalidAttrNumber, NULL, + resjunkAttribute); + fauxFunctionScan->scan.plan.targetlist = lappend(fauxFunctionScan->scan.plan.targetlist, - funcExpr); + targetEntry); } result->planTree = (Plan *) fauxFunctionScan; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index cf12fefa0..5f76b3194 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -255,16 +255,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) "supported."))); } - /* reject queries with a returning list */ - if (list_length(queryTree->returningList) > 0) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning for the given" - " modification"), - errdetail("RETURNING clauses are not supported in distributed " - "modifications."))); - } - if (commandType == CMD_INSERT || commandType == CMD_UPDATE || commandType == CMD_DELETE) { @@ -298,6 +288,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree) { hasNonConstQualExprs = true; } + + if (contain_mutable_functions((Node *) queryTree->returningList)) + { + hasNonConstTargetEntryExprs = true; + } } #if (PG_VERSION_NUM >= 90500) diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b2400071e..f18af7686 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -18,8 +18,6 @@ extern bool AllModificationsCommutative; extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Task *task); -extern bool ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore); extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc); diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 33d64ae68..f3087ed9b 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -12,6 +12,10 @@ CREATE TABLE limit_orders ( kind order_side NOT NULL, limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00) ); +CREATE TABLE multiple_hash ( + category text NOT NULL, + data text NOT NULL +); CREATE TABLE insufficient_shards ( LIKE limit_orders ); CREATE TABLE range_partitioned ( LIKE limit_orders ); CREATE TABLE append_partitioned ( LIKE limit_orders ); @@ -21,6 +25,12 @@ SELECT master_create_distributed_table('limit_orders', 'id', 'hash'); (1 row) +SELECT master_create_distributed_table('multiple_hash', 'category', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash'); master_create_distributed_table --------------------------------- @@ -45,6 +55,12 @@ SELECT master_create_worker_shards('limit_orders', 2, 2); (1 row) +SELECT master_create_worker_shards('multiple_hash', 2, 2); + master_create_worker_shards +----------------------------- + +(1 row) + -- make a single shard that covers no partition values SELECT master_create_worker_shards('insufficient_shards', 1, 1); master_create_worker_shards @@ -81,6 +97,13 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 32743; 1 (1 row) +-- basic single-row INSERT with RETURNING +INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-------+--------+-----------+--------------------------+------+------------- + 32744 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 +(1 row) + -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); @@ -95,7 +118,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 SET client_min_messages TO 'DEBUG2'; SET citus.task_executor_type TO 'real-time'; SELECT * FROM range_partitioned WHERE id = 32743; -DEBUG: predicate pruning for shardId 750004 +DEBUG: predicate pruning for shardId 750006 DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price -------+--------+-----------+--------------------------+------+------------- @@ -103,7 +126,7 @@ DEBUG: Plan is router executable (1 row) SELECT * FROM append_partitioned WHERE id = 414123; -DEBUG: predicate pruning for shardId 750006 +DEBUG: predicate pruning for shardId 750008 DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price --------+--------+-----------+--------------------------+------+------------- @@ -157,6 +180,14 @@ INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" DETAIL: Key (id)=(32743) already exists. CONTEXT: while executing command on localhost:57638 +-- INSERT violating primary key constraint, with RETURNING specified. +INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *; +ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" +DETAIL: Key (id)=(32743) already exists. +CONTEXT: while executing command on localhost:57638 +-- INSERT, with RETURNING specified, failing with a non-constraint error +INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0; +ERROR: could not modify any active placements SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', @@ -179,11 +210,6 @@ DETAIL: Multi-row INSERTs to distributed tables are not supported. INSERT INTO limit_orders SELECT * FROM limit_orders; ERROR: cannot perform distributed planning for the given modifications DETAIL: Subqueries are not supported in distributed modifications. --- commands with a RETURNING clause are unsupported -INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00) - RETURNING *; -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; @@ -204,6 +230,19 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; 0 (1 row) +-- test simple DELETE with RETURNING +DELETE FROM limit_orders WHERE id = 430 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+----------------- + 430 | IBM | 214 | Tue Jan 28 15:31:17 2003 | buy | 1.4142135623731 +(1 row) + +SELECT COUNT(*) FROM limit_orders WHERE id = 430; + count +------- + 0 +(1 row) + -- DELETE with expression in WHERE clause INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; @@ -228,10 +267,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: cannot plan queries that include both regular and partitioned relations --- commands with a RETURNING clause are unsupported -DELETE FROM limit_orders WHERE id = 246 RETURNING *; -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; @@ -249,6 +284,13 @@ SELECT symbol FROM limit_orders WHERE id = 246; GM (1 row) +-- simple UPDATE with RETURNING +UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+------------- + 246 | GM | 162 | Mon Jul 02 16:32:15 2007 | sell | 20.69 +(1 row) + -- expression UPDATE UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246; SELECT bidder_id FROM limit_orders WHERE id = 246; @@ -257,6 +299,13 @@ SELECT bidder_id FROM limit_orders WHERE id = 246; 18 (1 row) +-- expression UPDATE with RETURNING +UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+------------- + 246 | GM | 30 | Mon Jul 02 16:32:15 2007 | sell | 20.69 +(1 row) + -- multi-column UPDATE UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246; SELECT kind, limit_price FROM limit_orders WHERE id = 246; @@ -265,6 +314,13 @@ SELECT kind, limit_price FROM limit_orders WHERE id = 246; buy | 0.00 (1 row) +-- multi-column UPDATE with RETURNING +UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *; + id | symbol | bidder_id | placed_at | kind | limit_price +-----+--------+-----------+--------------------------+------+------------- + 246 | GM | 30 | Mon Jul 02 16:32:15 2007 | buy | 999 +(1 row) + -- Test that on unique contraint violations, we fail fast INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); @@ -347,10 +403,6 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: cannot plan queries that include both regular and partitioned relations --- commands with a RETURNING clause are unsupported -UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; @@ -359,7 +411,7 @@ DETAIL: Common table expressions are not supported in distributed modifications SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; symbol | bidder_id --------+----------- - GM | 18 + GM | 30 (1 row) -- updates referencing just a var are supported @@ -374,9 +426,123 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; gm | 247 (1 row) +-- IMMUTABLE functions are allowed -- even in returning +UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; + id | lower | symbol +-----+-------+-------- + 246 | gm | GM +(1 row) + -- updates referencing non-IMMUTABLE functions are unsupported UPDATE limit_orders SET placed_at = now() WHERE id = 246; ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE +-- even in RETURNING +UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); +ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard +-- check that multi-row UPDATE/DELETEs with RETURNING work +INSERT INTO multiple_hash VALUES ('0', '1'); +INSERT INTO multiple_hash VALUES ('0', '2'); +INSERT INTO multiple_hash VALUES ('0', '3'); +INSERT INTO multiple_hash VALUES ('0', '4'); +INSERT INTO multiple_hash VALUES ('0', '5'); +INSERT INTO multiple_hash VALUES ('0', '6'); +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *; + category | data +----------+------ + 0 | 1-1 + 0 | 2-1 + 0 | 3-1 + 0 | 4-1 + 0 | 5-1 + 0 | 6-1 +(6 rows) + +DELETE FROM multiple_hash WHERE category = '0' RETURNING *; + category | data +----------+------ + 0 | 1-1 + 0 | 2-1 + 0 | 3-1 + 0 | 4-1 + 0 | 5-1 + 0 | 6-1 +(6 rows) + +-- ensure returned row counters are correct +\set QUIET off +INSERT INTO multiple_hash VALUES ('1', '1'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('1', '2'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('1', '3'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '1'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '2'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '3'); +INSERT 0 1 +INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *; + category | data +----------+------ + 2 | 3 +(1 row) + +INSERT 0 1 +-- check that update return the right number of rows +-- one row +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; +UPDATE 1 +-- three rows +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; +UPDATE 3 +-- three rows, with RETURNING +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category; + category +---------- + 1 + 1 + 1 +(3 rows) + +UPDATE 3 +-- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; + category | data +----------+--------- + 1 | 1-1-2-2 + 1 | 2-2-2 + 1 | 3-2-2 +(3 rows) + +-- check that deletes return the right number of rows +-- one row +DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; +DELETE 1 +-- two rows +DELETE FROM multiple_hash WHERE category = '2'; +DELETE 3 +-- three rows, with RETURNING +DELETE FROM multiple_hash WHERE category = '1' RETURNING category; + category +---------- + 1 + 1 + 1 +(3 rows) + +DELETE 3 +-- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; + category | data +----------+------ +(0 rows) + +SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; + category | data +----------+------ +(0 rows) + diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 59a815659..0d4e389d6 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -62,8 +62,7 @@ ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. -- commands with a RETURNING clause are unsupported SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. +ERROR: master_modify_multiple_shards() does not support RETURNING -- commands containing a CTE are unsupported SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test'); ERROR: cannot perform distributed planning for the given modification @@ -205,8 +204,7 @@ ERROR: cannot perform distributed planning for the given modification DETAIL: Joins are not supported in distributed modifications. -- commands with a RETURNING clause are unsupported SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *'); -ERROR: cannot perform distributed planning for the given modification -DETAIL: RETURNING clauses are not supported in distributed modifications. +ERROR: master_modify_multiple_shards() does not support RETURNING -- commands containing a CTE are unsupported SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' '); ERROR: cannot perform distributed planning for the given modification diff --git a/src/test/regress/expected/multi_upsert.out b/src/test/regress/expected/multi_upsert.out index ec9597171..3b3ff22c1 100644 --- a/src/test/regress/expected/multi_upsert.out +++ b/src/test/regress/expected/multi_upsert.out @@ -105,6 +105,23 @@ SELECT * FROM upsert_test; 1 | 5 | 872 (1 row) +-- Test upsert, with returning: +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + part_key | other_col | third_col +----------+-----------+----------- + 2 | 2 | +(1 row) + +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + part_key | other_col | third_col +----------+-----------+----------- + 2 | 3 | +(1 row) + -- create another table CREATE TABLE upsert_test_2 ( diff --git a/src/test/regress/expected/multi_upsert_0.out b/src/test/regress/expected/multi_upsert_0.out index 09b052c1c..eb653cf3d 100644 --- a/src/test/regress/expected/multi_upsert_0.out +++ b/src/test/regress/expected/multi_upsert_0.out @@ -138,6 +138,19 @@ SELECT * FROM upsert_test; 1 | 1 | (1 row) +-- Test upsert, with returning: +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; +ERROR: syntax error at or near "ON" +LINE 2: ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + ^ +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; +ERROR: syntax error at or near "ON" +LINE 2: ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + ^ -- create another table CREATE TABLE upsert_test_2 ( diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 2adf977bd..0ff747cb7 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -18,16 +18,24 @@ CREATE TABLE limit_orders ( limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00) ); + +CREATE TABLE multiple_hash ( + category text NOT NULL, + data text NOT NULL +); + CREATE TABLE insufficient_shards ( LIKE limit_orders ); CREATE TABLE range_partitioned ( LIKE limit_orders ); CREATE TABLE append_partitioned ( LIKE limit_orders ); SELECT master_create_distributed_table('limit_orders', 'id', 'hash'); +SELECT master_create_distributed_table('multiple_hash', 'category', 'hash'); SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash'); SELECT master_create_distributed_table('range_partitioned', 'id', 'range'); SELECT master_create_distributed_table('append_partitioned', 'id', 'append'); SELECT master_create_worker_shards('limit_orders', 2, 2); +SELECT master_create_worker_shards('multiple_hash', 2, 2); -- make a single shard that covers no partition values SELECT master_create_worker_shards('insufficient_shards', 1, 1); @@ -61,6 +69,9 @@ INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 32743; +-- basic single-row INSERT with RETURNING +INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *; + -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); @@ -110,10 +121,16 @@ INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT); -- INSERT violating column constraint INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', -5.00); - -- INSERT violating primary key constraint INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58); +-- INSERT violating primary key constraint, with RETURNING specified. +INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *; + +-- INSERT, with RETURNING specified, failing with a non-constraint error +INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0; + + SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported @@ -135,10 +152,6 @@ INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); -- INSERT ... SELECT ... FROM commands are unsupported INSERT INTO limit_orders SELECT * FROM limit_orders; --- commands with a RETURNING clause are unsupported -INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00) - RETURNING *; - -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; @@ -150,6 +163,10 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246; DELETE FROM limit_orders WHERE id = 246; SELECT COUNT(*) FROM limit_orders WHERE id = 246; +-- test simple DELETE with RETURNING +DELETE FROM limit_orders WHERE id = 430 RETURNING *; +SELECT COUNT(*) FROM limit_orders WHERE id = 430; + -- DELETE with expression in WHERE clause INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; @@ -166,9 +183,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands with a RETURNING clause are unsupported -DELETE FROM limit_orders WHERE id = 246 RETURNING *; - -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; @@ -182,14 +196,23 @@ INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell' UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; SELECT symbol FROM limit_orders WHERE id = 246; +-- simple UPDATE with RETURNING +UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; + -- expression UPDATE UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246; SELECT bidder_id FROM limit_orders WHERE id = 246; +-- expression UPDATE with RETURNING +UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *; + -- multi-column UPDATE UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246; SELECT kind, limit_price FROM limit_orders WHERE id = 246; +-- multi-column UPDATE with RETURNING +UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *; + -- Test that on unique contraint violations, we fail fast INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); @@ -267,9 +290,6 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; --- commands with a RETURNING clause are unsupported -UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; - -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; @@ -287,8 +307,56 @@ UPDATE limit_orders SET symbol = LOWER(symbol) WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; +-- IMMUTABLE functions are allowed -- even in returning +UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; + -- updates referencing non-IMMUTABLE functions are unsupported UPDATE limit_orders SET placed_at = now() WHERE id = 246; +-- even in RETURNING +UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); + -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; + +-- check that multi-row UPDATE/DELETEs with RETURNING work +INSERT INTO multiple_hash VALUES ('0', '1'); +INSERT INTO multiple_hash VALUES ('0', '2'); +INSERT INTO multiple_hash VALUES ('0', '3'); +INSERT INTO multiple_hash VALUES ('0', '4'); +INSERT INTO multiple_hash VALUES ('0', '5'); +INSERT INTO multiple_hash VALUES ('0', '6'); + +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *; +DELETE FROM multiple_hash WHERE category = '0' RETURNING *; + +-- ensure returned row counters are correct +\set QUIET off +INSERT INTO multiple_hash VALUES ('1', '1'); +INSERT INTO multiple_hash VALUES ('1', '2'); +INSERT INTO multiple_hash VALUES ('1', '3'); +INSERT INTO multiple_hash VALUES ('2', '1'); +INSERT INTO multiple_hash VALUES ('2', '2'); +INSERT INTO multiple_hash VALUES ('2', '3'); +INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *; + +-- check that update return the right number of rows +-- one row +UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; +-- three rows +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; +-- three rows, with RETURNING +UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category; +-- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; + +-- check that deletes return the right number of rows +-- one row +DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; +-- two rows +DELETE FROM multiple_hash WHERE category = '2'; +-- three rows, with RETURNING +DELETE FROM multiple_hash WHERE category = '1' RETURNING category; +-- check +SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; +SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; diff --git a/src/test/regress/sql/multi_upsert.sql b/src/test/regress/sql/multi_upsert.sql index 394ef0bbc..0d8de95f6 100644 --- a/src/test/regress/sql/multi_upsert.sql +++ b/src/test/regress/sql/multi_upsert.sql @@ -85,6 +85,15 @@ INSERT INTO upsert_test as ups_test (part_key, other_col) VALUES (1, 1) ON CONFL -- see the results SELECT * FROM upsert_test; +-- Test upsert, with returning: +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + +INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2) + ON CONFLICT (part_key) DO UPDATE SET other_col = 3 + RETURNING *; + -- create another table CREATE TABLE upsert_test_2 (