diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index a1aa64c0b..8f6f80c2b 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -30,9 +30,6 @@ #include "utils/builtins.h" -#define SET_APPLICATION_NAME_QUERY \ - "SET application_name TO '" CITUS_RUN_COMMAND_APPLICATION_NAME "'" - PG_FUNCTION_INFO_V1(master_run_on_worker); static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, @@ -256,7 +253,10 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor } /* set the application_name to avoid nested execution checks */ - int querySent = SendRemoteCommand(connection, SET_APPLICATION_NAME_QUERY); + int querySent = SendRemoteCommand(connection, psprintf( + "SET application_name TO '%s%ld'", + CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX, + GetGlobalPID())); if (querySent == 0) { StoreErrorMessage(connection, queryResultString); @@ -444,9 +444,14 @@ ExecuteCommandsAndStoreResults(StringInfo *nodeNameArray, int *nodePortArray, GetNodeConnection(connectionFlags, nodeName, nodePort); /* set the application_name to avoid nested execution checks */ - bool success = ExecuteOptionalSingleResultCommand(connection, - SET_APPLICATION_NAME_QUERY, - queryResultString); + bool success = ExecuteOptionalSingleResultCommand( + connection, + psprintf( + "SET application_name TO '%s%ld'", + CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX, + GetGlobalPID()), + queryResultString + ); if (!success) { statusArray[commandIndex] = false; diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 9b2741893..bebb23b48 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -2085,8 +2085,10 @@ ExecuteRebalancerCommandInSeparateTransaction(char *command) PostPortNumber); List *commandList = NIL; - commandList = lappend(commandList, psprintf("SET LOCAL application_name TO %s;", - CITUS_REBALANCER_NAME)); + commandList = lappend(commandList, psprintf( + "SET LOCAL application_name TO '%s%ld'", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID())); if (PropagateSessionSettingsForLoopbackConnection) { diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 90b15ad1c..43ee6472a 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -92,6 +92,18 @@ typedef enum CitusBackendType EXTERNAL_CLIENT_BACKEND } CitusBackendType; +static const char *CitusBackendPrefixes[] = { + CITUS_APPLICATION_NAME_PREFIX, + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX, +}; + +static const CitusBackendType CitusBackendTypes[] = { + CITUS_INTERNAL_BACKEND, + CITUS_REBALANCER_BACKEND, + CITUS_RUN_COMMAND_BACKEND, +}; + static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); @@ -1066,26 +1078,30 @@ ExtractGlobalPID(const char *applicationName) /* we create our own copy of application name incase the original changes */ char *applicationNameCopy = pstrdup(applicationName); - uint64 prefixLength = strlen(CITUS_APPLICATION_NAME_PREFIX); - - /* does application name start with Citus's application name prefix */ - if (strncmp(applicationNameCopy, CITUS_APPLICATION_NAME_PREFIX, prefixLength) != 0) + for (int i = 0; i < lengthof(CitusBackendPrefixes); i++) { - return INVALID_CITUS_INTERNAL_BACKEND_GPID; - } + uint64 prefixLength = strlen(CitusBackendPrefixes[i]); - char *globalPIDString = &applicationNameCopy[prefixLength]; - uint64 globalPID = strtoul(globalPIDString, NULL, 10); - if (globalPID == 0) - { - /* - * INVALID_CITUS_INTERNAL_BACKEND_GPID is 0, but just to be explicit - * about how we handle strtoul errors. - */ - return INVALID_CITUS_INTERNAL_BACKEND_GPID; - } + /* does application name start with this prefix prefix */ + if (strncmp(applicationNameCopy, CitusBackendPrefixes[i], prefixLength) != 0) + { + continue; + } - return globalPID; + char *globalPIDString = &applicationNameCopy[prefixLength]; + uint64 globalPID = strtoul(globalPIDString, NULL, 10); + if (globalPID == 0) + { + /* + * INVALID_CITUS_INTERNAL_BACKEND_GPID is 0, but just to be explicit + * about how we handle strtoul errors. + */ + return INVALID_CITUS_INTERNAL_BACKEND_GPID; + } + + return globalPID; + } + return INVALID_CITUS_INTERNAL_BACKEND_GPID; } @@ -1438,21 +1454,20 @@ IsExternalClientBackend(void) void DetermineCitusBackendType(const char *applicationName) { - if (ExtractGlobalPID(applicationName) != INVALID_CITUS_INTERNAL_BACKEND_GPID) + if (applicationName && + ExtractGlobalPID(applicationName) != INVALID_CITUS_INTERNAL_BACKEND_GPID) { - CurrentBackendType = CITUS_INTERNAL_BACKEND; - } - else if (applicationName && strcmp(applicationName, CITUS_REBALANCER_NAME) == 0) - { - CurrentBackendType = CITUS_REBALANCER_BACKEND; - } - else if (applicationName && - strcmp(applicationName, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0) - { - CurrentBackendType = CITUS_RUN_COMMAND_BACKEND; - } - else - { - CurrentBackendType = EXTERNAL_CLIENT_BACKEND; + for (int i = 0; i < lengthof(CitusBackendPrefixes); i++) + { + uint64 prefixLength = strlen(CitusBackendPrefixes[i]); + + /* does application name start with this prefix prefix */ + if (strncmp(applicationName, CitusBackendPrefixes[i], prefixLength) == 0) + { + CurrentBackendType = CitusBackendTypes[i]; + return; + } + } } + CurrentBackendType = EXTERNAL_CLIENT_BACKEND; } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 20ac677f8..d279e8c2e 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -15,6 +15,7 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "access/genam.h" +#include "distributed/backend_data.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" #include "distributed/listutils.h" @@ -257,9 +258,10 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * the shard. This is allowed when indicating that the backend is a * rebalancer backend. */ - ExecuteCriticalRemoteCommand(connection, - "SET LOCAL application_name TO " - CITUS_REBALANCER_NAME); + ExecuteCriticalRemoteCommand(connection, psprintf( + "SET LOCAL application_name TO '%s%ld'", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID())); ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data); RemoteTransactionCommit(connection); } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 9a4e8a134..f95fb612d 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -37,10 +37,10 @@ #define CITUS_APPLICATION_NAME_PREFIX "citus_internal gpid=" /* application name used for internal connections in rebalancer */ -#define CITUS_REBALANCER_NAME "citus_rebalancer" +#define CITUS_REBALANCER_APPLICATION_NAME_PREFIX "citus_rebalancer gpid=" /* application name used for connections made by run_command_on_* */ -#define CITUS_RUN_COMMAND_APPLICATION_NAME "citus_run_command" +#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid=" /* deal with waiteventset errors */ #define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1 diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index b9d637a11..cd57a6673 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -301,3 +301,5 @@ s/^(NOTICE: )(clock).*LC:[0-9]+,.*C:[0-9]+,.*$/\1\2 xxxxxx/g # The following 2 lines are to normalize duration and cost in the EXPLAIN output s/LOG: duration: [0-9].[0-9]+ ms/LOG: duration: xxxx ms/g s/"Total Cost": [0-9].[0-9]+/"Total Cost": xxxx/g + +s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+/\1xxxxx/g diff --git a/src/test/regress/expected/metadata_sync_helpers.out b/src/test/regress/expected/metadata_sync_helpers.out index 1939fcf1c..a04e5c218 100644 --- a/src/test/regress/expected/metadata_sync_helpers.out +++ b/src/test/regress/expected/metadata_sync_helpers.out @@ -96,6 +96,22 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; 1 (1 row) +ROLLBACK; +-- also works if done by the rebalancer +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus_rebalancer gpid=10000000001'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); + citus_internal_add_partition_metadata +--------------------------------------------------------------------- + +(1 row) + ROLLBACK; -- application_name with incorrect gpid BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; @@ -109,6 +125,18 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); ERROR: This is an internal Citus function can only be used in a distributed transaction ROLLBACK; +-- also faills if done by the rebalancer +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus_rebalancer gpid=not a correct gpid'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ERROR: This is an internal Citus function can only be used in a distributed transaction +ROLLBACK; -- application_name with suffix is ok (e.g. pgbouncer might add this) BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index f74958d0f..79de1bdf7 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -204,7 +204,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -225,7 +225,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -246,7 +246,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -267,7 +267,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL application_name TO citus_rebalancer; +NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=xxxxx' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET LOCAL citus.log_remote_commands TO 'on'; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx diff --git a/src/test/regress/sql/metadata_sync_helpers.sql b/src/test/regress/sql/metadata_sync_helpers.sql index ca32c5f16..2e2958b9a 100644 --- a/src/test/regress/sql/metadata_sync_helpers.sql +++ b/src/test/regress/sql/metadata_sync_helpers.sql @@ -68,6 +68,13 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT count(*) FROM pg_dist_partition WHERE logicalrelid = 'metadata_sync_helpers.test_2'::regclass; ROLLBACK; +-- also works if done by the rebalancer +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus_rebalancer gpid=10000000001'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ROLLBACK; + -- application_name with incorrect gpid BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); @@ -75,6 +82,13 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); ROLLBACK; +-- also faills if done by the rebalancer +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus_rebalancer gpid=not a correct gpid'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ROLLBACK; + -- application_name with suffix is ok (e.g. pgbouncer might add this) BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');