From 4676c4f7a5ae3c7db7a5f6a029059d5ab953ebfb Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Tue, 26 Sep 2017 17:25:46 -0700 Subject: [PATCH] Prevent crash when remote transaction start fails (#1662) We sent multiple commands to worker when starting a transaction. Previously we only checked the result of the first command that is transaction 'BEGIN' which always succeeds. Any failure on following commands were not checked. With this commit, we make sure all command results are checked. If there is any error we report the first error found. --- .../distributed/connection/remote_commands.c | 48 +++++++++++++++++++ .../transaction/remote_transaction.c | 16 ++----- src/include/distributed/remote_commands.h | 1 + src/test/regress/expected/multi_extension.out | 39 +++++++++++++++ src/test/regress/sql/multi_extension.sql | 34 +++++++++++++ 5 files changed, 126 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6bdc75e79..c8eb7ec98 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -56,6 +56,8 @@ IsResponseOK(PGresult *result) * * Note that this might require network IO. If that's not acceptable, use * NonblockingForgetResults(). + * + * ClearResults is variant of this function which can also raise errors. */ void ForgetResults(MultiConnection *connection) @@ -81,6 +83,52 @@ ForgetResults(MultiConnection *connection) } +/* + * ClearResults clears a connection from pending activity, + * returns true if all pending commands return success. It raises + * error if raiseErrors flag is set, any command fails and transaction + * is marked critical. + * + * Note that this might require network IO. If that's not acceptable, use + * NonblockingForgetResults(). + */ +bool +ClearResults(MultiConnection *connection, bool raiseErrors) +{ + bool success = true; + + while (true) + { + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (result == NULL) + { + break; + } + + /* + * End any pending copy operation. Transaction will be marked + * as failed by the following part. + */ + if (PQresultStatus(result) == PGRES_COPY_IN) + { + PQputCopyEnd(connection->pgConn, NULL); + } + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + + success = false; + } + + PQclear(result); + } + + return success; +} + + /* * NonblockingForgetResults clears a connection from pending activity if doing * so does not require network IO. Returns true if successful, false diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index fc131622b..615caf6dd 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -115,26 +115,18 @@ void FinishRemoteTransactionBegin(struct MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; - PGresult *result = NULL; - const bool raiseErrors = true; + bool clearSuccessful = true; + bool raiseErrors = true; Assert(transaction->transactionState == REMOTE_TRANS_STARTING); - result = GetRemoteCommandResult(connection, raiseErrors); - if (!IsResponseOK(result)) - { - ReportResultError(connection, result, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); - } - else + clearSuccessful = ClearResults(connection, raiseErrors); + if (clearSuccessful) { transaction->transactionState = REMOTE_TRANS_STARTED; transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact; } - PQclear(result); - ForgetResults(connection); - if (!transaction->transactionFailed) { Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 2c8029e1b..9cab423fd 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -26,6 +26,7 @@ extern bool LogRemoteCommands; /* simple helpers */ extern bool IsResponseOK(struct pg_result *result); extern void ForgetResults(MultiConnection *connection); +extern bool ClearResults(MultiConnection *connection, bool raiseErrors); extern bool NonblockingForgetResults(MultiConnection *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 30aaab73b..61d502e2d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -329,3 +329,42 @@ FROM -- we don't need the schema and the function anymore DROP SCHEMA test_deamon CASCADE; NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text) +-- verify citus does not crash while creating a table when run against an older worker +-- create_distributed_table piggybacks multiple commands into single one, if one worker +-- did not have the required UDF it should fail instead of crash. +-- create a test database, configure citus with single node +CREATE DATABASE another; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c - - - :worker_1_port +CREATE DATABASE another; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c - - - :master_port +\c another +CREATE EXTENSION citus; +SELECT FROM master_add_node('localhost', :worker_1_port); +-- +(1 row) + +\c - - - :worker_1_port +CREATE EXTENSION citus; +ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) +RENAME TO dummy_assign_function; +\c - - - :master_port +SET citus.shard_replication_factor to 1; +-- create_distributed_table command should fail +CREATE TABLE t1(a int, b int); +SELECT create_distributed_table('t1', 'a'); +WARNING: function assign_distributed_transaction_id(integer, integer, unknown) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +CONTEXT: while executing command on localhost:57637 +ERROR: current transaction is aborted, commands ignored until end of transaction block +CONTEXT: while executing command on localhost:57637 +\c regression +\c - - - :worker_1_port +DROP DATABASE another; +\c - - - :master_port +DROP DATABASE another; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 4c1f4cdda..13ca387cf 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -307,3 +307,37 @@ FROM -- we don't need the schema and the function anymore DROP SCHEMA test_deamon CASCADE; + + +-- verify citus does not crash while creating a table when run against an older worker +-- create_distributed_table piggybacks multiple commands into single one, if one worker +-- did not have the required UDF it should fail instead of crash. + +-- create a test database, configure citus with single node +CREATE DATABASE another; +\c - - - :worker_1_port +CREATE DATABASE another; +\c - - - :master_port + +\c another +CREATE EXTENSION citus; +SELECT FROM master_add_node('localhost', :worker_1_port); + +\c - - - :worker_1_port +CREATE EXTENSION citus; +ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) +RENAME TO dummy_assign_function; + +\c - - - :master_port +SET citus.shard_replication_factor to 1; +-- create_distributed_table command should fail +CREATE TABLE t1(a int, b int); +SELECT create_distributed_table('t1', 'a'); + +\c regression +\c - - - :worker_1_port +DROP DATABASE another; + +\c - - - :master_port +DROP DATABASE another; +