mirror of https://github.com/citusdata/citus.git
Prevent crash when remote transaction start fails (#1662)
We sent multiple commands to worker when starting a transaction. Previously we only checked the result of the first command that is transaction 'BEGIN' which always succeeds. Any failure on following commands were not checked. With this commit, we make sure all command results are checked. If there is any error we report the first error found.pull/1670/head
parent
a8428dff01
commit
4676c4f7a5
|
@ -56,6 +56,8 @@ IsResponseOK(PGresult *result)
|
||||||
*
|
*
|
||||||
* Note that this might require network IO. If that's not acceptable, use
|
* Note that this might require network IO. If that's not acceptable, use
|
||||||
* NonblockingForgetResults().
|
* NonblockingForgetResults().
|
||||||
|
*
|
||||||
|
* ClearResults is variant of this function which can also raise errors.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ForgetResults(MultiConnection *connection)
|
ForgetResults(MultiConnection *connection)
|
||||||
|
@ -81,6 +83,52 @@ ForgetResults(MultiConnection *connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ClearResults clears a connection from pending activity,
|
||||||
|
* returns true if all pending commands return success. It raises
|
||||||
|
* error if raiseErrors flag is set, any command fails and transaction
|
||||||
|
* is marked critical.
|
||||||
|
*
|
||||||
|
* Note that this might require network IO. If that's not acceptable, use
|
||||||
|
* NonblockingForgetResults().
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ClearResults(MultiConnection *connection, bool raiseErrors)
|
||||||
|
{
|
||||||
|
bool success = true;
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
|
||||||
|
if (result == NULL)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* End any pending copy operation. Transaction will be marked
|
||||||
|
* as failed by the following part.
|
||||||
|
*/
|
||||||
|
if (PQresultStatus(result) == PGRES_COPY_IN)
|
||||||
|
{
|
||||||
|
PQputCopyEnd(connection->pgConn, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!IsResponseOK(result))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, raiseErrors);
|
||||||
|
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NonblockingForgetResults clears a connection from pending activity if doing
|
* NonblockingForgetResults clears a connection from pending activity if doing
|
||||||
* so does not require network IO. Returns true if successful, false
|
* so does not require network IO. Returns true if successful, false
|
||||||
|
|
|
@ -115,26 +115,18 @@ void
|
||||||
FinishRemoteTransactionBegin(struct MultiConnection *connection)
|
FinishRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
{
|
{
|
||||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
PGresult *result = NULL;
|
bool clearSuccessful = true;
|
||||||
const bool raiseErrors = true;
|
bool raiseErrors = true;
|
||||||
|
|
||||||
Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
|
Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
|
||||||
|
|
||||||
result = GetRemoteCommandResult(connection, raiseErrors);
|
clearSuccessful = ClearResults(connection, raiseErrors);
|
||||||
if (!IsResponseOK(result))
|
if (clearSuccessful)
|
||||||
{
|
|
||||||
ReportResultError(connection, result, WARNING);
|
|
||||||
MarkRemoteTransactionFailed(connection, raiseErrors);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
transaction->transactionState = REMOTE_TRANS_STARTED;
|
transaction->transactionState = REMOTE_TRANS_STARTED;
|
||||||
transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact;
|
transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact;
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
ForgetResults(connection);
|
|
||||||
|
|
||||||
if (!transaction->transactionFailed)
|
if (!transaction->transactionFailed)
|
||||||
{
|
{
|
||||||
Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
|
Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern bool LogRemoteCommands;
|
||||||
/* simple helpers */
|
/* simple helpers */
|
||||||
extern bool IsResponseOK(struct pg_result *result);
|
extern bool IsResponseOK(struct pg_result *result);
|
||||||
extern void ForgetResults(MultiConnection *connection);
|
extern void ForgetResults(MultiConnection *connection);
|
||||||
|
extern bool ClearResults(MultiConnection *connection, bool raiseErrors);
|
||||||
extern bool NonblockingForgetResults(MultiConnection *connection);
|
extern bool NonblockingForgetResults(MultiConnection *connection);
|
||||||
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
||||||
|
|
||||||
|
|
|
@ -329,3 +329,42 @@ FROM
|
||||||
-- we don't need the schema and the function anymore
|
-- we don't need the schema and the function anymore
|
||||||
DROP SCHEMA test_deamon CASCADE;
|
DROP SCHEMA test_deamon CASCADE;
|
||||||
NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text)
|
NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text)
|
||||||
|
-- verify citus does not crash while creating a table when run against an older worker
|
||||||
|
-- create_distributed_table piggybacks multiple commands into single one, if one worker
|
||||||
|
-- did not have the required UDF it should fail instead of crash.
|
||||||
|
-- create a test database, configure citus with single node
|
||||||
|
CREATE DATABASE another;
|
||||||
|
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||||
|
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||||
|
HINT: You can manually create a database and its extensions on workers.
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE DATABASE another;
|
||||||
|
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||||
|
DETAIL: Citus does not propagate CREATE DATABASE command to workers
|
||||||
|
HINT: You can manually create a database and its extensions on workers.
|
||||||
|
\c - - - :master_port
|
||||||
|
\c another
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
SELECT FROM master_add_node('localhost', :worker_1_port);
|
||||||
|
--
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone)
|
||||||
|
RENAME TO dummy_assign_function;
|
||||||
|
\c - - - :master_port
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
-- create_distributed_table command should fail
|
||||||
|
CREATE TABLE t1(a int, b int);
|
||||||
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
WARNING: function assign_distributed_transaction_id(integer, integer, unknown) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
\c regression
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
DROP DATABASE another;
|
||||||
|
\c - - - :master_port
|
||||||
|
DROP DATABASE another;
|
||||||
|
|
|
@ -307,3 +307,37 @@ FROM
|
||||||
|
|
||||||
-- we don't need the schema and the function anymore
|
-- we don't need the schema and the function anymore
|
||||||
DROP SCHEMA test_deamon CASCADE;
|
DROP SCHEMA test_deamon CASCADE;
|
||||||
|
|
||||||
|
|
||||||
|
-- verify citus does not crash while creating a table when run against an older worker
|
||||||
|
-- create_distributed_table piggybacks multiple commands into single one, if one worker
|
||||||
|
-- did not have the required UDF it should fail instead of crash.
|
||||||
|
|
||||||
|
-- create a test database, configure citus with single node
|
||||||
|
CREATE DATABASE another;
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE DATABASE another;
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
\c another
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
SELECT FROM master_add_node('localhost', :worker_1_port);
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone)
|
||||||
|
RENAME TO dummy_assign_function;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET citus.shard_replication_factor to 1;
|
||||||
|
-- create_distributed_table command should fail
|
||||||
|
CREATE TABLE t1(a int, b int);
|
||||||
|
SELECT create_distributed_table('t1', 'a');
|
||||||
|
|
||||||
|
\c regression
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
DROP DATABASE another;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
DROP DATABASE another;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue