diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index a8b13fa31..54df5f8a5 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -1063,9 +1063,16 @@ bool SendCancelationRequest(MultiConnection *connection) { char errorBuffer[ERROR_BUFFER_SIZE] = { 0 }; - PGcancel *cancelObject = PQgetCancel(connection->pgConn); + bool cancelSent = false; - bool cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); + PGcancel *cancelObject = PQgetCancel(connection->pgConn); + if (cancelObject == NULL) + { + /* this can happen if connection is invalid */ + return false; + } + + cancelSent = PQcancel(cancelObject, errorBuffer, sizeof(errorBuffer)); if (!cancelSent) { ereport(WARNING, (errmsg("could not issue cancel request"), diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 66bcbf0bf..e0c385102 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -396,6 +396,16 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { switch (event) { + /* + * Our subtransaction stack should be consistent with postgres' internal + * transaction stack. In case of subxact begin, postgres calls our + * callback after it has pushed the transaction into stack, so we have to + * do the same even if worker commands fail, so we PushSubXact() first. + * In case of subxact commit, callback is called before pushing subxact to + * the postgres transaction stack, so we call PopSubXact() after making sure + * worker commands didn't fail. Otherwise, Postgres would roll back that + * would cause us to call PopSubXact again. + */ case SUBXACT_EVENT_START_SUB: { PushSubXact(subId); @@ -408,21 +418,21 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, case SUBXACT_EVENT_COMMIT_SUB: { - PopSubXact(subId); if (InCoordinatedTransaction()) { CoordinatedRemoteTransactionsSavepointRelease(subId); } + PopSubXact(subId); break; } case SUBXACT_EVENT_ABORT_SUB: { - PopSubXact(subId); if (InCoordinatedTransaction()) { CoordinatedRemoteTransactionsSavepointRollback(subId); } + PopSubXact(subId); UnsetCitusNoticeLevel(); break; diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index d4a1133a7..b42ab48b7 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -15,3 +15,6 @@ s/shard [0-9]+/shard xxxxx/g # the generated plan s/"(fkey_ref_|referenced_table_|referencing_table_)[0-9]+"/"\1xxxxxxx"/g s/\(id\)=\([0-9]+\)/(id)=(X)/g + +# Savepoint error messages changed between postgres 10 and 11. +s/savepoint ".*" does not exist/no such savepoint/g diff --git a/src/test/regress/bin/normalized_tests.lst b/src/test/regress/bin/normalized_tests.lst index 528775d8c..5430a6628 100644 --- a/src/test/regress/bin/normalized_tests.lst +++ b/src/test/regress/bin/normalized_tests.lst @@ -2,3 +2,4 @@ multi_alter_table_add_constraints multi_alter_table_statements foreign_key_to_reference_table +failure_savepoints diff --git a/src/test/regress/expected/failure_savepoints.out b/src/test/regress/expected/failure_savepoints.out index a95451d1a..aff797501 100644 --- a/src/test/regress/expected/failure_savepoints.out +++ b/src/test/regress/expected/failure_savepoints.out @@ -38,8 +38,6 @@ CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 DELETE FROM artists WHERE id=4; -WARNING: could not issue cancel request -DETAIL: Client error: PQcancel() -- no cancel object supplied WARNING: connection not open CONTEXT: while executing command on localhost:9060 WARNING: connection error: localhost:9060 @@ -66,16 +64,24 @@ SELECT citus.mitmproxy('conn.onQuery(query="^RELEASE").kill()'); (1 row) BEGIN; -INSERT INTO artists VALUES (5, 'Asher Lev'); +UPDATE artists SET name='a'; SAVEPOINT s1; DELETE FROM artists WHERE id=4; RELEASE SAVEPOINT s1; +WARNING: AbortSubTransaction while in COMMIT state +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 +WARNING: connection error: localhost:9060 +DETAIL: connection not open WARNING: connection not open CONTEXT: while executing command on localhost:9060 WARNING: connection not open CONTEXT: while executing command on localhost:9060 -COMMIT; -ERROR: could not make changes to shard 100950 on any node +WARNING: savepoint "savepoint_2" does not exist +CONTEXT: while executing command on localhost:57637 +ERROR: connection not open +CONTEXT: while executing command on localhost:9060 +ROLLBACK; SELECT * FROM artists WHERE id IN (4, 5); id | name ----+----------------- @@ -197,7 +203,6 @@ INSERT INTO artists VALUES (7, 'Emily Carr'); ROLLBACK TO SAVEPOINT s1; WARNING: connection not open WARNING: connection not open -WARNING: could not issue cancel request COMMIT; ERROR: could not make changes to shard 100950 on any node SELECT * FROM artists WHERE id=6; @@ -233,7 +238,6 @@ WARNING: connection not open WARNING: connection not open INSERT INTO researchers VALUES (8, 4, 'Alonzo Church'); ROLLBACK TO s1; -WARNING: could not issue cancel request WARNING: connection not open WARNING: connection error: localhost:9060 WARNING: connection not open diff --git a/src/test/regress/sql/failure_savepoints.sql b/src/test/regress/sql/failure_savepoints.sql index 6f7550911..cf06048ad 100644 --- a/src/test/regress/sql/failure_savepoints.sql +++ b/src/test/regress/sql/failure_savepoints.sql @@ -33,11 +33,11 @@ SELECT * FROM artists WHERE id IN (4, 5); SELECT citus.mitmproxy('conn.onQuery(query="^RELEASE").kill()'); BEGIN; -INSERT INTO artists VALUES (5, 'Asher Lev'); +UPDATE artists SET name='a'; SAVEPOINT s1; DELETE FROM artists WHERE id=4; RELEASE SAVEPOINT s1; -COMMIT; +ROLLBACK; SELECT * FROM artists WHERE id IN (4, 5);