diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 41017fab9..ee20b891e 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -234,8 +234,16 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, char nodePortString[12] = ""; StringInfo applicationName = makeStringInfo(); - appendStringInfo(applicationName, "%s%ld", CITUS_APPLICATION_NAME_PREFIX, - GetGlobalPID()); +elog(WARNING, "InternalConnectionName::%s::", InternalConnectionName); + if (strcmp(InternalConnectionName, "") == 0) + { + appendStringInfo(applicationName, "%s%ld", CITUS_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + } + else + { + appendStringInfo(applicationName, "%s", InternalConnectionName); + } /* * This function has three sections: diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 63ac316a8..62648930f 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -66,6 +66,10 @@ int GroupSize = 1; /* config variable managed via guc.c */ char *CurrentCluster = "default"; +/*TODO: add comments, move correct file */ +char *InternalConnectionName = ""; + + /* * Config variable to control whether we should replicate reference tables on * node activation or we should defer it to shard creation. diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 166423cf0..e145f7e6b 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -58,6 +58,8 @@ #include "utils/memutils.h" #include "utils/syscache.h" +#include "distributed/distributed_deadlock_detection.h" + #if PG_VERSION_NUM >= PG_VERSION_13 #include "common/hashfn.h" #endif @@ -824,6 +826,9 @@ rebalance_table_shards(PG_FUNCTION_ARGS) .rebalanceStrategy = strategy, .improvementThreshold = strategy->improvementThreshold, }; + + SetLocalInternalConnectionName("rebalancer"); + Oid shardTransferModeOid = PG_GETARG_OID(4); RebalanceTableShards(&options, shardTransferModeOid); PG_RETURN_VOID(); @@ -1696,6 +1701,7 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ + ExecuteCriticalCommandInSeparateTransaction(placementUpdateCommand->data); UpdateColocatedShardPlacementProgress(shardId, diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index aacccd8a6..b207b7a79 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1142,6 +1142,17 @@ RegisterCitusConfigVariables(void) HideShardsFromAppNamePrefixesAssignHook, NULL); + DefineCustomStringVariable( + "citus.internal_connection_name", + gettext_noop("TODO: add comment"), + NULL, + &InternalConnectionName, + "", + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + + DefineCustomIntVariable( "citus.isolation_test_session_process_id", NULL, diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index f9e4adca0..1125380bf 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -63,6 +63,8 @@ static void LogCancellingBackend(TransactionNode *transactionNode); static void LogTransactionNode(TransactionNode *transactionNode); static void LogDistributedDeadlockDebugMessage(const char *errorMessage); + + PG_FUNCTION_INFO_V1(check_distributed_deadlocks); @@ -119,6 +121,8 @@ CheckForDistributedDeadlocks(void) return false; } + SetLocalInternalConnectionName("distributed deadlock detection"); + WaitGraph *waitGraph = BuildGlobalWaitGraph(); HTAB *adjacencyLists = BuildAdjacencyListsForWaitGraph(waitGraph); @@ -219,6 +223,15 @@ CheckForDistributedDeadlocks(void) } +void +SetLocalInternalConnectionName(char *internalConnectionName) +{ + set_config_option("citus.internal_connection_name", internalConnectionName, + (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); +} + + /* * CheckDeadlockForTransactionNode does a DFS starting with the given * transaction node and checks for a cycle (i.e., the node can be reached again diff --git a/src/include/distributed/distributed_deadlock_detection.h b/src/include/distributed/distributed_deadlock_detection.h index 23f6554ef..2350beb12 100644 --- a/src/include/distributed/distributed_deadlock_detection.h +++ b/src/include/distributed/distributed_deadlock_detection.h @@ -37,6 +37,7 @@ typedef struct TransactionNode /* GUC, determining whether debug messages for deadlock detection sent to LOG */ extern bool LogDistributedDeadlockDetection; +extern void SetLocalInternalConnectionName(char *internalConnectionName); extern bool CheckForDistributedDeadlocks(void); extern HTAB * BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 82118f103..daf72ad3c 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -63,6 +63,8 @@ extern char *WorkerListFileName; extern char *CurrentCluster; extern bool ReplicateReferenceTablesOnActivate; +extern char *InternalConnectionName; + /* Function declarations for finding worker nodes to place shards on */ extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);