Merge pull request #578 from citusdata/feature/returning

Support RETURNING
pull/639/head
Andres Freund 2016-07-01 13:15:36 -07:00 committed by GitHub
commit 18236b18b5
12 changed files with 654 additions and 240 deletions

View File

@ -59,9 +59,9 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
Task *task = NULL;
List *taskList = workerJob->taskList;
TupleDesc tupleDescriptor = ExecCleanTypeFromTL(
planStatement->planTree->targetlist, false);
List *dependendJobList PG_USED_FOR_ASSERTS_ONLY = workerJob->dependedJobList;
List *workerTargetList = multiPlan->workerJob->jobQuery->targetList;
TupleDesc tupleDescriptor = ExecCleanTypeFromTL(workerTargetList, false);
/* router executor can only execute distributed plans with a single task */
Assert(list_length(taskList) == 1);

View File

@ -42,14 +42,17 @@ bool AllModificationsCommutative = false;
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static uint64 ExecuteDistributedModify(Task *task);
static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount,
Task *task, EState *executorState,
TupleDesc tupleDescriptor,
DestReceiver *destination);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
static bool StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
Task *task,
bool isModificationQuery,
bool expectResults);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination,
Tuplestorestate *tupleStore);
static bool SendQueryInSingleRowMode(PGconn *connection, char *query);
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
TupleDesc tupleDescriptor, int64 *rows);
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
/*
@ -181,6 +184,9 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
EState *estate = queryDesc->estate;
CmdType operation = queryDesc->operation;
MemoryContext oldcontext = NULL;
DestReceiver *destination = queryDesc->dest;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;
Assert(estate != NULL);
Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
@ -201,26 +207,74 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
InstrStartNode(queryDesc->totaltime);
}
estate->es_processed = 0;
/* startup the tuple receiver */
if (sendTuples)
{
(*destination->rStartup)(destination, operation, queryDesc->tupDesc);
}
/*
* If query has not yet been executed, do so now. The main reason why the
* query might already have been executed is cursors.
*/
if (!routerState->eof_underlying)
{
bool resultsOK = false;
bool isModificationQuery = false;
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
operation == CMD_DELETE)
{
uint64 affectedRowCount = ExecuteDistributedModify(task);
estate->es_processed = affectedRowCount;
isModificationQuery = true;
}
else if (operation == CMD_SELECT)
{
DestReceiver *destination = queryDesc->dest;
TupleDesc resultTupleDescriptor = queryDesc->tupDesc;
ExecuteSingleShardSelect(queryDesc, count, task, estate,
resultTupleDescriptor, destination);
}
else
else if (operation != CMD_SELECT)
{
ereport(ERROR, (errmsg("unrecognized operation code: %d",
(int) operation)));
}
resultsOK = ExecuteTaskAndStoreResults(queryDesc, task,
isModificationQuery,
sendTuples);
if (!resultsOK)
{
ereport(ERROR, (errmsg("could not receive query results")));
}
/* mark underlying query as having executed */
routerState->eof_underlying = true;
}
/* if the underlying query produced output, return it */
if (routerState->tuplestorestate != NULL)
{
TupleDesc resultTupleDescriptor = queryDesc->tupDesc;
int64 returnedRows = 0;
/* return rows from the tuplestore */
returnedRows = ReturnRowsFromTuplestore(count, resultTupleDescriptor,
destination,
routerState->tuplestorestate);
/*
* Count tuples processed, if this is a SELECT. (For modifications
* it'll already have been increased, as we want the number of
* modified tuples, not the number of RETURNed tuples.)
*/
if (operation == CMD_SELECT)
{
estate->es_processed += returnedRows;
}
}
/* shutdown tuple receiver, if we started it */
if (sendTuples)
{
(*destination->rShutdown)(destination);
}
if (queryDesc->totaltime != NULL)
{
InstrStopNode(queryDesc->totaltime, estate->es_processed);
@ -231,81 +285,75 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
/*
* ExecuteDistributedModify is the main entry point for modifying distributed
* tables. A distributed modification is successful if any placement of the
* distributed table is successful. ExecuteDistributedModify returns the number
* of modified rows in that case and errors in all others. This function will
* also generate warnings for individual placement failures.
* ExecuteTaskAndStoreResults executes the task on the remote node, retrieves
* the results and stores them, if SELECT or RETURNING is used, in a tuple
* store.
*
* If the task fails on one of the placements, the function retries it on
* other placements (SELECT), reraises the remote error (constraint violation
* in DML), marks the affected placement as invalid (DML on some placement
* failed), or errors out (DML failed on all placements).
*/
static uint64
ExecuteDistributedModify(Task *task)
static bool
ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
bool isModificationQuery,
bool expectResults)
{
int64 affectedTupleCount = -1;
TupleDesc tupleDescriptor = queryDesc->tupDesc;
EState *executorState = queryDesc->estate;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
bool resultsOK = false;
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL;
List *failedPlacementList = NIL;
ListCell *failedPlacementCell = NULL;
int64 affectedTupleCount = -1;
bool gotResults = false;
foreach(taskPlacementCell, task->taskPlacementList)
/*
* Try to run the query to completion on one placement. If the query fails
* attempt the query on the next placement.
*/
foreach(taskPlacementCell, taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
char *nodeName = taskPlacement->nodeName;
int32 nodePort = taskPlacement->nodePort;
bool queryOK = false;
int64 currentAffectedTupleCount = 0;
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
PGconn *connection = NULL;
PGresult *result = NULL;
char *currentAffectedTupleString = NULL;
int64 currentAffectedTupleCount = -1;
Assert(taskPlacement->shardState == FILE_FINALIZED);
connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue;
}
result = PQexec(connection, task->queryString);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
queryOK = SendQueryInSingleRowMode(connection, task->queryString);
if (!queryOK)
{
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
int category = 0;
bool raiseError = false;
/*
* If the error code is in constraint violation class, we want to
* fail fast because we must get the same error from all shard
* placements.
*/
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
raiseError = SqlStateMatchesCategory(sqlStateString, category);
if (raiseError)
{
ReraiseRemoteError(connection, result);
}
else
{
WarnRemoteError(connection, result);
}
PQclear(result);
PurgeConnection(connection);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue;
}
currentAffectedTupleString = PQcmdTuples(result);
/* could throw error if input > MAX_INT64 */
scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
Assert(currentAffectedTupleCount >= 0);
#if (PG_VERSION_NUM < 90600)
/* before 9.6, PostgreSQL used a uint32 for this field, so check */
Assert(currentAffectedTupleCount <= 0xFFFFFFFF);
#endif
/*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
* discarded if we run there (because it's a modification query).
*/
if (!gotResults && expectResults)
{
queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
&currentAffectedTupleCount);
}
else
{
queryOK = ConsumeQueryResult(connection, &currentAffectedTupleCount);
}
if (queryOK)
{
if ((affectedTupleCount == -1) ||
(affectedTupleCount == currentAffectedTupleCount))
{
@ -314,14 +362,43 @@ ExecuteDistributedModify(Task *task)
else
{
ereport(WARNING,
(errmsg("modified " INT64_FORMAT " tuples, but expected to modify "
INT64_FORMAT, currentAffectedTupleCount, affectedTupleCount),
errdetail("modified placement on %s:%d", nodeName, nodePort)));
(errmsg("modified "INT64_FORMAT " tuples, but expected "
"to modify "INT64_FORMAT,
currentAffectedTupleCount, affectedTupleCount),
errdetail("modified placement on %s:%d",
nodeName, nodePort)));
}
PQclear(result);
#if (PG_VERSION_NUM < 90600)
/* before 9.6, PostgreSQL used a uint32 for this field, so check */
Assert(currentAffectedTupleCount <= 0xFFFFFFFF);
#endif
resultsOK = true;
gotResults = true;
/*
* Modifications have to be executed on all placements, but for
* read queries we can stop here.
*/
if (!isModificationQuery)
{
break;
}
}
else
{
PurgeConnection(connection);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue;
}
}
if (isModificationQuery)
{
/* if all placements failed, error out */
if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
{
@ -331,7 +408,8 @@ ExecuteDistributedModify(Task *task)
/* otherwise, mark failed placements as inactive: they're stale */
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell);
ShardPlacement *failedPlacement =
(ShardPlacement *) lfirst(failedPlacementCell);
uint64 shardLength = 0;
DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
@ -340,44 +418,26 @@ ExecuteDistributedModify(Task *task)
failedPlacement->nodeName, failedPlacement->nodePort);
}
return (uint64) affectedTupleCount;
executorState->es_processed = affectedTupleCount;
}
return resultsOK;
}
/*
* ExecuteSingleShardSelect executes, if not done already, the remote select query and
* sends the resulting tuples to the given destination receiver. If the query fails on a
* given placement, the function attempts it on its replica.
* ReturnRowsFromTuplestore moves rows from a given tuplestore into a
* receiver. It performs the necessary limiting to support cursors.
*/
static void
ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task,
EState *executorState, TupleDesc tupleDescriptor,
DestReceiver *destination)
static uint64
ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination, Tuplestorestate *tupleStore)
{
bool resultsOK = false;
TupleTableSlot *tupleTableSlot = NULL;
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
Tuplestorestate *tupleStore = routerState->tuplestorestate;
uint64 currentTupleCount = 0;
/* initialize tuplestore for the first call */
if (routerState->tuplestorestate == NULL)
{
routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
tupleStore = routerState->tuplestorestate;
resultsOK = ExecuteTaskAndStoreResults(task, tupleDescriptor, tupleStore);
if (!resultsOK)
{
ereport(ERROR, (errmsg("could not receive query results")));
}
}
tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor);
/* startup the tuple receiver */
(*destination->rStartup)(destination, CMD_SELECT, tupleDescriptor);
/* iterate over tuples in tuple store, and send them to destination */
for (;;)
{
@ -388,7 +448,6 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task,
}
(*destination->receiveSlot)(tupleTableSlot, destination);
executorState->es_processed++;
ExecClearTuple(tupleTableSlot);
@ -404,65 +463,9 @@ ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, Task *task,
}
}
/* shutdown the tuple receiver */
(*destination->rShutdown)(destination);
ExecDropSingleTupleTableSlot(tupleTableSlot);
}
/*
* ExecuteTaskAndStoreResults executes the task on the remote node, retrieves
* the results and stores them in the given tuple store. If the task fails on
* one of the placements, the function retries it on other placements.
*/
bool
ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore)
{
bool resultsOK = false;
List *taskPlacementList = task->taskPlacementList;
ListCell *taskPlacementCell = NULL;
/*
* Try to run the query to completion on one placement. If the query fails
* attempt the query on the next placement.
*/
foreach(taskPlacementCell, taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
char *nodeName = taskPlacement->nodeName;
int32 nodePort = taskPlacement->nodePort;
bool queryOK = false;
bool storedOK = false;
PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
if (connection == NULL)
{
continue;
}
queryOK = SendQueryInSingleRowMode(connection, task->queryString);
if (!queryOK)
{
PurgeConnection(connection);
continue;
}
storedOK = StoreQueryResult(connection, tupleDescriptor, tupleStore);
if (storedOK)
{
resultsOK = true;
break;
}
else
{
tuplestore_clear(tupleStore);
PurgeConnection(connection);
}
}
return resultsOK;
return currentTupleCount;
}
@ -497,25 +500,38 @@ SendQueryInSingleRowMode(PGconn *connection, char *query)
/*
* StoreQueryResult gets the query results from the given connection, builds
* tuples from the results and stores them in the given tuple-store. If the
* function can't receive query results, it returns false. Note that this
* function assumes the query has already been sent on the connection and the
* tuplestore has earlier been initialized.
* tuples from the results, and stores them in the a newly created
* tuple-store. If the function can't receive query results, it returns
* false. Note that this function assumes the query has already been sent on
* the connection.
*/
static bool
StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore)
StoreQueryResult(MaterialState *routerState, PGconn *connection,
TupleDesc tupleDescriptor, int64 *rows)
{
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
Tuplestorestate *tupleStore = NULL;
uint32 expectedColumnCount = tupleDescriptor->natts;
char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *));
bool commandFailed = false;
MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
"StoreQueryResult",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
*rows = 0;
Assert(tupleStore != NULL);
if (routerState->tuplestorestate == NULL)
{
routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
}
else
{
/* might have failed query execution on another placement before */
tuplestore_clear(routerState->tuplestorestate);
}
tupleStore = routerState->tuplestorestate;
for (;;)
{
@ -533,11 +549,34 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
resultStatus = PQresultStatus(result);
if ((resultStatus != PGRES_SINGLE_TUPLE) && (resultStatus != PGRES_TUPLES_OK))
{
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
int category = 0;
bool raiseError = false;
/*
* If the error code is in constraint violation class, we want to
* fail fast because we must get the same error from all shard
* placements.
*/
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
raiseError = SqlStateMatchesCategory(sqlStateString, category);
if (raiseError)
{
ReraiseRemoteError(connection, result);
}
else
{
WarnRemoteError(connection, result);
}
PQclear(result);
return false;
commandFailed = true;
/* continue, there could be other lingering results due to row mode */
continue;
}
rowCount = PQntuples(result);
@ -575,6 +614,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
tuplestore_puttuple(tupleStore, heapTuple);
MemoryContextReset(ioContext);
(*rows)++;
}
PQclear(result);
@ -582,7 +622,98 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
pfree(columnArray);
return true;
return !commandFailed;
}
/*
* ConsumeQueryResult gets a query result from a connection, counting the rows
* and checking for errors, but otherwise discarding potentially returned
* rows. Returns true if a non-error result has been returned, false if there
* has been an error.
*/
static bool
ConsumeQueryResult(PGconn *connection, int64 *rows)
{
bool commandFailed = false;
bool gotResponse = false;
*rows = 0;
/*
* Due to single row mode we have to do multiple PQgetResult() to finish
* processing of this query, even without RETURNING. For single-row mode
* we have to loop until all rows are consumed.
*/
while (true)
{
PGresult *result = PQgetResult(connection);
ExecStatusType status = PGRES_COMMAND_OK;
if (result == NULL)
{
break;
}
status = PQresultStatus(result);
if (status != PGRES_COMMAND_OK &&
status != PGRES_SINGLE_TUPLE &&
status != PGRES_TUPLES_OK)
{
char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
int category = 0;
bool raiseError = false;
/*
* If the error code is in constraint violation class, we want to
* fail fast because we must get the same error from all shard
* placements.
*/
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
raiseError = SqlStateMatchesCategory(sqlStateString, category);
if (raiseError)
{
ReraiseRemoteError(connection, result);
}
else
{
WarnRemoteError(connection, result);
}
PQclear(result);
commandFailed = true;
/* continue, there could be other lingering results due to row mode */
continue;
}
if (status == PGRES_COMMAND_OK)
{
char *currentAffectedTupleString = PQcmdTuples(result);
int64 currentAffectedTupleCount = 0;
scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
Assert(currentAffectedTupleCount >= 0);
#if (PG_VERSION_NUM < 90600)
/* before 9.6, PostgreSQL used a uint32 for this field, so check */
Assert(currentAffectedTupleCount <= 0xFFFFFFFF);
#endif
*rows += currentAffectedTupleCount;
}
else
{
*rows += PQntuples(result);
}
PQclear(result);
gotResponse = true;
}
return gotResponse && !commandFailed;
}

View File

@ -116,6 +116,14 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
ErrorIfModifyQueryNotSupported(modifyQuery);
/* reject queries with a returning list */
if (list_length(modifyQuery->returningList) > 0)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("master_modify_multiple_shards() does not support RETURNING")));
}
shardIntervalList = LoadShardIntervalList(relationId);
restrictClauseList = WhereClauseList(modifyQuery->jointree);

View File

@ -25,6 +25,8 @@
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/planner.h"
#include "utils/memutils.h"
@ -183,6 +185,9 @@ MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan)
fauxFunctionScan = makeNode(FunctionScan);
fauxFunctionScan->functions = lappend(fauxFunctionScan->functions, fauxFunction);
/* copy original targetlist, accessed for RETURNING queries */
fauxFunctionScan->scan.plan.targetlist = copyObject(result->planTree->targetlist);
/*
* Add set returning function to target list if the original (postgres
* created) plan doesn't support backward scans; doing so prevents
@ -196,11 +201,17 @@ MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan)
if (!ExecSupportsBackwardScan(result->planTree))
{
FuncExpr *funcExpr = makeNode(FuncExpr);
TargetEntry *targetEntry = NULL;
bool resjunkAttribute = true;
funcExpr->funcretset = true;
targetEntry = makeTargetEntry((Expr *) funcExpr, InvalidAttrNumber, NULL,
resjunkAttribute);
fauxFunctionScan->scan.plan.targetlist =
lappend(fauxFunctionScan->scan.plan.targetlist,
funcExpr);
targetEntry);
}
result->planTree = (Plan *) fauxFunctionScan;

View File

@ -255,16 +255,6 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
"supported.")));
}
/* reject queries with a returning list */
if (list_length(queryTree->returningList) > 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning for the given"
" modification"),
errdetail("RETURNING clauses are not supported in distributed "
"modifications.")));
}
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
@ -298,6 +288,11 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
{
hasNonConstQualExprs = true;
}
if (contain_mutable_functions((Node *) queryTree->returningList))
{
hasNonConstTargetEntryExprs = true;
}
}
#if (PG_VERSION_NUM >= 90500)

View File

@ -18,8 +18,6 @@ extern bool AllModificationsCommutative;
extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count,
Task *task);
extern bool ExecuteTaskAndStoreResults(Task *task, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore);
extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);

View File

@ -12,6 +12,10 @@ CREATE TABLE limit_orders (
kind order_side NOT NULL,
limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00)
);
CREATE TABLE multiple_hash (
category text NOT NULL,
data text NOT NULL
);
CREATE TABLE insufficient_shards ( LIKE limit_orders );
CREATE TABLE range_partitioned ( LIKE limit_orders );
CREATE TABLE append_partitioned ( LIKE limit_orders );
@ -21,6 +25,12 @@ SELECT master_create_distributed_table('limit_orders', 'id', 'hash');
(1 row)
SELECT master_create_distributed_table('multiple_hash', 'category', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash');
master_create_distributed_table
---------------------------------
@ -45,6 +55,12 @@ SELECT master_create_worker_shards('limit_orders', 2, 2);
(1 row)
SELECT master_create_worker_shards('multiple_hash', 2, 2);
master_create_worker_shards
-----------------------------
(1 row)
-- make a single shard that covers no partition values
SELECT master_create_worker_shards('insufficient_shards', 1, 1);
master_create_worker_shards
@ -81,6 +97,13 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 32743;
1
(1 row)
-- basic single-row INSERT with RETURNING
INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *;
id | symbol | bidder_id | placed_at | kind | limit_price
-------+--------+-----------+--------------------------+------+-------------
32744 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69
(1 row)
-- try a single-row INSERT with no shard to receive it
INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
@ -95,7 +118,7 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
SET client_min_messages TO 'DEBUG2';
SET citus.task_executor_type TO 'real-time';
SELECT * FROM range_partitioned WHERE id = 32743;
DEBUG: predicate pruning for shardId 750004
DEBUG: predicate pruning for shardId 750006
DEBUG: Plan is router executable
id | symbol | bidder_id | placed_at | kind | limit_price
-------+--------+-----------+--------------------------+------+-------------
@ -103,7 +126,7 @@ DEBUG: Plan is router executable
(1 row)
SELECT * FROM append_partitioned WHERE id = 414123;
DEBUG: predicate pruning for shardId 750006
DEBUG: predicate pruning for shardId 750008
DEBUG: Plan is router executable
id | symbol | bidder_id | placed_at | kind | limit_price
--------+--------+-----------+--------------------------+------+-------------
@ -157,6 +180,14 @@ INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy
ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001"
DETAIL: Key (id)=(32743) already exists.
CONTEXT: while executing command on localhost:57638
-- INSERT violating primary key constraint, with RETURNING specified.
INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *;
ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001"
DETAIL: Key (id)=(32743) already exists.
CONTEXT: while executing command on localhost:57638
-- INSERT, with RETURNING specified, failing with a non-constraint error
INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
ERROR: could not modify any active placements
SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
@ -179,11 +210,6 @@ DETAIL: Multi-row INSERTs to distributed tables are not supported.
INSERT INTO limit_orders SELECT * FROM limit_orders;
ERROR: cannot perform distributed planning for the given modifications
DETAIL: Subqueries are not supported in distributed modifications.
-- commands with a RETURNING clause are unsupported
INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00)
RETURNING *;
ERROR: cannot perform distributed planning for the given modification
DETAIL: RETURNING clauses are not supported in distributed modifications.
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES;
@ -204,6 +230,19 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246;
0
(1 row)
-- test simple DELETE with RETURNING
DELETE FROM limit_orders WHERE id = 430 RETURNING *;
id | symbol | bidder_id | placed_at | kind | limit_price
-----+--------+-----------+--------------------------+------+-----------------
430 | IBM | 214 | Tue Jan 28 15:31:17 2003 | buy | 1.4142135623731
(1 row)
SELECT COUNT(*) FROM limit_orders WHERE id = 430;
count
-------
0
(1 row)
-- DELETE with expression in WHERE clause
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
SELECT COUNT(*) FROM limit_orders WHERE id = 246;
@ -228,10 +267,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
ERROR: cannot plan queries that include both regular and partitioned relations
-- commands with a RETURNING clause are unsupported
DELETE FROM limit_orders WHERE id = 246 RETURNING *;
ERROR: cannot perform distributed planning for the given modification
DETAIL: RETURNING clauses are not supported in distributed modifications.
-- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders;
@ -249,6 +284,13 @@ SELECT symbol FROM limit_orders WHERE id = 246;
GM
(1 row)
-- simple UPDATE with RETURNING
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *;
id | symbol | bidder_id | placed_at | kind | limit_price
-----+--------+-----------+--------------------------+------+-------------
246 | GM | 162 | Mon Jul 02 16:32:15 2007 | sell | 20.69
(1 row)
-- expression UPDATE
UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246;
SELECT bidder_id FROM limit_orders WHERE id = 246;
@ -257,6 +299,13 @@ SELECT bidder_id FROM limit_orders WHERE id = 246;
18
(1 row)
-- expression UPDATE with RETURNING
UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *;
id | symbol | bidder_id | placed_at | kind | limit_price
-----+--------+-----------+--------------------------+------+-------------
246 | GM | 30 | Mon Jul 02 16:32:15 2007 | sell | 20.69
(1 row)
-- multi-column UPDATE
UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246;
SELECT kind, limit_price FROM limit_orders WHERE id = 246;
@ -265,6 +314,13 @@ SELECT kind, limit_price FROM limit_orders WHERE id = 246;
buy | 0.00
(1 row)
-- multi-column UPDATE with RETURNING
UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *;
id | symbol | bidder_id | placed_at | kind | limit_price
-----+--------+-----------+--------------------------+------+-------------
246 | GM | 30 | Mon Jul 02 16:32:15 2007 | buy | 999
(1 row)
-- Test that on unique contraint violations, we fail fast
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
@ -347,10 +403,6 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders
limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
ERROR: cannot plan queries that include both regular and partitioned relations
-- commands with a RETURNING clause are unsupported
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *;
ERROR: cannot perform distributed planning for the given modification
DETAIL: RETURNING clauses are not supported in distributed modifications.
-- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
UPDATE limit_orders SET symbol = 'GM';
@ -359,7 +411,7 @@ DETAIL: Common table expressions are not supported in distributed modifications
SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
symbol | bidder_id
--------+-----------
GM | 18
GM | 30
(1 row)
-- updates referencing just a var are supported
@ -374,9 +426,123 @@ SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
gm | 247
(1 row)
-- IMMUTABLE functions are allowed -- even in returning
UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol;
id | lower | symbol
-----+-------+--------
246 | gm | GM
(1 row)
-- updates referencing non-IMMUTABLE functions are unsupported
UPDATE limit_orders SET placed_at = now() WHERE id = 246;
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
-- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE
-- cursors are not supported
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
ERROR: distributed modifications must target exactly one shard
-- check that multi-row UPDATE/DELETEs with RETURNING work
INSERT INTO multiple_hash VALUES ('0', '1');
INSERT INTO multiple_hash VALUES ('0', '2');
INSERT INTO multiple_hash VALUES ('0', '3');
INSERT INTO multiple_hash VALUES ('0', '4');
INSERT INTO multiple_hash VALUES ('0', '5');
INSERT INTO multiple_hash VALUES ('0', '6');
UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *;
category | data
----------+------
0 | 1-1
0 | 2-1
0 | 3-1
0 | 4-1
0 | 5-1
0 | 6-1
(6 rows)
DELETE FROM multiple_hash WHERE category = '0' RETURNING *;
category | data
----------+------
0 | 1-1
0 | 2-1
0 | 3-1
0 | 4-1
0 | 5-1
0 | 6-1
(6 rows)
-- ensure returned row counters are correct
\set QUIET off
INSERT INTO multiple_hash VALUES ('1', '1');
INSERT 0 1
INSERT INTO multiple_hash VALUES ('1', '2');
INSERT 0 1
INSERT INTO multiple_hash VALUES ('1', '3');
INSERT 0 1
INSERT INTO multiple_hash VALUES ('2', '1');
INSERT 0 1
INSERT INTO multiple_hash VALUES ('2', '2');
INSERT 0 1
INSERT INTO multiple_hash VALUES ('2', '3');
INSERT 0 1
INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *;
category | data
----------+------
2 | 3
(1 row)
INSERT 0 1
-- check that update return the right number of rows
-- one row
UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1';
UPDATE 1
-- three rows
UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1';
UPDATE 3
-- three rows, with RETURNING
UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category;
category
----------
1
1
1
(3 rows)
UPDATE 3
-- check
SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data;
category | data
----------+---------
1 | 1-1-2-2
1 | 2-2-2
1 | 3-2-2
(3 rows)
-- check that deletes return the right number of rows
-- one row
DELETE FROM multiple_hash WHERE category = '2' AND data = '1';
DELETE 1
-- two rows
DELETE FROM multiple_hash WHERE category = '2';
DELETE 3
-- three rows, with RETURNING
DELETE FROM multiple_hash WHERE category = '1' RETURNING category;
category
----------
1
1
1
(3 rows)
DELETE 3
-- check
SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data;
category | data
----------+------
(0 rows)
SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data;
category | data
----------+------
(0 rows)

View File

@ -62,8 +62,7 @@ ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
-- commands with a RETURNING clause are unsupported
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 3 RETURNING *');
ERROR: cannot perform distributed planning for the given modification
DETAIL: RETURNING clauses are not supported in distributed modifications.
ERROR: master_modify_multiple_shards() does not support RETURNING
-- commands containing a CTE are unsupported
SELECT master_modify_multiple_shards('WITH deleted_stuff AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) DELETE FROM multi_shard_modify_test');
ERROR: cannot perform distributed planning for the given modification
@ -205,8 +204,7 @@ ERROR: cannot perform distributed planning for the given modification
DETAIL: Joins are not supported in distributed modifications.
-- commands with a RETURNING clause are unsupported
SELECT master_modify_multiple_shards('UPDATE multi_shard_modify_test SET t_name=''FAIL'' WHERE t_key=4 RETURNING *');
ERROR: cannot perform distributed planning for the given modification
DETAIL: RETURNING clauses are not supported in distributed modifications.
ERROR: master_modify_multiple_shards() does not support RETURNING
-- commands containing a CTE are unsupported
SELECT master_modify_multiple_shards('WITH t AS (INSERT INTO multi_shard_modify_test DEFAULT VALUES RETURNING *) UPDATE multi_shard_modify_test SET t_name = ''FAIL'' ');
ERROR: cannot perform distributed planning for the given modification

View File

@ -105,6 +105,23 @@ SELECT * FROM upsert_test;
1 | 5 | 872
(1 row)
-- Test upsert, with returning:
INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2)
ON CONFLICT (part_key) DO UPDATE SET other_col = 3
RETURNING *;
part_key | other_col | third_col
----------+-----------+-----------
2 | 2 |
(1 row)
INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2)
ON CONFLICT (part_key) DO UPDATE SET other_col = 3
RETURNING *;
part_key | other_col | third_col
----------+-----------+-----------
2 | 3 |
(1 row)
-- create another table
CREATE TABLE upsert_test_2
(

View File

@ -138,6 +138,19 @@ SELECT * FROM upsert_test;
1 | 1 |
(1 row)
-- Test upsert, with returning:
INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2)
ON CONFLICT (part_key) DO UPDATE SET other_col = 3
RETURNING *;
ERROR: syntax error at or near "ON"
LINE 2: ON CONFLICT (part_key) DO UPDATE SET other_col = 3
^
INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2)
ON CONFLICT (part_key) DO UPDATE SET other_col = 3
RETURNING *;
ERROR: syntax error at or near "ON"
LINE 2: ON CONFLICT (part_key) DO UPDATE SET other_col = 3
^
-- create another table
CREATE TABLE upsert_test_2
(

View File

@ -18,16 +18,24 @@ CREATE TABLE limit_orders (
limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00)
);
CREATE TABLE multiple_hash (
category text NOT NULL,
data text NOT NULL
);
CREATE TABLE insufficient_shards ( LIKE limit_orders );
CREATE TABLE range_partitioned ( LIKE limit_orders );
CREATE TABLE append_partitioned ( LIKE limit_orders );
SELECT master_create_distributed_table('limit_orders', 'id', 'hash');
SELECT master_create_distributed_table('multiple_hash', 'category', 'hash');
SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash');
SELECT master_create_distributed_table('range_partitioned', 'id', 'range');
SELECT master_create_distributed_table('append_partitioned', 'id', 'append');
SELECT master_create_worker_shards('limit_orders', 2, 2);
SELECT master_create_worker_shards('multiple_hash', 2, 2);
-- make a single shard that covers no partition values
SELECT master_create_worker_shards('insufficient_shards', 1, 1);
@ -61,6 +69,9 @@ INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'bu
20.69);
SELECT COUNT(*) FROM limit_orders WHERE id = 32743;
-- basic single-row INSERT with RETURNING
INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *;
-- try a single-row INSERT with no shard to receive it
INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
20.69);
@ -110,10 +121,16 @@ INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT);
-- INSERT violating column constraint
INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
-5.00);
-- INSERT violating primary key constraint
INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
-- INSERT violating primary key constraint, with RETURNING specified.
INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *;
-- INSERT, with RETURNING specified, failing with a non-constraint error
INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0;
SET client_min_messages TO DEFAULT;
-- commands with non-constant partition values are unsupported
@ -135,10 +152,6 @@ INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT);
-- INSERT ... SELECT ... FROM commands are unsupported
INSERT INTO limit_orders SELECT * FROM limit_orders;
-- commands with a RETURNING clause are unsupported
INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00)
RETURNING *;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES;
@ -150,6 +163,10 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246;
DELETE FROM limit_orders WHERE id = 246;
SELECT COUNT(*) FROM limit_orders WHERE id = 246;
-- test simple DELETE with RETURNING
DELETE FROM limit_orders WHERE id = 430 RETURNING *;
SELECT COUNT(*) FROM limit_orders WHERE id = 430;
-- DELETE with expression in WHERE clause
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
SELECT COUNT(*) FROM limit_orders WHERE id = 246;
@ -166,9 +183,6 @@ DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
-- commands with a RETURNING clause are unsupported
DELETE FROM limit_orders WHERE id = 246 RETURNING *;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
DELETE FROM limit_orders;
@ -182,14 +196,23 @@ INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell'
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246;
SELECT symbol FROM limit_orders WHERE id = 246;
-- simple UPDATE with RETURNING
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *;
-- expression UPDATE
UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246;
SELECT bidder_id FROM limit_orders WHERE id = 246;
-- expression UPDATE with RETURNING
UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *;
-- multi-column UPDATE
UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246;
SELECT kind, limit_price FROM limit_orders WHERE id = 246;
-- multi-column UPDATE with RETURNING
UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *;
-- Test that on unique contraint violations, we fail fast
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67);
@ -267,9 +290,6 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders
limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
-- commands with a RETURNING clause are unsupported
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *)
UPDATE limit_orders SET symbol = 'GM';
@ -287,8 +307,56 @@ UPDATE limit_orders SET symbol = LOWER(symbol) WHERE id = 246;
SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
-- IMMUTABLE functions are allowed -- even in returning
UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol;
-- updates referencing non-IMMUTABLE functions are unsupported
UPDATE limit_orders SET placed_at = now() WHERE id = 246;
-- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
-- cursors are not supported
UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name;
-- check that multi-row UPDATE/DELETEs with RETURNING work
INSERT INTO multiple_hash VALUES ('0', '1');
INSERT INTO multiple_hash VALUES ('0', '2');
INSERT INTO multiple_hash VALUES ('0', '3');
INSERT INTO multiple_hash VALUES ('0', '4');
INSERT INTO multiple_hash VALUES ('0', '5');
INSERT INTO multiple_hash VALUES ('0', '6');
UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *;
DELETE FROM multiple_hash WHERE category = '0' RETURNING *;
-- ensure returned row counters are correct
\set QUIET off
INSERT INTO multiple_hash VALUES ('1', '1');
INSERT INTO multiple_hash VALUES ('1', '2');
INSERT INTO multiple_hash VALUES ('1', '3');
INSERT INTO multiple_hash VALUES ('2', '1');
INSERT INTO multiple_hash VALUES ('2', '2');
INSERT INTO multiple_hash VALUES ('2', '3');
INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *;
-- check that update return the right number of rows
-- one row
UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1';
-- three rows
UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1';
-- three rows, with RETURNING
UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category;
-- check
SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data;
-- check that deletes return the right number of rows
-- one row
DELETE FROM multiple_hash WHERE category = '2' AND data = '1';
-- two rows
DELETE FROM multiple_hash WHERE category = '2';
-- three rows, with RETURNING
DELETE FROM multiple_hash WHERE category = '1' RETURNING category;
-- check
SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data;
SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data;

View File

@ -85,6 +85,15 @@ INSERT INTO upsert_test as ups_test (part_key, other_col) VALUES (1, 1) ON CONFL
-- see the results
SELECT * FROM upsert_test;
-- Test upsert, with returning:
INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2)
ON CONFLICT (part_key) DO UPDATE SET other_col = 3
RETURNING *;
INSERT INTO upsert_test (part_key, other_col) VALUES (2, 2)
ON CONFLICT (part_key) DO UPDATE SET other_col = 3
RETURNING *;
-- create another table
CREATE TABLE upsert_test_2
(