diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a96d87529..6cc0c5dca 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -154,6 +154,8 @@ static void CopySendInt32(CopyOutState outputState, int32 val); static void CopySendInt16(CopyOutState outputState, int16 val); static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); +static bool CitusSendTupleToPlacements(TupleTableSlot *slot, + CitusCopyDestReceiver *copyDest); /* CitusCopyDestReceiver functions */ static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation, @@ -2251,8 +2253,37 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { + bool result = false; CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + PG_TRY(); + { + result = CitusSendTupleToPlacements(slot, copyDest); + } + PG_CATCH(); + { + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ + HTAB *shardConnectionHash = copyDest->shardConnectionHash; + UnclaimAllShardConnections(shardConnectionHash); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return result; +} + + +/* + * CitusSendTupleToPlacements sends the given TupleTableSlot to the appropriate + * shard placement(s). + */ +static bool +CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest) +{ int partitionColumnIndex = copyDest->partitionColumnIndex; TupleDesc tupleDescriptor = copyDest->tupleDescriptor; CopyStmt *copyStatement = copyDest->copyStatement; @@ -2409,21 +2440,37 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; shardConnectionsList = ShardConnectionList(shardConnectionHash); - foreach(shardConnectionsCell, shardConnectionsList) + + PG_TRY(); { - ShardConnections *shardConnections = (ShardConnections *) lfirst( - shardConnectionsCell); - - /* send copy binary footers to all shard placements */ - if (copyOutState->binary) + foreach(shardConnectionsCell, shardConnectionsList) { - SendCopyBinaryFooters(copyOutState, shardConnections->shardId, - shardConnections->connectionList); - } + ShardConnections *shardConnections = (ShardConnections *) lfirst( + shardConnectionsCell); - /* close the COPY input on all shard placements */ - EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); + /* send copy binary footers to all shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, shardConnections->shardId, + shardConnections->connectionList); + } + + /* close the COPY input on all shard placements */ + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, + true); + } } + PG_CATCH(); + { + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ + UnclaimAllShardConnections(shardConnectionHash); + + PG_RE_THROW(); + } + PG_END_TRY(); heap_close(distributedRelation, NoLock); } diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 7a7043111..a6e23bb5b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -464,7 +464,7 @@ ShutdownConnection(MultiConnection *connection) if (PQstatus(connection->pgConn) == CONNECTION_OK && PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE) { - char errorMessage[256] = { 0 }; + char errorMessage[ERROR_BUFFER_SIZE] = { 0 }; PGcancel *cancel = PQgetCancel(connection->pgConn); if (!PQcancel(cancel, errorMessage, sizeof(errorMessage))) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6f5132c21..21d8bd56e 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -29,6 +29,8 @@ bool LogRemoteCommands = false; +static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors, + bool discardWarnings); static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts); static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, @@ -71,7 +73,7 @@ ForgetResults(MultiConnection *connection) /* - * ClearResults clears a connection from pending activity, + * ClearResultsInternal clears a connection from pending activity, * returns true if all pending commands return success. It raises * error if raiseErrors flag is set, any command fails and transaction * is marked critical. @@ -81,6 +83,27 @@ ForgetResults(MultiConnection *connection) */ bool ClearResults(MultiConnection *connection, bool raiseErrors) +{ + return ClearResultsInternal(connection, raiseErrors, false); +} + + +/* + * ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't + * emit warnings. + */ +bool +ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors) +{ + return ClearResultsInternal(connection, raiseErrors, true); +} + + +/* + * ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings. + */ +static bool +ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings) { bool success = true; @@ -103,7 +126,11 @@ ClearResults(MultiConnection *connection, bool raiseErrors) if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); + if (!discardWarnings) + { + ReportResultError(connection, result, WARNING); + } + MarkRemoteTransactionFailed(connection, raiseErrors); success = false; @@ -1008,3 +1035,26 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, return waitEventSet; } + + +/* + * SendCancelationRequest sends a cancelation request on the given connection. + * Return value indicates whether the cancelation request was sent successfully. + */ +bool +SendCancelationRequest(MultiConnection *connection) +{ + char errorBuffer[ERROR_BUFFER_SIZE] = { 0 }; + PGcancel *cancelObject = PQgetCancel(connection->pgConn); + + bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); + if (!cancelSent) + { + ereport(WARNING, (errmsg("could not issue cancel request"), + errdetail("Client error: %s", errorBuffer))); + } + + PQfreeCancel(cancelObject); + + return cancelSent; +} diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index 907a37a1c..029152319 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -443,27 +443,13 @@ bool MultiClientCancel(int32 connectionId) { MultiConnection *connection = NULL; - PGcancel *cancelObject = NULL; - int cancelSent = 0; bool canceled = true; - char errorBuffer[STRING_BUFFER_SIZE]; Assert(connectionId != INVALID_CONNECTION_ID); connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); - cancelObject = PQgetCancel(connection->pgConn); - - cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); - if (cancelSent == 0) - { - ereport(WARNING, (errmsg("could not issue cancel request"), - errdetail("Client error: %s", errorBuffer))); - - canceled = false; - } - - PQfreeCancel(cancelObject); + canceled = SendCancelationRequest(connection); return canceled; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 25ea7c921..c5259cc6f 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1574,6 +1574,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { + UnclaimAllShardConnections(shardConnectionHash); ReportConnectionError(connection, ERROR); } } @@ -1616,28 +1617,42 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn SetCitusNoticeLevel(INFO); } - /* - * If caller is interested, store query results the first time - * through. The output of the query's execution on other shards is - * discarded if we run there (because it's a modification query). - */ - if (placementIndex == 0 && expectResults) + PG_TRY(); { - Assert(scanState != NULL); + /* + * If caller is interested, store query results the first time + * through. The output of the query's execution on other shards is + * discarded if we run there (because it's a modification query). + */ + if (placementIndex == 0 && expectResults) + { + Assert(scanState != NULL); - queryOK = StoreQueryResult(scanState, connection, - alwaysThrowErrorOnFailure, - ¤tAffectedTupleCount, NULL); + queryOK = StoreQueryResult(scanState, connection, + alwaysThrowErrorOnFailure, + ¤tAffectedTupleCount, NULL); + } + else + { + queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, + ¤tAffectedTupleCount); + } } - else + PG_CATCH(); { - queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, - ¤tAffectedTupleCount); + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ + UnclaimAllShardConnections(shardConnectionHash); + PG_RE_THROW(); } + PG_END_TRY(); /* We error out if the worker fails to return a result for the query. */ if (!queryOK) { + UnclaimAllShardConnections(shardConnectionHash); ReportConnectionError(connection, ERROR); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index f9028c379..5df7a7ad8 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1068,6 +1068,13 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) MultiConnection *connection = dlist_container(MultiConnection, transactionNode, iter.cur); RemoteTransaction *transaction = &connection->remoteTransaction; + + /* cancel any ongoing queries before issuing rollback */ + SendCancelationRequest(connection); + + /* clear results, but don't show cancelation warning messages from workers. */ + ClearResultsDiscardWarnings(connection, raiseInterrupts); + if (transaction->transactionFailed) { if (transaction->lastSuccessfulSubXact <= subId) diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index b6b6b0f1e..1cbf8719e 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -21,6 +21,9 @@ /* maximum (textual) lengths of hostname and port */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */ +/* used for libpq commands that get an error buffer. Postgres docs recommend 256. */ +#define ERROR_BUFFER_SIZE 256 + /* default notice level */ #define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1 diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 9a0ad75be..5a55a25a1 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -27,6 +27,7 @@ extern bool LogRemoteCommands; extern bool IsResponseOK(struct pg_result *result); extern void ForgetResults(MultiConnection *connection); extern bool ClearResults(MultiConnection *connection, bool raiseErrors); +extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors); extern bool ClearResultsIfReady(MultiConnection *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); @@ -57,5 +58,6 @@ extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg); /* waiting for multiple command results */ extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts); +extern bool SendCancelationRequest(MultiConnection *connection); #endif /* REMOTE_COMMAND_H */ diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index ca8a968b1..a95451d1a 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -38,6 +38,8 @@ CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 DELETE FROM artists WHERE id=4; +WARNING: could not issue cancel request +DETAIL: Client error: PQcancel() -- no cancel object supplied WARNING: connection not open CONTEXT: while executing command on localhost:9060 WARNING: connection error: localhost:9060 @@ -195,6 +197,7 @@ INSERT INTO artists VALUES (7, 'Emily Carr'); ROLLBACK TO SAVEPOINT s1; WARNING: connection not open WARNING: connection not open +WARNING: could not issue cancel request COMMIT; ERROR: could not make changes to shard 100950 on any node SELECT * FROM artists WHERE id=6; @@ -230,6 +233,7 @@ WARNING: connection not open WARNING: connection not open INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); ROLLBACK TO s1; +WARNING: could not issue cancel request WARNING: connection not open WARNING: connection error: localhost:9060 WARNING: connection not open diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out index d28d62bce..7a6812dce 100644 --- a/src/test/regress/expected/multi_subtransactions.out +++ b/src/test/regress/expected/multi_subtransactions.out @@ -129,6 +129,140 @@ ERROR: current transaction is aborted, commands ignored until end of transactio ROLLBACK TO SAVEPOINT s3; ERROR: savepoint "s3" does not exist COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Sogand'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); +COMMIT; +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + id | name +----+--------------- + 8 | Sogand + 9 | Mohsen Namjoo +(2 rows) + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + id | name +----+-------------------- + 10 | Mahmoud Farshchian +(1 row) + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists cannot be NULL +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + id | name +----+-------------- + 11 | Egon Schiele +(1 row) + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists should have a value +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + id | name +----+-------------- + 12 | Marc Chagall +(1 row) + +-- Recover from multi-shard CTE modify failures +create table t1(a int, b int); +create table t2(a int, b int CHECK(b > 0)); +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; +select create_distributed_table('t1', 'a'), + create_distributed_table('t2', 'a'); + create_distributed_table | create_distributed_table +--------------------------+-------------------------- + | +(1 row) + +begin; +insert into t2 select i, i+1 from generate_series(1, 3) i; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +savepoint s1; +with r AS ( + update t1 set b = b - 10 + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s1; +savepoint s2; +with r AS ( + update t2 set b = b - 10 + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s2; +savepoint s3; +with r AS ( + insert into t2 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s3; +savepoint s4; +with r AS ( + insert into t1 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s4; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +commit; +select * from t2 order by a, b; + a | b +---+--- + 1 | 4 + 2 | 5 + 3 | 6 +(3 rows) + +select * from t1 order by a, b; + a | b +---+--- + 1 | 3 + 1 | 4 + 2 | 4 + 2 | 5 + 3 | 5 + 3 | 6 +(6 rows) + +drop table t1, t2; -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/expected/multi_subtransactions_0.out b/src/test/regress/expected/multi_subtransactions_0.out index 74765de10..3f7f8aa2c 100644 --- a/src/test/regress/expected/multi_subtransactions_0.out +++ b/src/test/regress/expected/multi_subtransactions_0.out @@ -129,6 +129,140 @@ ERROR: current transaction is aborted, commands ignored until end of transactio ROLLBACK TO SAVEPOINT s3; ERROR: no such savepoint COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Sogand'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); +COMMIT; +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + id | name +----+--------------- + 8 | Sogand + 9 | Mohsen Namjoo +(2 rows) + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + id | name +----+-------------------- + 10 | Mahmoud Farshchian +(1 row) + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists cannot be NULL +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + id | name +----+-------------- + 11 | Egon Schiele +(1 row) + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists should have a value +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + id | name +----+-------------- + 12 | Marc Chagall +(1 row) + +-- Recover from multi-shard CTE modify failures +create table t1(a int, b int); +create table t2(a int, b int CHECK(b > 0)); +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; +select create_distributed_table('t1', 'a'), + create_distributed_table('t2', 'a'); + create_distributed_table | create_distributed_table +--------------------------+-------------------------- + | +(1 row) + +begin; +insert into t2 select i, i+1 from generate_series(1, 3) i; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +savepoint s1; +with r AS ( + update t1 set b = b - 10 + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s1; +savepoint s2; +with r AS ( + update t2 set b = b - 10 + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s2; +savepoint s3; +with r AS ( + insert into t2 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t1 select * from r; +ERROR: new row for relation "t2_1190004" violates check constraint "t2_b_check" +rollback to savepoint s3; +savepoint s4; +with r AS ( + insert into t1 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t2 select * from r; +ERROR: new row for relation "t2_1190005" violates check constraint "t2_b_check" +rollback to savepoint s4; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +commit; +select * from t2 order by a, b; + a | b +---+--- + 1 | 4 + 2 | 5 + 3 | 6 +(3 rows) + +select * from t1 order by a, b; + a | b +---+--- + 1 | 3 + 1 | 4 + 2 | 4 + 2 | 5 + 3 | 5 + 3 | 6 +(6 rows) + +drop table t1, t2; -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql index 5865aa467..2dc8fb2f9 100644 --- a/src/test/regress/sql/multi_subtransactions.sql +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -105,6 +105,103 @@ SAVEPOINT s3; ROLLBACK TO SAVEPOINT s3; COMMIT; +-- Recover from multi-shard modify errors +BEGIN; +INSERT INTO artists VALUES (8, 'Sogand'); +SAVEPOINT s1; +UPDATE artists SET name = NULL; +ROLLBACK TO s1; +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); +COMMIT; + +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; + +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; + +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; + +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + +-- Recover from multi-shard CTE modify failures +create table t1(a int, b int); +create table t2(a int, b int CHECK(b > 0)); + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; + +select create_distributed_table('t1', 'a'), + create_distributed_table('t2', 'a'); + +begin; +insert into t2 select i, i+1 from generate_series(1, 3) i; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +savepoint s1; +with r AS ( + update t1 set b = b - 10 + returning * +) insert into t2 select * from r; +rollback to savepoint s1; +savepoint s2; +with r AS ( + update t2 set b = b - 10 + returning * +) insert into t1 select * from r; +rollback to savepoint s2; +savepoint s3; +with r AS ( + insert into t2 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t1 select * from r; +rollback to savepoint s3; +savepoint s4; +with r AS ( + insert into t1 select i, i+1 from generate_series(-10,-5) i + returning * +) insert into t2 select * from r; +rollback to savepoint s4; +with r AS ( + update t2 set b = b + 1 + returning * +) insert into t1 select * from r; +commit; + +select * from t2 order by a, b; +select * from t1 order by a, b; + +drop table t1, t2; + -- =================================================================== -- Tests for replication factor > 1 -- =================================================================== @@ -216,3 +313,4 @@ SELECT * FROM researchers WHERE lab_id=10; -- Clean-up DROP TABLE artists; DROP TABLE researchers; +