Merge pull request #2123 from citusdata/improve_connection_errors

Improve connection error reporting

cr: @jasonmp85
pull/1997/head
Jason Petersen 2018-04-26 13:59:59 -06:00 committed by GitHub
commit 643059860a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 116 additions and 59 deletions

View File

@ -67,6 +67,7 @@
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h" #include "distributed/shard_pruning.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -842,8 +843,9 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
} }
else else
{ {
ReportConnectionError(connection, WARNING); const bool raiseErrors = true;
MarkRemoteTransactionFailed(connection, true);
HandleRemoteTransactionConnectionError(connection, raiseErrors);
failedPlacementCount++; failedPlacementCount++;
continue; continue;

View File

@ -243,9 +243,17 @@ ReportConnectionError(MultiConnection *connection, int elevel)
{ {
char *nodeName = connection->hostname; char *nodeName = connection->hostname;
int nodePort = connection->port; int nodePort = connection->port;
PGconn *pgConn = connection->pgConn;
char *messageDetail = NULL;
ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort), if (pgConn != NULL)
errdetail("%s", pchomp(PQerrorMessage(connection->pgConn))))); {
messageDetail = pchomp(PQerrorMessage(pgConn));
}
ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection error: %s:%d", nodeName, nodePort),
messageDetail != NULL ? errdetail("%s", messageDetail) : 0));
} }

View File

@ -1277,16 +1277,18 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query,
if (querySent == 0) if (querySent == 0)
{ {
MarkRemoteTransactionFailed(connection, false); const bool raiseErrors = false;
ReportConnectionError(connection, WARNING);
HandleRemoteTransactionConnectionError(connection, raiseErrors);
return false; return false;
} }
singleRowMode = PQsetSingleRowMode(connection->pgConn); singleRowMode = PQsetSingleRowMode(connection->pgConn);
if (singleRowMode == 0) if (singleRowMode == 0)
{ {
MarkRemoteTransactionFailed(connection, false); const bool raiseErrors = false;
ReportConnectionError(connection, WARNING);
HandleRemoteTransactionConnectionError(connection, raiseErrors);
return false; return false;
} }

View File

@ -107,8 +107,9 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
{ {
ReportConnectionError(connection, WARNING); const bool raiseErrors = true;
MarkRemoteTransactionFailed(connection, true);
HandleRemoteTransactionConnectionError(connection, raiseErrors);
} }
} }
@ -187,7 +188,7 @@ void
StartRemoteTransactionCommit(MultiConnection *connection) StartRemoteTransactionCommit(MultiConnection *connection)
{ {
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
const bool dontRaiseError = false; const bool raiseErrors = false;
const bool isCommit = true; const bool isCommit = true;
/* can only commit if transaction is in progress */ /* can only commit if transaction is in progress */
@ -225,8 +226,7 @@ StartRemoteTransactionCommit(MultiConnection *connection)
if (!SendRemoteCommand(connection, command.data)) if (!SendRemoteCommand(connection, command.data))
{ {
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, dontRaiseError);
WarnAboutLeakedPreparedTransaction(connection, isCommit); WarnAboutLeakedPreparedTransaction(connection, isCommit);
} }
@ -244,8 +244,7 @@ StartRemoteTransactionCommit(MultiConnection *connection)
* Failing in this state means that we don't know whether the the * Failing in this state means that we don't know whether the the
* commit has succeeded. * commit has succeeded.
*/ */
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, dontRaiseError);
} }
} }
} }
@ -261,19 +260,18 @@ FinishRemoteTransactionCommit(MultiConnection *connection)
{ {
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
PGresult *result = NULL; PGresult *result = NULL;
const bool dontRaiseErrors = false; const bool raiseErrors = false;
const bool isCommit = true; const bool isCommit = true;
Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING || Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING); transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING);
result = GetRemoteCommandResult(connection, dontRaiseErrors); result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
ReportResultError(connection, result, WARNING); HandleRemoteTransactionResultError(connection, result, raiseErrors);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
/* /*
* Failing in this state means that we will often not know whether * Failing in this state means that we will often not know whether
@ -343,7 +341,7 @@ void
StartRemoteTransactionAbort(MultiConnection *connection) StartRemoteTransactionAbort(MultiConnection *connection)
{ {
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
const bool dontRaiseErrors = false; const bool raiseErrors = false;
const bool isNotCommit = false; const bool isNotCommit = false;
Assert(transaction->transactionState != REMOTE_TRANS_INVALID); Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
@ -370,8 +368,7 @@ StartRemoteTransactionAbort(MultiConnection *connection)
if (!SendRemoteCommand(connection, command.data)) if (!SendRemoteCommand(connection, command.data))
{ {
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
WarnAboutLeakedPreparedTransaction(connection, isNotCommit); WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
} }
@ -400,7 +397,7 @@ StartRemoteTransactionAbort(MultiConnection *connection)
if (!SendRemoteCommand(connection, "ROLLBACK")) if (!SendRemoteCommand(connection, "ROLLBACK"))
{ {
/* no point in reporting a likely redundant message */ /* no point in reporting a likely redundant message */
MarkRemoteTransactionFailed(connection, dontRaiseErrors); MarkRemoteTransactionFailed(connection, raiseErrors);
} }
else else
{ {
@ -426,6 +423,9 @@ FinishRemoteTransactionAbort(MultiConnection *connection)
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
const bool isCommit = false; const bool isCommit = false;
HandleRemoteTransactionResultError(connection, result, raiseErrors);
WarnAboutLeakedPreparedTransaction(connection, isCommit); WarnAboutLeakedPreparedTransaction(connection, isCommit);
} }
@ -494,8 +494,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
if (!SendRemoteCommand(connection, command.data)) if (!SendRemoteCommand(connection, command.data))
{ {
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, raiseErrors);
} }
else else
{ {
@ -522,9 +521,8 @@ FinishRemoteTransactionPrepare(struct MultiConnection *connection)
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
ReportResultError(connection, result, WARNING);
transaction->transactionState = REMOTE_TRANS_ABORTED; transaction->transactionState = REMOTE_TRANS_ABORTED;
MarkRemoteTransactionFailed(connection, raiseErrors); HandleRemoteTransactionResultError(connection, result, raiseErrors);
} }
else else
{ {
@ -635,6 +633,53 @@ RemoteTransactionsBeginIfNecessary(List *connectionList)
} }
/*
* HandleRemoteTransactionConnectionError records a transaction as having failed
* and throws a connection error if the transaction was critical and raiseErrors
* is true, or a warning otherwise.
*/
void
HandleRemoteTransactionConnectionError(MultiConnection *connection, bool raiseErrors)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
transaction->transactionFailed = true;
if (transaction->transactionCritical && raiseErrors)
{
ReportConnectionError(connection, ERROR);
}
else
{
ReportConnectionError(connection, WARNING);
}
}
/*
* HandleRemoteTransactionResultError records a transaction as having failed
* and throws a result error if the transaction was critical and raiseErrors
* is true, or a warning otherwise.
*/
void
HandleRemoteTransactionResultError(MultiConnection *connection, PGresult *result, bool
raiseErrors)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
transaction->transactionFailed = true;
if (transaction->transactionCritical && raiseErrors)
{
ReportResultError(connection, result, ERROR);
}
else
{
ReportResultError(connection, result, WARNING);
}
}
/* /*
* MarkRemoteTransactionFailed records a transaction as having failed. * MarkRemoteTransactionFailed records a transaction as having failed.
* *
@ -993,7 +1038,7 @@ void
CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId) CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
{ {
dlist_iter iter; dlist_iter iter;
const bool dontRaiseInterrupts = false; const bool raiseInterrupts = false;
List *connectionList = NIL; List *connectionList = NIL;
/* asynchronously send ROLLBACK TO SAVEPOINT */ /* asynchronously send ROLLBACK TO SAVEPOINT */
@ -1024,7 +1069,7 @@ CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
connectionList = lappend(connectionList, connection); connectionList = lappend(connectionList, connection);
} }
WaitForAllConnections(connectionList, dontRaiseInterrupts); WaitForAllConnections(connectionList, raiseInterrupts);
/* and wait for the results */ /* and wait for the results */
dlist_foreach(iter, &InProgressTransactions) dlist_foreach(iter, &InProgressTransactions)
@ -1055,8 +1100,7 @@ StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransaction
if (!SendRemoteCommand(connection, savepointCommand->data)) if (!SendRemoteCommand(connection, savepointCommand->data))
{ {
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, raiseErrors);
} }
} }
@ -1073,8 +1117,7 @@ FinishRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactio
PGresult *result = GetRemoteCommandResult(connection, raiseErrors); PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
ReportResultError(connection, result, WARNING); HandleRemoteTransactionResultError(connection, result, raiseErrors);
MarkRemoteTransactionFailed(connection, raiseErrors);
} }
PQclear(result); PQclear(result);
@ -1096,8 +1139,7 @@ StartRemoteTransactionSavepointRelease(MultiConnection *connection,
if (!SendRemoteCommand(connection, savepointCommand->data)) if (!SendRemoteCommand(connection, savepointCommand->data))
{ {
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, raiseErrors);
} }
} }
@ -1115,8 +1157,7 @@ FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
PGresult *result = GetRemoteCommandResult(connection, raiseErrors); PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
ReportResultError(connection, result, WARNING); HandleRemoteTransactionResultError(connection, result, raiseErrors);
MarkRemoteTransactionFailed(connection, raiseErrors);
} }
PQclear(result); PQclear(result);
@ -1132,14 +1173,13 @@ static void
StartRemoteTransactionSavepointRollback(MultiConnection *connection, StartRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId) SubTransactionId subId)
{ {
const bool dontRaiseErrors = false; const bool raiseErrors = false;
StringInfo savepointCommand = makeStringInfo(); StringInfo savepointCommand = makeStringInfo();
appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId); appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId);
if (!SendRemoteCommand(connection, savepointCommand->data)) if (!SendRemoteCommand(connection, savepointCommand->data))
{ {
ReportConnectionError(connection, WARNING); HandleRemoteTransactionConnectionError(connection, raiseErrors);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
} }
} }
@ -1154,14 +1194,13 @@ static void
FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId
subId) subId)
{ {
const bool dontRaiseErrors = false; const bool raiseErrors = false;
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
PGresult *result = GetRemoteCommandResult(connection, dontRaiseErrors); PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result)) if (!IsResponseOK(result))
{ {
ReportResultError(connection, result, WARNING); HandleRemoteTransactionResultError(connection, result, raiseErrors);
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
} }
/* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */ /* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */

View File

@ -11,6 +11,7 @@
#define REMOTE_TRANSACTION_H #define REMOTE_TRANSACTION_H
#include "libpq-fe.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "lib/ilist.h" #include "lib/ilist.h"
@ -107,6 +108,10 @@ extern void RemoteTransactionBeginIfNecessary(struct MultiConnection *connection
extern void RemoteTransactionsBeginIfNecessary(List *connectionList); extern void RemoteTransactionsBeginIfNecessary(List *connectionList);
/* other public functionality */ /* other public functionality */
extern void HandleRemoteTransactionConnectionError(struct MultiConnection *connection,
bool raiseError);
extern void HandleRemoteTransactionResultError(struct MultiConnection *connection,
PGresult *result, bool raiseErrors);
extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, extern void MarkRemoteTransactionFailed(struct MultiConnection *connection,
bool allowErrorPromotion); bool allowErrorPromotion);
extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);

View File

@ -878,14 +878,12 @@ FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\set VERBOSITY terse \set VERBOSITY terse
-- try without wrapping inside a transaction -- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3); INSERT INTO reference_modifying_xacts VALUES (999, 3);
WARNING: illegal value ERROR: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- same test within a transaction -- same test within a transaction
BEGIN; BEGIN;
INSERT INTO reference_modifying_xacts VALUES (999, 3); INSERT INTO reference_modifying_xacts VALUES (999, 3);
COMMIT; COMMIT;
WARNING: illegal value ERROR: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- all placements should be healthy -- all placements should be healthy
SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*) SELECT s.logicalrelid::regclass::text, sp.shardstate, count(*)
FROM pg_dist_shard_placement AS sp, FROM pg_dist_shard_placement AS sp,
@ -977,8 +975,7 @@ BEGIN;
INSERT INTO reference_modifying_xacts VALUES (12, 12); INSERT INTO reference_modifying_xacts VALUES (12, 12);
INSERT INTO hash_modifying_xacts VALUES (997, 1); INSERT INTO hash_modifying_xacts VALUES (997, 1);
COMMIT; COMMIT;
WARNING: illegal value ERROR: illegal value
ERROR: failure on connection marked as essential: localhost:57637
-- ensure that the values didn't go into the reference table -- ensure that the values didn't go into the reference table
SELECT * FROM reference_modifying_xacts WHERE key = 12; SELECT * FROM reference_modifying_xacts WHERE key = 12;
key | value key | value
@ -1180,13 +1177,11 @@ ALTER USER test_user RENAME TO test_user_new;
\c - test_user - :master_port \c - test_user - :master_port
-- should fail since the worker doesn't have test_user anymore -- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1'); INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637 ERROR: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637
-- the same as the above, but wrapped within a transaction -- the same as the above, but wrapped within a transaction
BEGIN; BEGIN;
INSERT INTO reference_failure_test VALUES (1, '1'); INSERT INTO reference_failure_test VALUES (1, '1');
WARNING: connection error: localhost:57637 ERROR: connection error: localhost:57637
ERROR: failure on connection marked as essential: localhost:57637
COMMIT; COMMIT;
BEGIN; BEGIN;
COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv'); COPY reference_failure_test FROM STDIN WITH (FORMAT 'csv');

View File

@ -712,11 +712,14 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
(1 row) (1 row)
-- test adding an invalid node while we have reference tables to replicate -- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR to suppress OS-dependent host name resolution warnings -- set client message level to ERROR and verbosity to terse to supporess
-- OS-dependent host name resolution warnings
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
\set VERBOSITY terse
SELECT master_add_node('invalid-node-name', 9999); SELECT master_add_node('invalid-node-name', 9999);
ERROR: failure on connection marked as essential: invalid-node-name:9999 ERROR: connection error: invalid-node-name:9999
SET client_min_messages to DEFAULT; SET client_min_messages to DEFAULT;
\set VERBOSITY default
-- drop unnecassary tables -- drop unnecassary tables
DROP TABLE initially_not_replicated_reference_table; DROP TABLE initially_not_replicated_reference_table;
-- reload pg_dist_shard_placement table -- reload pg_dist_shard_placement table

View File

@ -654,10 +654,9 @@ BEGIN;
CREATE INDEX single_index_2 ON single_shard_items(id); CREATE INDEX single_index_2 ON single_shard_items(id);
CREATE INDEX single_index_3 ON single_shard_items(name); CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT; COMMIT;
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" ERROR: duplicate key value violates unique constraint "ddl_commands_command_key"
DETAIL: Key (command)=(CREATE INDEX) already exists. DETAIL: Key (command)=(CREATE INDEX) already exists.
CONTEXT: while executing command on localhost:57638 CONTEXT: while executing command on localhost:57638
ERROR: failure on connection marked as essential: localhost:57638
-- Nothing from the block should have committed -- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1;
indexname | tablename indexname | tablename

View File

@ -458,11 +458,15 @@ ORDER BY 1,4,5;
SELECT 1 FROM master_add_node('localhost', :worker_2_port); SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- test adding an invalid node while we have reference tables to replicate -- test adding an invalid node while we have reference tables to replicate
-- set client message level to ERROR to suppress OS-dependent host name resolution warnings -- set client message level to ERROR and verbosity to terse to supporess
-- OS-dependent host name resolution warnings
SET client_min_messages to ERROR; SET client_min_messages to ERROR;
\set VERBOSITY terse
SELECT master_add_node('invalid-node-name', 9999); SELECT master_add_node('invalid-node-name', 9999);
SET client_min_messages to DEFAULT; SET client_min_messages to DEFAULT;
\set VERBOSITY default
-- drop unnecassary tables -- drop unnecassary tables
DROP TABLE initially_not_replicated_reference_table; DROP TABLE initially_not_replicated_reference_table;