diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8f06d4fd7..f25acb931 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -67,6 +67,7 @@ #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" +#include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" @@ -842,8 +843,9 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, } else { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, true); + const bool raiseErrors = true; + + HandleRemoteTransactionConnectionError(connection, raiseErrors); failedPlacementCount++; continue; diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 5a55ffc6d..49eecdae7 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1277,16 +1277,18 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query, if (querySent == 0) { - MarkRemoteTransactionFailed(connection, false); - ReportConnectionError(connection, WARNING); + const bool raiseErrors = false; + + HandleRemoteTransactionConnectionError(connection, raiseErrors); return false; } singleRowMode = PQsetSingleRowMode(connection->pgConn); if (singleRowMode == 0) { - MarkRemoteTransactionFailed(connection, false); - ReportConnectionError(connection, WARNING); + const bool raiseErrors = false; + + HandleRemoteTransactionConnectionError(connection, raiseErrors); return false; } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 7171754fb..044876547 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -107,8 +107,9 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, true); + const bool raiseErrors = true; + + HandleRemoteTransactionConnectionError(connection, raiseErrors); } } @@ -187,7 +188,7 @@ void StartRemoteTransactionCommit(MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; - const bool dontRaiseError = false; + const bool raiseErrors = false; const bool isCommit = true; /* can only commit if transaction is in progress */ @@ -225,8 +226,7 @@ StartRemoteTransactionCommit(MultiConnection *connection) if (!SendRemoteCommand(connection, command.data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, dontRaiseError); + HandleRemoteTransactionConnectionError(connection, raiseErrors); WarnAboutLeakedPreparedTransaction(connection, isCommit); } @@ -244,8 +244,7 @@ StartRemoteTransactionCommit(MultiConnection *connection) * Failing in this state means that we don't know whether the the * commit has succeeded. */ - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, dontRaiseError); + HandleRemoteTransactionConnectionError(connection, raiseErrors); } } } @@ -261,19 +260,18 @@ FinishRemoteTransactionCommit(MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; PGresult *result = NULL; - const bool dontRaiseErrors = false; + const bool raiseErrors = false; const bool isCommit = true; Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING); - result = GetRemoteCommandResult(connection, dontRaiseErrors); + result = GetRemoteCommandResult(connection, raiseErrors); if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); - MarkRemoteTransactionFailed(connection, dontRaiseErrors); + HandleRemoteTransactionResultError(connection, result, raiseErrors); /* * Failing in this state means that we will often not know whether @@ -343,7 +341,7 @@ void StartRemoteTransactionAbort(MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; - const bool dontRaiseErrors = false; + const bool raiseErrors = false; const bool isNotCommit = false; Assert(transaction->transactionState != REMOTE_TRANS_INVALID); @@ -370,8 +368,7 @@ StartRemoteTransactionAbort(MultiConnection *connection) if (!SendRemoteCommand(connection, command.data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, dontRaiseErrors); + HandleRemoteTransactionConnectionError(connection, raiseErrors); WarnAboutLeakedPreparedTransaction(connection, isNotCommit); } @@ -400,7 +397,7 @@ StartRemoteTransactionAbort(MultiConnection *connection) if (!SendRemoteCommand(connection, "ROLLBACK")) { /* no point in reporting a likely redundant message */ - MarkRemoteTransactionFailed(connection, dontRaiseErrors); + MarkRemoteTransactionFailed(connection, raiseErrors); } else { @@ -426,6 +423,9 @@ FinishRemoteTransactionAbort(MultiConnection *connection) if (!IsResponseOK(result)) { const bool isCommit = false; + + HandleRemoteTransactionResultError(connection, result, raiseErrors); + WarnAboutLeakedPreparedTransaction(connection, isCommit); } @@ -494,8 +494,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) if (!SendRemoteCommand(connection, command.data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); + HandleRemoteTransactionConnectionError(connection, raiseErrors); } else { @@ -522,9 +521,8 @@ FinishRemoteTransactionPrepare(struct MultiConnection *connection) if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); transaction->transactionState = REMOTE_TRANS_ABORTED; - MarkRemoteTransactionFailed(connection, raiseErrors); + HandleRemoteTransactionResultError(connection, result, raiseErrors); } else { @@ -635,6 +633,53 @@ RemoteTransactionsBeginIfNecessary(List *connectionList) } +/* + * HandleRemoteTransactionConnectionError records a transaction as having failed + * and throws a connection error if the transaction was critical and raiseErrors + * is true, or a warning otherwise. + */ +void +HandleRemoteTransactionConnectionError(MultiConnection *connection, bool raiseErrors) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionFailed = true; + + if (transaction->transactionCritical && raiseErrors) + { + ReportConnectionError(connection, ERROR); + } + else + { + ReportConnectionError(connection, WARNING); + } +} + + +/* + * HandleRemoteTransactionResultError records a transaction as having failed + * and throws a result error if the transaction was critical and raiseErrors + * is true, or a warning otherwise. + */ +void +HandleRemoteTransactionResultError(MultiConnection *connection, PGresult *result, bool + raiseErrors) +{ + RemoteTransaction *transaction = &connection->remoteTransaction; + + transaction->transactionFailed = true; + + if (transaction->transactionCritical && raiseErrors) + { + ReportResultError(connection, result, ERROR); + } + else + { + ReportResultError(connection, result, WARNING); + } +} + + /* * MarkRemoteTransactionFailed records a transaction as having failed. * @@ -993,7 +1038,7 @@ void CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) { dlist_iter iter; - const bool dontRaiseInterrupts = false; + const bool raiseInterrupts = false; List *connectionList = NIL; /* asynchronously send ROLLBACK TO SAVEPOINT */ @@ -1024,7 +1069,7 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) connectionList = lappend(connectionList, connection); } - WaitForAllConnections(connectionList, dontRaiseInterrupts); + WaitForAllConnections(connectionList, raiseInterrupts); /* and wait for the results */ dlist_foreach(iter, &InProgressTransactions) @@ -1055,8 +1100,7 @@ StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransaction if (!SendRemoteCommand(connection, savepointCommand->data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); + HandleRemoteTransactionConnectionError(connection, raiseErrors); } } @@ -1073,8 +1117,7 @@ FinishRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactio PGresult *result = GetRemoteCommandResult(connection, raiseErrors); if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); + HandleRemoteTransactionResultError(connection, result, raiseErrors); } PQclear(result); @@ -1096,8 +1139,7 @@ StartRemoteTransactionSavepointRelease(MultiConnection *connection, if (!SendRemoteCommand(connection, savepointCommand->data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); + HandleRemoteTransactionConnectionError(connection, raiseErrors); } } @@ -1115,8 +1157,7 @@ FinishRemoteTransactionSavepointRelease(MultiConnection *connection, PGresult *result = GetRemoteCommandResult(connection, raiseErrors); if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); + HandleRemoteTransactionResultError(connection, result, raiseErrors); } PQclear(result); @@ -1132,14 +1173,13 @@ static void StartRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId subId) { - const bool dontRaiseErrors = false; + const bool raiseErrors = false; StringInfo savepointCommand = makeStringInfo(); appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId); if (!SendRemoteCommand(connection, savepointCommand->data)) { - ReportConnectionError(connection, WARNING); - MarkRemoteTransactionFailed(connection, dontRaiseErrors); + HandleRemoteTransactionConnectionError(connection, raiseErrors); } } @@ -1154,14 +1194,13 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId subId) { - const bool dontRaiseErrors = false; + const bool raiseErrors = false; RemoteTransaction *transaction = &connection->remoteTransaction; - PGresult *result = GetRemoteCommandResult(connection, dontRaiseErrors); + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); if (!IsResponseOK(result)) { - ReportResultError(connection, result, WARNING); - MarkRemoteTransactionFailed(connection, dontRaiseErrors); + HandleRemoteTransactionResultError(connection, result, raiseErrors); } /* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */ diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index d3bd2ccf2..89eb822ab 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -11,6 +11,7 @@ #define REMOTE_TRANSACTION_H +#include "libpq-fe.h" #include "nodes/pg_list.h" #include "lib/ilist.h" @@ -107,6 +108,10 @@ extern void RemoteTransactionBeginIfNecessary(struct MultiConnection *connection extern void RemoteTransactionsBeginIfNecessary(List *connectionList); /* other public functionality */ +extern void HandleRemoteTransactionConnectionError(struct MultiConnection *connection, + bool raiseError); +extern void HandleRemoteTransactionResultError(struct MultiConnection *connection, + PGresult *result, bool raiseErrors); extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, bool allowErrorPromotion); extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 7a79fd2a0..0e8b421b7 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -878,14 +878,12 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference(); \set VERBOSITY terse -- try without wrapping inside a transaction INSERT INTO reference_modifying_xacts VALUES (999, 3); -WARNING: illegal value -ERROR: failure on connection marked as essential: localhost:57637 +ERROR: illegal value -- same test within a transaction BEGIN; INSERT INTO reference_modifying_xacts VALUES (999, 3); COMMIT; -WARNING: illegal value -ERROR: failure on connection marked as essential: localhost:57637 +ERROR: illegal value -- all placements should be healthy SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) FROM pg_dist_shard_placement AS sp, @@ -977,8 +975,7 @@ BEGIN; INSERT INTO reference_modifying_xacts VALUES (12, 12); INSERT INTO hash_modifying_xacts VALUES (997, 1); COMMIT; -WARNING: illegal value -ERROR: failure on connection marked as essential: localhost:57637 +ERROR: illegal value -- ensure that the values didn't go into the reference table SELECT * FROM reference_modifying_xacts WHERE key = 12; key | value @@ -1180,13 +1177,11 @@ ALTER USER test_user RENAME TO test_user_new; \c - test_user - :master_port -- should fail since the worker doesn't have test_user anymore INSERT INTO reference_failure_test VALUES (1, '1'); -WARNING: connection error: localhost:57637 -ERROR: failure on connection marked as essential: localhost:57637 +ERROR: connection error: localhost:57637 -- the same as the above, but wrapped within a transaction BEGIN; INSERT INTO reference_failure_test VALUES (1, '1'); -WARNING: connection error: localhost:57637 -ERROR: failure on connection marked as essential: localhost:57637 +ERROR: connection error: localhost:57637 COMMIT; BEGIN; COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv'); diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 282f15174..e51b4c47f 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -712,11 +712,14 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); (1 row) -- test adding an invalid node while we have reference tables to replicate --- set client message level to ERROR to suppress OS-dependent host name resolution warnings +-- set client message level to ERROR and verbosity to terse to supporess +-- OS-dependent host name resolution warnings SET client_min_messages to ERROR; +\set VERBOSITY terse SELECT master_add_node('invalid-node-name', 9999); -ERROR: failure on connection marked as essential: invalid-node-name:9999 +ERROR: connection error: invalid-node-name:9999 SET client_min_messages to DEFAULT; +\set VERBOSITY default -- drop unnecassary tables DROP TABLE initially_not_replicated_reference_table; -- reload pg_dist_shard_placement table diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index a095bdd21..955ccd7df 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -654,10 +654,9 @@ BEGIN; CREATE INDEX single_index_2 ON single_shard_items(id); CREATE INDEX single_index_3 ON single_shard_items(name); COMMIT; -WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" +ERROR: duplicate key value violates unique constraint "ddl_commands_command_key" DETAIL: Key (command)=(CREATE INDEX) already exists. CONTEXT: while executing command on localhost:57638 -ERROR: failure on connection marked as essential: localhost:57638 -- Nothing from the block should have committed SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1; indexname | tablename diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index ac78a199b..baf9fbdc9 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -458,11 +458,15 @@ ORDER BY 1,4,5; SELECT 1 FROM master_add_node('localhost', :worker_2_port); -- test adding an invalid node while we have reference tables to replicate --- set client message level to ERROR to suppress OS-dependent host name resolution warnings +-- set client message level to ERROR and verbosity to terse to supporess +-- OS-dependent host name resolution warnings SET client_min_messages to ERROR; +\set VERBOSITY terse + SELECT master_add_node('invalid-node-name', 9999); SET client_min_messages to DEFAULT; +\set VERBOSITY default -- drop unnecassary tables DROP TABLE initially_not_replicated_reference_table;