diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6bdc75e79..c8eb7ec98 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -56,6 +56,8 @@ IsResponseOK(PGresult *result) * * Note that this might require network IO. If that's not acceptable, use * NonblockingForgetResults(). + * + * ClearResults is variant of this function which can also raise errors. */ void ForgetResults(MultiConnection *connection) @@ -81,6 +83,52 @@ ForgetResults(MultiConnection *connection) } +/* + * ClearResults clears a connection from pending activity and returns + * true only if every pending command reported success. It raises an + * error if the raiseErrors flag is set, any command fails, and the + * transaction is marked critical. + * + * Note that this might require network IO. If that's not acceptable, use + * NonblockingForgetResults(). + */ +bool +ClearResults(MultiConnection *connection, bool raiseErrors) +{ + bool success = true; + + while (true) + { + PGresult *result = GetRemoteCommandResult(connection, raiseErrors); + if (result == NULL) + { + break; + } + + /* + * End any pending copy operation. The transaction will then be + * marked as failed by the response check that follows. + */ + if (PQresultStatus(result) == PGRES_COPY_IN) + { + PQputCopyEnd(connection->pgConn, NULL); + } + + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + + success = false; + } + + PQclear(result); + } + + return success; +} + + /* * NonblockingForgetResults clears a connection from pending activity if doing * so does not require network IO. 
Returns true if successful, false diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index fc131622b..615caf6dd 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -115,26 +115,18 @@ void FinishRemoteTransactionBegin(struct MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; - PGresult *result = NULL; - const bool raiseErrors = true; + bool clearSuccessful = true; + bool raiseErrors = true; Assert(transaction->transactionState == REMOTE_TRANS_STARTING); - result = GetRemoteCommandResult(connection, raiseErrors); - if (!IsResponseOK(result)) - { - ReportResultError(connection, result, WARNING); - MarkRemoteTransactionFailed(connection, raiseErrors); - } - else + clearSuccessful = ClearResults(connection, raiseErrors); + if (clearSuccessful) { transaction->transactionState = REMOTE_TRANS_STARTED; transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact; } - PQclear(result); - ForgetResults(connection); - if (!transaction->transactionFailed) { Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 2c8029e1b..9cab423fd 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -26,6 +26,7 @@ extern bool LogRemoteCommands; /* simple helpers */ extern bool IsResponseOK(struct pg_result *result); extern void ForgetResults(MultiConnection *connection); +extern bool ClearResults(MultiConnection *connection, bool raiseErrors); extern bool NonblockingForgetResults(MultiConnection *connection); extern bool SqlStateMatchesCategory(char *sqlStateString, int category); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 30aaab73b..61d502e2d 
100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -329,3 +329,42 @@ FROM -- we don't need the schema and the function anymore DROP SCHEMA test_deamon CASCADE; NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text) +-- verify citus does not crash while creating a table when run against an older worker +-- create_distributed_table piggybacks multiple commands into single one, if one worker +-- did not have the required UDF it should fail instead of crash. +-- create a test database, configure citus with single node +CREATE DATABASE another; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c - - - :worker_1_port +CREATE DATABASE another; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +DETAIL: Citus does not propagate CREATE DATABASE command to workers +HINT: You can manually create a database and its extensions on workers. +\c - - - :master_port +\c another +CREATE EXTENSION citus; +SELECT FROM master_add_node('localhost', :worker_1_port); +-- +(1 row) + +\c - - - :worker_1_port +CREATE EXTENSION citus; +ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) +RENAME TO dummy_assign_function; +\c - - - :master_port +SET citus.shard_replication_factor to 1; +-- create_distributed_table command should fail +CREATE TABLE t1(a int, b int); +SELECT create_distributed_table('t1', 'a'); +WARNING: function assign_distributed_transaction_id(integer, integer, unknown) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. 
+CONTEXT: while executing command on localhost:57637 +ERROR: current transaction is aborted, commands ignored until end of transaction block +CONTEXT: while executing command on localhost:57637 +\c regression +\c - - - :worker_1_port +DROP DATABASE another; +\c - - - :master_port +DROP DATABASE another; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 4c1f4cdda..13ca387cf 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -307,3 +307,37 @@ FROM -- we don't need the schema and the function anymore DROP SCHEMA test_deamon CASCADE; + + +-- verify citus does not crash while creating a table when run against an older worker +-- create_distributed_table piggybacks multiple commands into single one, if one worker +-- did not have the required UDF it should fail instead of crash. + +-- create a test database, configure citus with single node +CREATE DATABASE another; +\c - - - :worker_1_port +CREATE DATABASE another; +\c - - - :master_port + +\c another +CREATE EXTENSION citus; +SELECT FROM master_add_node('localhost', :worker_1_port); + +\c - - - :worker_1_port +CREATE EXTENSION citus; +ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) +RENAME TO dummy_assign_function; + +\c - - - :master_port +SET citus.shard_replication_factor to 1; +-- create_distributed_table command should fail +CREATE TABLE t1(a int, b int); +SELECT create_distributed_table('t1', 'a'); + +\c regression +\c - - - :worker_1_port +DROP DATABASE another; + +\c - - - :master_port +DROP DATABASE another; +