From aafd22dffa3134b2275195be25ad2b2f31ad7482 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 22 Apr 2019 11:55:11 -0700 Subject: [PATCH] Fix savepoint rollback for INSERT INTO ... SELECT. --- src/backend/distributed/commands/multi_copy.c | 65 ++++++++++++++--- .../distributed/connection/remote_commands.c | 31 +++++++- .../transaction/remote_transaction.c | 5 +- src/include/distributed/remote_commands.h | 1 + .../regress/expected/failure_savepoints.out | 4 -- .../expected/multi_subtransactions.out | 70 +++++++++++++++---- .../expected/multi_subtransactions_0.out | 70 +++++++++++++++---- .../regress/sql/multi_subtransactions.sql | 43 +++++++++++- 8 files changed, 239 insertions(+), 50 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a96d87529..9c439f831 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -154,6 +154,8 @@ static void CopySendInt32(CopyOutState outputState, int32 val); static void CopySendInt16(CopyOutState outputState, int16 val); static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); +static bool CitusSendTupleToPlacements(TupleTableSlot *slot, + CitusCopyDestReceiver *copyDest); /* CitusCopyDestReceiver functions */ static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation, @@ -2251,8 +2253,33 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { + bool result = false; CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + PG_TRY(); + { + result = CitusSendTupleToPlacements(slot, copyDest); + } + PG_CATCH(); + { + HTAB *shardConnectionHash = copyDest->shardConnectionHash; + UnclaimAllShardConnections(shardConnectionHash); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return result; +} + + +/* + * CitusSendTupleToPlacements sends the given TupleTableSlot to the appropriate + * shard placement(s). + */ +static bool +CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest) +{ int partitionColumnIndex = copyDest->partitionColumnIndex; TupleDesc tupleDescriptor = copyDest->tupleDescriptor; CopyStmt *copyStatement = copyDest->copyStatement; @@ -2409,21 +2436,37 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; shardConnectionsList = ShardConnectionList(shardConnectionHash); - foreach(shardConnectionsCell, shardConnectionsList) + + PG_TRY(); { - ShardConnections *shardConnections = (ShardConnections *) lfirst( - shardConnectionsCell); - - /* send copy binary footers to all shard placements */ - if (copyOutState->binary) + foreach(shardConnectionsCell, shardConnectionsList) { - SendCopyBinaryFooters(copyOutState, shardConnections->shardId, - shardConnections->connectionList); - } + ShardConnections *shardConnections = (ShardConnections *) lfirst( + shardConnectionsCell); - /* close the COPY input on all shard placements */ - EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); + /* send copy binary footers to all shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, shardConnections->shardId, + shardConnections->connectionList); + } + + /* close the COPY input on all shard placements */ + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, + true); + } } + PG_CATCH(); + { + /* + * We might be able to recover from errors with ROLLBACK TO SAVEPOINT, + * so unclaim the connections before throwing errors. + */ + UnclaimAllShardConnections(shardConnectionHash); + + PG_RE_THROW(); + } + PG_END_TRY(); heap_close(distributedRelation, NoLock); } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index dbce8e070..8fe174251 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -29,6 +29,8 @@ bool LogRemoteCommands = false; +static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors, + bool discardWarnings); static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts); static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, @@ -71,7 +73,7 @@ ForgetResults(MultiConnection *connection) /* - * ClearResults clears a connection from pending activity, + * ClearResultsInternal clears a connection from pending activity, * returns true if all pending commands return success. It raises * error if raiseErrors flag is set, any command fails and transaction * is marked critical. @@ -81,6 +83,27 @@ ForgetResults(MultiConnection *connection) */ bool ClearResults(MultiConnection *connection, bool raiseErrors) +{ + return ClearResultsInternal(connection, raiseErrors, false); +} + + +/* + * ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't + * emit warnings. + */ +bool +ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors) +{ + return ClearResultsInternal(connection, raiseErrors, true); +} + + +/* + * ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings. + */ +static bool +ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings) { bool success = true; @@ -103,7 +126,11 @@ ClearResults(MultiConnection *connection, bool raiseErrors) if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); + if (!discardWarnings) + { + ReportResultError(connection, result, WARNING); + } + MarkRemoteTransactionFailed(connection, raiseErrors); success = false; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index e49ba9e6d..5df7a7ad8 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1070,9 +1070,10 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) RemoteTransaction *transaction = &connection->remoteTransaction; /* cancel any ongoing queries before issuing rollback */ - ClearResultsIfReady(connection); SendCancelationRequest(connection); - ForgetResults(connection); + + /* clear results, but don't show cancelation warning messages from workers. */ + ClearResultsDiscardWarnings(connection, raiseInterrupts); if (transaction->transactionFailed) { diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 424cb00a7..5a55a25a1 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -27,6 +27,7 @@ extern bool LogRemoteCommands; extern bool IsResponseOK(struct pg_result *result); extern void ForgetResults(MultiConnection *connection); extern bool ClearResults(MultiConnection *connection, bool raiseErrors); +extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors); extern bool ClearResultsIfReady(MultiConnection *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index 357ce6118..a95451d1a 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -42,8 +42,6 @@ WARNING: could not issue cancel request DETAIL: Client error: PQcancel() -- no cancel object supplied WARNING: connection not open CONTEXT: while executing command on localhost:9060 -WARNING: connection not open -CONTEXT: while executing command on localhost:9060 WARNING: connection error: localhost:9060 DETAIL: connection not open WARNING: connection not open @@ -200,7 +198,6 @@ ROLLBACK TO SAVEPOINT s1; WARNING: connection not open WARNING: connection not open WARNING: could not issue cancel request -WARNING: connection not open COMMIT; ERROR: could not make changes to shard 100950 on any node SELECT * FROM artists WHERE id=6; @@ -238,7 +235,6 @@ INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); ROLLBACK TO s1; WARNING: could not issue cancel request WARNING: connection not open -WARNING: connection not open WARNING: connection error: localhost:9060 WARNING: connection not open WARNING: connection not open diff --git a/src/test/regress/expected/multi_subtransactions.out b/src/test/regress/expected/multi_subtransactions.out index 5f677d7d8..d8020bc8d 100644 --- a/src/test/regress/expected/multi_subtransactions.out +++ b/src/test/regress/expected/multi_subtransactions.out @@ -131,25 +131,67 @@ ERROR: savepoint "s3" does not exist COMMIT; -- Recover from multi-shard modify errors BEGIN; -INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +INSERT INTO artists VALUES (8, 'Sogand'); SAVEPOINT s1; UPDATE artists SET name = NULL; ERROR: null value in column "name" violates not-null constraint ROLLBACK TO s1; -INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); COMMIT; -SELECT * FROM artists ORDER BY id; - id | name -----+------------------ - 1 | Pablo Picasso - 2 | Vincent van Gogh - 3 | Claude Monet - 4 | William Kurelek - 5 | Jacob Kahn - 6 | Emily Carr - 8 | Uncle Yaakov - 9 | Anna Schaeffer -(8 rows) +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + id | name +----+--------------- + 8 | Sogand + 9 | Mohsen Namjoo +(2 rows) + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + id | name +----+-------------------- + 10 | Mahmoud Farshchian +(1 row) + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists cannot be NULL +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + id | name +----+-------------- + 11 | Egon Schiele +(1 row) + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists should have a value +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + id | name +----+-------------- + 12 | Marc Chagall +(1 row) -- =================================================================== -- Tests for replication factor > 1 diff --git a/src/test/regress/expected/multi_subtransactions_0.out b/src/test/regress/expected/multi_subtransactions_0.out index e03a667d6..9caa09e59 100644 --- a/src/test/regress/expected/multi_subtransactions_0.out +++ b/src/test/regress/expected/multi_subtransactions_0.out @@ -131,25 +131,67 @@ ERROR: no such savepoint COMMIT; -- Recover from multi-shard modify errors BEGIN; -INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +INSERT INTO artists VALUES (8, 'Sogand'); SAVEPOINT s1; UPDATE artists SET name = NULL; ERROR: null value in column "name" violates not-null constraint ROLLBACK TO s1; -INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); COMMIT; -SELECT * FROM artists ORDER BY id; - id | name -----+------------------ - 1 | Pablo Picasso - 2 | Vincent van Gogh - 3 | Claude Monet - 4 | William Kurelek - 5 | Jacob Kahn - 6 | Emily Carr - 8 | Uncle Yaakov - 9 | Anna Schaeffer -(8 rows) +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + id | name +----+--------------- + 8 | Sogand + 9 | Mohsen Namjoo +(2 rows) + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ERROR: null value in column "name" violates not-null constraint +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + id | name +----+-------------------- + 10 | Mahmoud Farshchian +(1 row) + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists cannot be NULL +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + id | name +----+-------------- + 11 | Egon Schiele +(1 row) + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ERROR: the partition column of table public.artists should have a value +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + id | name +----+-------------- + 12 | Marc Chagall +(1 row) -- =================================================================== -- Tests for replication factor > 1 diff --git a/src/test/regress/sql/multi_subtransactions.sql b/src/test/regress/sql/multi_subtransactions.sql index 992e23824..d0f163244 100644 --- a/src/test/regress/sql/multi_subtransactions.sql +++ b/src/test/regress/sql/multi_subtransactions.sql @@ -107,14 +107,51 @@ COMMIT; -- Recover from multi-shard modify errors BEGIN; -INSERT INTO artists VALUES (8, 'Uncle Yaakov'); +INSERT INTO artists VALUES (8, 'Sogand'); SAVEPOINT s1; UPDATE artists SET name = NULL; ROLLBACK TO s1; -INSERT INTO artists VALUES (9, 'Anna Schaeffer'); +INSERT INTO artists VALUES (9, 'Mohsen Namjoo'); COMMIT; -SELECT * FROM artists ORDER BY id; +SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id; + +-- Recover from multi-shard copy shutdown failure. +-- Constraint check for non-partition columns happen only at copy shutdown. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (10, 'Mahmoud Farshchian'); +COMMIT; + +SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id; + +-- Recover from multi-shard copy send failure. +-- Constraint check for partition column happens at copy send. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (11, 'Egon Schiele'); +COMMIT; + +SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id; + +-- Recover from multi-shard copy startup failure. +-- Check for existence of a value for partition columnn happens at copy startup. +BEGIN; +DELETE FROM artists; +SAVEPOINT s1; +INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i; +ROLLBACK TO s1; +INSERT INTO artists VALUES (12, 'Marc Chagall'); +COMMIT; + +SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id; + -- =================================================================== -- Tests for replication factor > 1