Fix savepoint rollback for INSERT INTO ... SELECT.

pull/2673/head
Hadi Moshayedi 2019-04-22 11:55:11 -07:00
parent b69a762e0b
commit aafd22dffa
8 changed files with 239 additions and 50 deletions

View File

@ -154,6 +154,8 @@ static void CopySendInt32(CopyOutState outputState, int32 val);
static void CopySendInt16(CopyOutState outputState, int16 val); static void CopySendInt16(CopyOutState outputState, int16 val);
static void CopyAttributeOutText(CopyOutState outputState, char *string); static void CopyAttributeOutText(CopyOutState outputState, char *string);
static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);
static bool CitusSendTupleToPlacements(TupleTableSlot *slot,
CitusCopyDestReceiver *copyDest);
/* CitusCopyDestReceiver functions */ /* CitusCopyDestReceiver functions */
static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation, static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation,
@ -2251,8 +2253,33 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
static bool static bool
CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{ {
bool result = false;
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
PG_TRY();
{
result = CitusSendTupleToPlacements(slot, copyDest);
}
PG_CATCH();
{
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
UnclaimAllShardConnections(shardConnectionHash);
PG_RE_THROW();
}
PG_END_TRY();
return result;
}
/*
* CitusSendTupleToPlacements sends the given TupleTableSlot to the appropriate
* shard placement(s).
*/
static bool
CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest)
{
int partitionColumnIndex = copyDest->partitionColumnIndex; int partitionColumnIndex = copyDest->partitionColumnIndex;
TupleDesc tupleDescriptor = copyDest->tupleDescriptor; TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
CopyStmt *copyStatement = copyDest->copyStatement; CopyStmt *copyStatement = copyDest->copyStatement;
@ -2409,21 +2436,37 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
Relation distributedRelation = copyDest->distributedRelation; Relation distributedRelation = copyDest->distributedRelation;
shardConnectionsList = ShardConnectionList(shardConnectionHash); shardConnectionsList = ShardConnectionList(shardConnectionHash);
foreach(shardConnectionsCell, shardConnectionsList)
PG_TRY();
{ {
ShardConnections *shardConnections = (ShardConnections *) lfirst( foreach(shardConnectionsCell, shardConnectionsList)
shardConnectionsCell);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{ {
SendCopyBinaryFooters(copyOutState, shardConnections->shardId, ShardConnections *shardConnections = (ShardConnections *) lfirst(
shardConnections->connectionList); shardConnectionsCell);
}
/* close the COPY input on all shard placements */ /* send copy binary footers to all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList,
true);
}
} }
PG_CATCH();
{
/*
* We might be able to recover from errors with ROLLBACK TO SAVEPOINT,
* so unclaim the connections before throwing errors.
*/
UnclaimAllShardConnections(shardConnectionHash);
PG_RE_THROW();
}
PG_END_TRY();
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
} }

View File

@ -29,6 +29,8 @@
bool LogRemoteCommands = false; bool LogRemoteCommands = false;
static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
bool discardWarnings);
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts); static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections, static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
int totalConnectionCount, int totalConnectionCount,
@ -71,7 +73,7 @@ ForgetResults(MultiConnection *connection)
/* /*
* ClearResults clears a connection from pending activity, * ClearResultsInternal clears a connection from pending activity,
* returns true if all pending commands return success. It raises * returns true if all pending commands return success. It raises
* error if raiseErrors flag is set, any command fails and transaction * error if raiseErrors flag is set, any command fails and transaction
* is marked critical. * is marked critical.
@ -81,6 +83,27 @@ ForgetResults(MultiConnection *connection)
*/ */
bool bool
ClearResults(MultiConnection *connection, bool raiseErrors) ClearResults(MultiConnection *connection, bool raiseErrors)
{
return ClearResultsInternal(connection, raiseErrors, false);
}
/*
* ClearResultsDiscardWarnings does the same thing as ClearResults, but doesn't
* emit warnings.
*/
bool
ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors)
{
return ClearResultsInternal(connection, raiseErrors, true);
}
/*
* ClearResultsInternal is used by ClearResults and ClearResultsDiscardWarnings.
*/
static bool
ClearResultsInternal(MultiConnection *connection, bool raiseErrors, bool discardWarnings)
{ {
bool success = true; bool success = true;
@ -103,7 +126,11 @@ ClearResults(MultiConnection *connection, bool raiseErrors)
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
ReportResultError(connection, result, WARNING); if (!discardWarnings)
{
ReportResultError(connection, result, WARNING);
}
MarkRemoteTransactionFailed(connection, raiseErrors); MarkRemoteTransactionFailed(connection, raiseErrors);
success = false; success = false;

View File

@ -1070,9 +1070,10 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
/* cancel any ongoing queries before issuing rollback */ /* cancel any ongoing queries before issuing rollback */
ClearResultsIfReady(connection);
SendCancelationRequest(connection); SendCancelationRequest(connection);
ForgetResults(connection);
/* clear results, but don't show cancelation warning messages from workers. */
ClearResultsDiscardWarnings(connection, raiseInterrupts);
if (transaction->transactionFailed) if (transaction->transactionFailed)
{ {

View File

@ -27,6 +27,7 @@ extern bool LogRemoteCommands;
extern bool IsResponseOK(struct pg_result *result); extern bool IsResponseOK(struct pg_result *result);
extern void ForgetResults(MultiConnection *connection); extern void ForgetResults(MultiConnection *connection);
extern bool ClearResults(MultiConnection *connection, bool raiseErrors); extern bool ClearResults(MultiConnection *connection, bool raiseErrors);
extern bool ClearResultsDiscardWarnings(MultiConnection *connection, bool raiseErrors);
extern bool ClearResultsIfReady(MultiConnection *connection); extern bool ClearResultsIfReady(MultiConnection *connection);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category); extern bool SqlStateMatchesCategory(char *sqlStateString, int category);

View File

@ -42,8 +42,6 @@ WARNING: could not issue cancel request
DETAIL: Client error: PQcancel() -- no cancel object supplied DETAIL: Client error: PQcancel() -- no cancel object supplied
WARNING: connection not open WARNING: connection not open
CONTEXT: while executing command on localhost:9060 CONTEXT: while executing command on localhost:9060
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
WARNING: connection error: localhost:9060 WARNING: connection error: localhost:9060
DETAIL: connection not open DETAIL: connection not open
WARNING: connection not open WARNING: connection not open
@ -200,7 +198,6 @@ ROLLBACK TO SAVEPOINT s1;
WARNING: connection not open WARNING: connection not open
WARNING: connection not open WARNING: connection not open
WARNING: could not issue cancel request WARNING: could not issue cancel request
WARNING: connection not open
COMMIT; COMMIT;
ERROR: could not make changes to shard 100950 on any node ERROR: could not make changes to shard 100950 on any node
SELECT * FROM artists WHERE id=6; SELECT * FROM artists WHERE id=6;
@ -238,7 +235,6 @@ INSERT INTO researchers VALUES (8, 4, 'Alonzo Church');
ROLLBACK TO s1; ROLLBACK TO s1;
WARNING: could not issue cancel request WARNING: could not issue cancel request
WARNING: connection not open WARNING: connection not open
WARNING: connection not open
WARNING: connection error: localhost:9060 WARNING: connection error: localhost:9060
WARNING: connection not open WARNING: connection not open
WARNING: connection not open WARNING: connection not open

View File

@ -131,25 +131,67 @@ ERROR: savepoint "s3" does not exist
COMMIT; COMMIT;
-- Recover from multi-shard modify errors -- Recover from multi-shard modify errors
BEGIN; BEGIN;
INSERT INTO artists VALUES (8, 'Uncle Yaakov'); INSERT INTO artists VALUES (8, 'Sogand');
SAVEPOINT s1; SAVEPOINT s1;
UPDATE artists SET name = NULL; UPDATE artists SET name = NULL;
ERROR: null value in column "name" violates not-null constraint ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1; ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Anna Schaeffer'); INSERT INTO artists VALUES (9, 'Mohsen Namjoo');
COMMIT; COMMIT;
SELECT * FROM artists ORDER BY id; SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id;
id | name id | name
----+------------------ ----+---------------
1 | Pablo Picasso 8 | Sogand
2 | Vincent van Gogh 9 | Mohsen Namjoo
3 | Claude Monet (2 rows)
4 | William Kurelek
5 | Jacob Kahn -- Recover from multi-shard copy shutdown failure.
6 | Emily Carr -- Constraint check for non-partition columns happen only at copy shutdown.
8 | Uncle Yaakov BEGIN;
9 | Anna Schaeffer DELETE FROM artists;
(8 rows) SAVEPOINT s1;
INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (10, 'Mahmoud Farshchian');
COMMIT;
SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id;
id | name
----+--------------------
10 | Mahmoud Farshchian
(1 row)
-- Recover from multi-shard copy send failure.
-- Constraint check for partition column happens at copy send.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists cannot be NULL
ROLLBACK TO s1;
INSERT INTO artists VALUES (11, 'Egon Schiele');
COMMIT;
SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id;
id | name
----+--------------
11 | Egon Schiele
(1 row)
-- Recover from multi-shard copy startup failure.
-- Check for existence of a value for partition columnn happens at copy startup.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists should have a value
ROLLBACK TO s1;
INSERT INTO artists VALUES (12, 'Marc Chagall');
COMMIT;
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
id | name
----+--------------
12 | Marc Chagall
(1 row)
-- =================================================================== -- ===================================================================
-- Tests for replication factor > 1 -- Tests for replication factor > 1

View File

@ -131,25 +131,67 @@ ERROR: no such savepoint
COMMIT; COMMIT;
-- Recover from multi-shard modify errors -- Recover from multi-shard modify errors
BEGIN; BEGIN;
INSERT INTO artists VALUES (8, 'Uncle Yaakov'); INSERT INTO artists VALUES (8, 'Sogand');
SAVEPOINT s1; SAVEPOINT s1;
UPDATE artists SET name = NULL; UPDATE artists SET name = NULL;
ERROR: null value in column "name" violates not-null constraint ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1; ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Anna Schaeffer'); INSERT INTO artists VALUES (9, 'Mohsen Namjoo');
COMMIT; COMMIT;
SELECT * FROM artists ORDER BY id; SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id;
id | name id | name
----+------------------ ----+---------------
1 | Pablo Picasso 8 | Sogand
2 | Vincent van Gogh 9 | Mohsen Namjoo
3 | Claude Monet (2 rows)
4 | William Kurelek
5 | Jacob Kahn -- Recover from multi-shard copy shutdown failure.
6 | Emily Carr -- Constraint check for non-partition columns happen only at copy shutdown.
8 | Uncle Yaakov BEGIN;
9 | Anna Schaeffer DELETE FROM artists;
(8 rows) SAVEPOINT s1;
INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i;
ERROR: null value in column "name" violates not-null constraint
ROLLBACK TO s1;
INSERT INTO artists VALUES (10, 'Mahmoud Farshchian');
COMMIT;
SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id;
id | name
----+--------------------
10 | Mahmoud Farshchian
(1 row)
-- Recover from multi-shard copy send failure.
-- Constraint check for partition column happens at copy send.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists cannot be NULL
ROLLBACK TO s1;
INSERT INTO artists VALUES (11, 'Egon Schiele');
COMMIT;
SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id;
id | name
----+--------------
11 | Egon Schiele
(1 row)
-- Recover from multi-shard copy startup failure.
-- Check for existence of a value for partition columnn happens at copy startup.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i;
ERROR: the partition column of table public.artists should have a value
ROLLBACK TO s1;
INSERT INTO artists VALUES (12, 'Marc Chagall');
COMMIT;
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
id | name
----+--------------
12 | Marc Chagall
(1 row)
-- =================================================================== -- ===================================================================
-- Tests for replication factor > 1 -- Tests for replication factor > 1

View File

@ -107,14 +107,51 @@ COMMIT;
-- Recover from multi-shard modify errors -- Recover from multi-shard modify errors
BEGIN; BEGIN;
INSERT INTO artists VALUES (8, 'Uncle Yaakov'); INSERT INTO artists VALUES (8, 'Sogand');
SAVEPOINT s1; SAVEPOINT s1;
UPDATE artists SET name = NULL; UPDATE artists SET name = NULL;
ROLLBACK TO s1; ROLLBACK TO s1;
INSERT INTO artists VALUES (9, 'Anna Schaeffer'); INSERT INTO artists VALUES (9, 'Mohsen Namjoo');
COMMIT; COMMIT;
SELECT * FROM artists ORDER BY id; SELECT * FROM artists WHERE id IN (7, 8, 9) ORDER BY id;
-- Recover from multi-shard copy shutdown failure.
-- Constraint check for non-partition columns happen only at copy shutdown.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT i, NULL FROM generate_series(1, 5) i;
ROLLBACK TO s1;
INSERT INTO artists VALUES (10, 'Mahmoud Farshchian');
COMMIT;
SELECT * FROM artists WHERE id IN (9, 10) ORDER BY id;
-- Recover from multi-shard copy send failure.
-- Constraint check for partition column happens at copy send.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists SELECT NULL, NULL FROM generate_series(1, 5) i;
ROLLBACK TO s1;
INSERT INTO artists VALUES (11, 'Egon Schiele');
COMMIT;
SELECT * FROM artists WHERE id IN (10, 11) ORDER BY id;
-- Recover from multi-shard copy startup failure.
-- Check for existence of a value for partition columnn happens at copy startup.
BEGIN;
DELETE FROM artists;
SAVEPOINT s1;
INSERT INTO artists(name) SELECT 'a' FROM generate_series(1, 5) i;
ROLLBACK TO s1;
INSERT INTO artists VALUES (12, 'Marc Chagall');
COMMIT;
SELECT * FROM artists WHERE id IN (11, 12) ORDER BY id;
-- =================================================================== -- ===================================================================
-- Tests for replication factor > 1 -- Tests for replication factor > 1