diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0db780d1b..1c2408a1a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2160,7 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext); RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); - uint32 connectionFlags = 0; + uint32 connectionFlags = 0; /* * Colocated intermediate results do not honor citus.max_shared_pool_size, @@ -2212,7 +2212,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, */ if (ShardIntervalListHasLocalPlacements(shardIntervalList)) { - bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(connectionFlags); + bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode( + connectionFlags); copyDest->shouldUseLocalCopy = !reservedConnection; } } @@ -3635,7 +3636,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, return connection; } - if (IsReservationPossible(connectionFlags)) + if (IsReservationPossible(connectionFlags)) { /* * Enforce the requirements for adaptive connection management diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index a7d4c605d..fa7f9f3bf 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -68,7 +68,7 @@ static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, - const int cachedConnectionCount); + const int cachedConnectionCount); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -360,11 +360,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { - if (flags & REQUIRE_MAINTENANCE_CONNECTION) - { - // Maintenance database may have changed, so cached connection should be closed - connection->forceCloseAtTransactionEnd = true; - } + if (flags & REQUIRE_MAINTENANCE_CONNECTION) + { + /* Maintenance database may have changed, so cached connection should be closed */ + connection->forceCloseAtTransactionEnd = true; + } return connection; } } @@ -388,12 +388,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, /* these two flags are by nature cannot happen at the same time */ Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION))); - int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) - ? MAINTENANCE_CONNECTION - : 0; + int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) + ? MAINTENANCE_CONNECTION + : 0; if (flags & WAIT_FOR_CONNECTION) { - WaitLoopForSharedConnection(sharedCounterFlags, hostname, port); + WaitLoopForSharedConnection(sharedCounterFlags, hostname, port); } else if (flags & OPTIONAL_CONNECTION) { @@ -403,7 +403,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * cannot reserve the right to establish a connection, we prefer to * error out. */ - if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port)) + if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port)) { /* do not track the connection anymore */ dlist_delete(&connection->connectionNode); @@ -423,7 +423,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * * Still, we keep track of the connection counter. */ - IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port); + IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port); } @@ -437,15 +437,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, ResetShardPlacementAssociation(connection); - if (flags & REQUIRE_METADATA_CONNECTION) + if (flags & REQUIRE_METADATA_CONNECTION) { connection->useForMetadataOperations = true; - } - else if (flags & REQUIRE_MAINTENANCE_CONNECTION) - { - connection->useForMaintenanceOperations = true; - connection->forceCloseAtTransactionEnd = true; - } + } + else if (flags & REQUIRE_MAINTENANCE_CONNECTION) + { + connection->useForMaintenanceOperations = true; + connection->forceCloseAtTransactionEnd = true; + } /* fully initialized the connection, record it */ connection->initializationState = POOL_STATE_INITIALIZED; @@ -513,11 +513,11 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) continue; } - if ((flags & REQUIRE_MAINTENANCE_CONNECTION) && - !connection->useForMaintenanceOperations) - { - continue; - } + if ((flags & REQUIRE_MAINTENANCE_CONNECTION) && + !connection->useForMaintenanceOperations) + { + continue; + } if ((flags & REQUIRE_METADATA_CONNECTION) && !connection->useForMetadataOperations) @@ -1215,10 +1215,11 @@ CitusPQFinish(MultiConnection *connection) /* behave idempotently, there is no gurantee that CitusPQFinish() is called once */ if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED) { - int sharedCounterFlags = (connection->useForMaintenanceOperations) - ? MAINTENANCE_CONNECTION - : 0; - DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname, connection->port); + int sharedCounterFlags = (connection->useForMaintenanceOperations) + ? MAINTENANCE_CONNECTION + : 0; + DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname, + connection->port); connection->initializationState = POOL_STATE_NOT_INITIALIZED; } } @@ -1515,16 +1516,17 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection * escalating the number of cached connections. We can recognize such backends * from their application name. */ - return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) || + return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) || connection->initializationState != POOL_STATE_INITIALIZED || cachedConnectionCount >= MaxCachedConnectionsPerWorker || - connection->forceCloseAtTransactionEnd || + connection->forceCloseAtTransactionEnd || PQstatus(connection->pgConn) != CONNECTION_OK || - !RemoteTransactionIdle(connection) || - connection->requiresReplication || - connection->isReplicationOriginSessionSetup || - (MaxCachedConnectionLifetime >= 0 && - MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0); + !RemoteTransactionIdle(connection) || + connection->requiresReplication || + connection->isReplicationOriginSessionSetup || + (MaxCachedConnectionLifetime >= 0 && + MillisecondsToTimeout(connection->connectionEstablishmentStart, + MaxCachedConnectionLifetime) <= 0); } diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index a9079f752..cbe125dda 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -92,10 +92,11 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char * userId, Oid databaseOid, bool *found); -static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags); +static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 + connectionFlags); static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, - bool waitForConnection, - uint32 connectionFlags); + bool waitForConnection, + uint32 connectionFlags); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); @@ -241,8 +242,9 @@ DeallocateReservedConnections(void) * We have not used this reservation, make sure to clean-up from * the shared memory as well. */ - int sharedCounterFlags = 0; - DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname, entry->key.port); + int sharedCounterFlags = 0; + DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname, + entry->key.port); /* for completeness, set it to true */ entry->usedReservation = true; @@ -332,7 +334,8 @@ TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags) } bool waitForConnection = false; - return EnsureConnectionPossibilityForNode(localNode, waitForConnection, connectionFlags); + return EnsureConnectionPossibilityForNode(localNode, waitForConnection, + connectionFlags); } @@ -365,7 +368,8 @@ EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags) foreach_ptr(workerNode, nodeList) { bool waitForConnection = true; - EnsureConnectionPossibilityForNode(workerNode, waitForConnection, connectionFlags); + EnsureConnectionPossibilityForNode(workerNode, waitForConnection, + connectionFlags); } } @@ -384,9 +388,10 @@ EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags) * return false. */ static bool -EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32 connectionFlags) +EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32 + connectionFlags) { - if (!IsReservationPossible(connectionFlags)) + if (!IsReservationPossible(connectionFlags)) { return false; } @@ -441,16 +446,17 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio * Increment the shared counter, we may need to wait if there are * no space left. */ - int sharedCounterFlags = 0; - WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName, workerNode->workerPort); + int sharedCounterFlags = 0; + WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName, + workerNode->workerPort); } else { - int sharedCounterFlags = 0; - bool incremented = TryToIncrementSharedConnectionCounter( - sharedCounterFlags, - workerNode->workerName, - workerNode->workerPort); + int sharedCounterFlags = 0; + bool incremented = TryToIncrementSharedConnectionCounter( + sharedCounterFlags, + workerNode->workerName, + workerNode->workerPort); if (!incremented) { /* @@ -482,11 +488,11 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio bool IsReservationPossible(uint32 connectionFlags) { - bool connectionThrottlingDisabled = - connectionFlags & REQUIRE_MAINTENANCE_CONNECTION - ? GetMaxMaintenanceSharedPoolSize() <= 0 - : GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING; - if (connectionThrottlingDisabled) + bool connectionThrottlingDisabled = + connectionFlags & REQUIRE_MAINTENANCE_CONNECTION + ? GetMaxMaintenanceSharedPoolSize() <= 0 + : GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING; + if (connectionThrottlingDisabled) { /* connection throttling disabled */ return false; diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 1cab33306..69ad0166a 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -75,24 +75,24 @@ typedef struct SharedWorkerNodeConnStatsHashKey typedef struct SharedWorkerNodeDatabaseConnStatsHashKey { - SharedWorkerNodeConnStatsHashKey workerNodeKey; - Oid database; + SharedWorkerNodeConnStatsHashKey workerNodeKey; + Oid database; } SharedWorkerNodeDatabaseConnStatsHashKey; /* hash entry for per worker stats */ typedef struct SharedWorkerNodeConnStatsHashEntry { - SharedWorkerNodeConnStatsHashKey key; + SharedWorkerNodeConnStatsHashKey key; - int regularConnectionsCount; - int maintenanceConnectionsCount; + int regularConnectionsCount; + int maintenanceConnectionsCount; } SharedWorkerNodeConnStatsHashEntry; /* hash entry for per database on worker stats */ typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry { - SharedWorkerNodeDatabaseConnStatsHashKey key; - int count; + SharedWorkerNodeDatabaseConnStatsHashKey key; + int count; } SharedWorkerNodeDatabaseConnStatsHashEntry; @@ -141,17 +141,24 @@ static bool ShouldWaitForConnection(int currentConnectionCount); static uint32 SharedConnectionHashHash(const void *key, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); -static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize); +static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size + keysize); static bool isConnectionThrottlingDisabled(uint32 externalFlags); -static bool -IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port, - Oid database); -static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port); -static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, - int port, - Oid database); -static void -DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port, Oid database); +static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool + checkLimits, const char *hostname, + int port, + Oid database); +static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int + port); +static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const + char * + hostname, + int port, + Oid + database); +static void DecrementSharedConnectionCounterInternal(uint32 externalFlags, const + char *hostname, int port, Oid + database); PG_FUNCTION_INFO_V1(citus_remote_connection_stats); @@ -192,26 +199,29 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; - SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL; + SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL; - hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash); - while ((connectionEntry = (SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search(&status)) != 0) + hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash); + while ((connectionEntry = + (SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search( + &status)) != 0) { /* get ready for the next tuple */ memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - char *databaseName = get_database_name(connectionEntry->key.database); + char *databaseName = get_database_name(connectionEntry->key.database); if (databaseName == NULL) { /* database might have been dropped */ continue; } - values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.workerNodeKey.hostname)); - values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port); + values[0] = PointerGetDatum(cstring_to_text( + connectionEntry->key.workerNodeKey.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port); values[2] = PointerGetDatum(cstring_to_text(databaseName)); - values[3] = Int32GetDatum(connectionEntry->count); + values[3] = Int32GetDatum(connectionEntry->count); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -257,10 +267,11 @@ GetMaxSharedPoolSize(void) return MaxSharedPoolSize; } + int GetMaxMaintenanceSharedPoolSize(void) { - return MaxMaintenanceSharedPoolSize; + return MaxMaintenanceSharedPoolSize; } @@ -293,7 +304,7 @@ GetLocalSharedPoolSize(void) void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) { - while (!TryToIncrementSharedConnectionCounter(flags, hostname, port)) + while (!TryToIncrementSharedConnectionCounter(flags, hostname, port)) { CHECK_FOR_INTERRUPTS(); @@ -303,6 +314,7 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) ConditionVariableCancelSleep(); } + /* * TryToIncrementSharedConnectionCounter tries to increment the shared * connection counter for the given nodeId and the current database in @@ -315,28 +327,28 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - if (isConnectionThrottlingDisabled(flags)) - { - return true; - } + if (isConnectionThrottlingDisabled(flags)) + { + return true; + } - /* + /* * The local session might already have some reserved connections to the given * node. In that case, we don't need to go through the shared memory. */ - Oid userId = GetUserId(); - if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) - { - MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); + Oid userId = GetUserId(); + if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) + { + MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); - return true; - } + return true; + } - return IncrementSharedConnectionCounterInternal(flags, - true, - hostname, - port, - MyDatabaseId); + return IncrementSharedConnectionCounterInternal(flags, + true, + hostname, + port, + MyDatabaseId); } @@ -347,161 +359,165 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - if (isConnectionThrottlingDisabled(flags)) - { - return; - } + if (isConnectionThrottlingDisabled(flags)) + { + return; + } - IncrementSharedConnectionCounterInternal(flags, - false, - hostname, - port, - MyDatabaseId); + IncrementSharedConnectionCounterInternal(flags, + false, + hostname, + port, + MyDatabaseId); } static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, - bool checkLimits, - const char *hostname, - int port, - Oid database) + bool checkLimits, + const char *hostname, + int port, + Oid database) { - LockConnectionSharedMemory(LW_EXCLUSIVE); + LockConnectionSharedMemory(LW_EXCLUSIVE); - /* - * As the hash map is allocated in shared memory, it doesn't rely on palloc for - * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no - * space in the shared memory. That's why we prefer continuing the execution - * instead of throwing an error. - */ - SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); - bool workerNodeEntryFound = false; - SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = - hash_search(SharedWorkerNodeConnStatsHash, - &workerNodeKey, - HASH_ENTER_NULL, - &workerNodeEntryFound); + /* + * As the hash map is allocated in shared memory, it doesn't rely on palloc for + * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no + * space in the shared memory. That's why we prefer continuing the execution + * instead of throwing an error. + */ + SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, + port); + bool workerNodeEntryFound = false; + SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = + hash_search(SharedWorkerNodeConnStatsHash, + &workerNodeKey, + HASH_ENTER_NULL, + &workerNodeEntryFound); - /* - * It is possible to throw an error at this point, but that doesn't help us in anyway. - * Instead, we try our best, let the connection establishment continue by-passing the - * connection throttling. - */ - if (!workerNodeConnectionEntry) - { - UnLockConnectionSharedMemory(); - return true; - } + /* + * It is possible to throw an error at this point, but that doesn't help us in anyway. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!workerNodeConnectionEntry) + { + UnLockConnectionSharedMemory(); + return true; + } - if (!workerNodeEntryFound) - { - /* we successfully allocated the entry for the first time, so initialize it */ - workerNodeConnectionEntry->regularConnectionsCount = 0; - workerNodeConnectionEntry->maintenanceConnectionsCount = 0; - } + if (!workerNodeEntryFound) + { + /* we successfully allocated the entry for the first time, so initialize it */ + workerNodeConnectionEntry->regularConnectionsCount = 0; + workerNodeConnectionEntry->maintenanceConnectionsCount = 0; + } - /* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */ - SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = - PrepareWorkerNodeDatabaseHashKey(hostname, port, database); - bool workerNodeDatabaseEntryFound = false; - SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = - hash_search(SharedWorkerNodeDatabaseConnStatsHash, - &workerNodeDatabaseKey, - HASH_ENTER_NULL, - &workerNodeDatabaseEntryFound); + /* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */ + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = + PrepareWorkerNodeDatabaseHashKey(hostname, port, database); + bool workerNodeDatabaseEntryFound = false; + SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = + hash_search(SharedWorkerNodeDatabaseConnStatsHash, + &workerNodeDatabaseKey, + HASH_ENTER_NULL, + &workerNodeDatabaseEntryFound); - if (!workerNodeDatabaseEntry) - { - UnLockConnectionSharedMemory(); - return true; - } + if (!workerNodeDatabaseEntry) + { + UnLockConnectionSharedMemory(); + return true; + } - if (!workerNodeDatabaseEntryFound) - { - workerNodeDatabaseEntry->count = 0; - } + if (!workerNodeDatabaseEntryFound) + { + workerNodeDatabaseEntry->count = 0; + } - /* Increment counter if a slot available */ - bool connectionSlotAvailable = true; + /* Increment counter if a slot available */ + bool connectionSlotAvailable = true; - bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; - if (checkLimits) - { - WorkerNode *workerNode = FindWorkerNode(hostname, port); - bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId()); + bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; + if (checkLimits) + { + WorkerNode *workerNode = FindWorkerNode(hostname, port); + bool connectionToLocalNode = workerNode && (workerNode->groupId == + GetLocalGroupId()); - int currentConnectionsLimit; - int currentConnectionsCount; - if (maintenanceConnection) - { - currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize(); - currentConnectionsCount = workerNodeConnectionEntry->maintenanceConnectionsCount; - } - else - { - currentConnectionsLimit = connectionToLocalNode - ? GetLocalSharedPoolSize() - : GetMaxSharedPoolSize(); - currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount; - } + int currentConnectionsLimit; + int currentConnectionsCount; + if (maintenanceConnection) + { + currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize(); + currentConnectionsCount = + workerNodeConnectionEntry->maintenanceConnectionsCount; + } + else + { + currentConnectionsLimit = connectionToLocalNode + ? GetLocalSharedPoolSize() + : GetMaxSharedPoolSize(); + currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount; + } - bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit; + bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > + currentConnectionsLimit; - /* - * For local nodes, solely relying on citus.max_shared_pool_size or - * max_connections might not be sufficient. The former gives us - * a preview of the future (e.g., we let the new connections to establish, - * but they are not established yet). The latter gives us the close to - * precise view of the past (e.g., the active number of client backends). - * - * Overall, we want to limit both of the metrics. The former limit typically - * kicks in under regular loads, where the load of the database increases in - * a reasonable pace. The latter limit typically kicks in when the database - * is issued lots of concurrent sessions at the same time, such as benchmarks. - */ - bool localNodeLimitExceeded = - connectionToLocalNode && - (GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES || - GetExternalClientBackendCount() + 1 > currentConnectionsLimit); - if (remoteNodeLimitExceeded || localNodeLimitExceeded) - { - connectionSlotAvailable = false; - } - } + /* + * For local nodes, solely relying on citus.max_shared_pool_size or + * max_connections might not be sufficient. The former gives us + * a preview of the future (e.g., we let the new connections to establish, + * but they are not established yet). The latter gives us the close to + * precise view of the past (e.g., the active number of client backends). + * + * Overall, we want to limit both of the metrics. The former limit typically + * kicks in under regular loads, where the load of the database increases in + * a reasonable pace. The latter limit typically kicks in when the database + * is issued lots of concurrent sessions at the same time, such as benchmarks. + */ + bool localNodeLimitExceeded = + connectionToLocalNode && + (GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES || + GetExternalClientBackendCount() + 1 > currentConnectionsLimit); + if (remoteNodeLimitExceeded || localNodeLimitExceeded) + { + connectionSlotAvailable = false; + } + } - if (connectionSlotAvailable) - { - if (maintenanceConnection) - { - workerNodeConnectionEntry->maintenanceConnectionsCount += 1; - } - else - { - workerNodeConnectionEntry->regularConnectionsCount += 1; - } - workerNodeDatabaseEntry->count += 1; - } + if (connectionSlotAvailable) + { + if (maintenanceConnection) + { + workerNodeConnectionEntry->maintenanceConnectionsCount += 1; + } + else + { + workerNodeConnectionEntry->regularConnectionsCount += 1; + } + workerNodeDatabaseEntry->count += 1; + } - if (IsLoggableLevel(DEBUG4)) - { - ereport(DEBUG4, errmsg( - "Incrementing connection counter. " - "Current regular connections: %i, maintenance connections: %i. " - "Connection slot to %s:%i database %i is %s", - workerNodeConnectionEntry->regularConnectionsCount, - workerNodeConnectionEntry->maintenanceConnectionsCount, - hostname, - port, - database, - connectionSlotAvailable ? "available" : "not available" - )); - } + if (IsLoggableLevel(DEBUG4)) + { + ereport(DEBUG4, errmsg( + "Incrementing connection counter. " + "Current regular connections: %i, maintenance connections: %i. " + "Connection slot to %s:%i database %i is %s", + workerNodeConnectionEntry->regularConnectionsCount, + workerNodeConnectionEntry->maintenanceConnectionsCount, + hostname, + port, + database, + connectionSlotAvailable ? "available" : "not available" + )); + } - UnLockConnectionSharedMemory(); + UnLockConnectionSharedMemory(); - return connectionSlotAvailable; + return connectionSlotAvailable; } @@ -512,105 +528,109 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags, void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port) { - // TODO: possible bug, remove this check? - if (isConnectionThrottlingDisabled(externalFlags)) - { - return; - } + /* TODO: possible bug, remove this check? */ + if (isConnectionThrottlingDisabled(externalFlags)) + { + return; + } - LockConnectionSharedMemory(LW_EXCLUSIVE); + LockConnectionSharedMemory(LW_EXCLUSIVE); - DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId); + DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId); - UnLockConnectionSharedMemory(); - WakeupWaiterBackendsForSharedConnection(); + UnLockConnectionSharedMemory(); + WakeupWaiterBackendsForSharedConnection(); } + static void DecrementSharedConnectionCounterInternal(uint32 externalFlags, - const char *hostname, - int port, - Oid database) + const char *hostname, + int port, + Oid database) { - bool workerNodeEntryFound = false; - SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); - SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = - hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound); + bool workerNodeEntryFound = false; + SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, + port); + SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = + hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, + &workerNodeEntryFound); - /* this worker node is removed or updated, no need to care */ - if (!workerNodeEntryFound) - { - ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " - "connection counter", hostname, port))); - return; - } + /* this worker node is removed or updated, no need to care */ + if (!workerNodeEntryFound) + { + ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " + "connection counter", hostname, port))); + return; + } - /* we should never go below 0 */ - Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 || - workerNodeConnectionEntry->maintenanceConnectionsCount > 0); + /* we should never go below 0 */ + Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 || + workerNodeConnectionEntry->maintenanceConnectionsCount > 0); - if (externalFlags & MAINTENANCE_CONNECTION) - { - workerNodeConnectionEntry->maintenanceConnectionsCount -= 1; - } - else - { - workerNodeConnectionEntry->regularConnectionsCount -= 1; - } + if (externalFlags & MAINTENANCE_CONNECTION) + { + workerNodeConnectionEntry->maintenanceConnectionsCount -= 1; + } + else + { + workerNodeConnectionEntry->regularConnectionsCount -= 1; + } - if (IsLoggableLevel(DEBUG4)) - { - ereport(DEBUG4, errmsg( - "Decrementing connection counter. " - "Current regular connections: %i, maintenance connections: %i. " - "Connection slot to %s:%i database %i is released", - workerNodeConnectionEntry->regularConnectionsCount, - workerNodeConnectionEntry->maintenanceConnectionsCount, - hostname, - port, - database - )); - } + if (IsLoggableLevel(DEBUG4)) + { + ereport(DEBUG4, errmsg( + "Decrementing connection counter. " + "Current regular connections: %i, maintenance connections: %i. " + "Connection slot to %s:%i database %i is released", + workerNodeConnectionEntry->regularConnectionsCount, + workerNodeConnectionEntry->maintenanceConnectionsCount, + hostname, + port, + database + )); + } - /* - * We don't have to remove at this point as the node might be still active - * and will have new connections open to it. Still, this seems like a convenient - * place to remove the entry, as count == 0 implies that the server is - * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, - * we're unlikely to trigger this often. - */ - if (workerNodeConnectionEntry->regularConnectionsCount == 0 && - workerNodeConnectionEntry->maintenanceConnectionsCount == 0) - { - hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); - } + /* + * We don't have to remove at this point as the node might be still active + * and will have new connections open to it. Still, this seems like a convenient + * place to remove the entry, as count == 0 implies that the server is + * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, + * we're unlikely to trigger this often. + */ + if (workerNodeConnectionEntry->regularConnectionsCount == 0 && + workerNodeConnectionEntry->maintenanceConnectionsCount == 0) + { + hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); + } - /* - * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey - */ + /* + * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey + */ - SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = - PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId); - bool workerNodeDatabaseEntryFound = false; - SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = - hash_search(SharedWorkerNodeDatabaseConnStatsHash, - &workerNodeDatabaseKey, - HASH_FIND, - &workerNodeDatabaseEntryFound); + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = + PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId); + bool workerNodeDatabaseEntryFound = false; + SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = + hash_search(SharedWorkerNodeDatabaseConnStatsHash, + &workerNodeDatabaseKey, + HASH_FIND, + &workerNodeDatabaseEntryFound); - if (!workerNodeDatabaseEntryFound) - { - return; - } + if (!workerNodeDatabaseEntryFound) + { + return; + } - Assert(workerNodeDatabaseEntry->count > 0); + Assert(workerNodeDatabaseEntry->count > 0); - workerNodeDatabaseEntry->count -= 1; + workerNodeDatabaseEntry->count -= 1; - if (workerNodeDatabaseEntry->count == 0) - { - hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey, HASH_REMOVE, NULL); - } + if (workerNodeDatabaseEntry->count == 0) + { + hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey, + HASH_REMOVE, NULL); + } } @@ -621,7 +641,7 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags, static void LockConnectionSharedMemory(LWLockMode lockMode) { - LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode); + LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode); } @@ -706,17 +726,20 @@ SharedConnectionStatsShmemSize(void) size = add_size(size, sizeof(ConnectionStatsSharedData)); - Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked, - sizeof(SharedWorkerNodeConnStatsHashEntry)); + Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked, + sizeof( + SharedWorkerNodeConnStatsHashEntry)); - size = add_size(size, workerNodeConnHashSize); + size = add_size(size, workerNodeConnHashSize); - Size workerNodeDatabaseConnSize = hash_estimate_size(MaxWorkerNodesTracked * MaxDatabasesPerWorkerNodesTracked, - sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry)); + Size workerNodeDatabaseConnSize = hash_estimate_size(MaxWorkerNodesTracked * + MaxDatabasesPerWorkerNodesTracked, + sizeof( + SharedWorkerNodeDatabaseConnStatsHashEntry)); - size = add_size(size, workerNodeDatabaseConnSize); + size = add_size(size, workerNodeDatabaseConnSize); - return size; + return size; } @@ -756,42 +779,48 @@ SharedConnectionStatsShmemInit(void) ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); } - /* allocate hash tables */ + /* allocate hash tables */ - /* create (hostname, port) -> [counter] */ - HASHCTL sharedWorkerNodeConnStatsHashInfo; - memset(&sharedWorkerNodeConnStatsHashInfo, 0, sizeof(sharedWorkerNodeConnStatsHashInfo)); - sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey); - sharedWorkerNodeConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeConnStatsHashEntry); - sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash; - sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare; - SharedWorkerNodeConnStatsHash = - ShmemInitHash("Shared Conn. Stats Hash", - MaxWorkerNodesTracked, - MaxWorkerNodesTracked, - &sharedWorkerNodeConnStatsHashInfo, - (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); + /* create (hostname, port) -> [counter] */ + HASHCTL sharedWorkerNodeConnStatsHashInfo; + memset(&sharedWorkerNodeConnStatsHashInfo, 0, + sizeof(sharedWorkerNodeConnStatsHashInfo)); + sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey); + sharedWorkerNodeConnStatsHashInfo.entrysize = + sizeof(SharedWorkerNodeConnStatsHashEntry); + sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash; + sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare; + SharedWorkerNodeConnStatsHash = + ShmemInitHash("Shared Conn. Stats Hash", + MaxWorkerNodesTracked, + MaxWorkerNodesTracked, + &sharedWorkerNodeConnStatsHashInfo, + (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); - /* create (hostname, port, database) -> [counter] */ - HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo; - memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0, sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo)); - sharedWorkerNodeDatabaseConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashKey); - sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry); - sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash; - sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare; + /* create (hostname, port, database) -> [counter] */ + HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo; + memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0, + sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo)); + sharedWorkerNodeDatabaseConnStatsHashInfo.keysize = + sizeof(SharedWorkerNodeDatabaseConnStatsHashKey); + sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize = + sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry); + sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash; + sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare; - int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked * MaxDatabasesPerWorkerNodesTracked; - SharedWorkerNodeDatabaseConnStatsHash = - ShmemInitHash("Shared Conn Per Database. Stats Hash", - sharedWorkerNodeDatabaseConnStatsHashSize, - sharedWorkerNodeDatabaseConnStatsHashSize, - &sharedWorkerNodeDatabaseConnStatsHashInfo, - (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); + int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked * + MaxDatabasesPerWorkerNodesTracked; + SharedWorkerNodeDatabaseConnStatsHash = + ShmemInitHash("Shared Conn Per Database. Stats Hash", + sharedWorkerNodeDatabaseConnStatsHashSize, + sharedWorkerNodeDatabaseConnStatsHashSize, + &sharedWorkerNodeDatabaseConnStatsHashInfo, + (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); LWLockRelease(AddinShmemInitLock); - Assert(SharedWorkerNodeConnStatsHash != NULL); - Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL); + Assert(SharedWorkerNodeConnStatsHash != NULL); + Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL); Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0); if (prev_shmem_startup_hook != NULL) @@ -889,35 +918,39 @@ ShouldWaitForConnection(int currentConnectionCount) return false; } -static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port) + +static SharedWorkerNodeConnStatsHashKey +PrepareWorkerNodeHashKey(const char *hostname, int port) { - SharedWorkerNodeConnStatsHashKey key; - strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - key.port = port; - return key; + SharedWorkerNodeConnStatsHashKey key; + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + key.port = port; + return key; } -static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, - int port, - Oid database) + +static SharedWorkerNodeDatabaseConnStatsHashKey +PrepareWorkerNodeDatabaseHashKey(const char *hostname, + int port, + Oid database) { - SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; - workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); - workerNodeDatabaseKey.database = database; - return workerNodeDatabaseKey; + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; + workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); + workerNodeDatabaseKey.database = database; + return workerNodeDatabaseKey; } static uint32 SharedConnectionHashHash(const void *key, Size keysize) { - SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key; + SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key; uint32 hash = string_hash(entry->hostname, NAMEDATALEN); hash = hash_combine(hash, hash_uint32(entry->port)); @@ -925,47 +958,56 @@ SharedConnectionHashHash(const void *key, Size keysize) return hash; } + static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize) { - SharedWorkerNodeDatabaseConnStatsHashKey *entry = (SharedWorkerNodeDatabaseConnStatsHashKey *) key; - uint32 hash = string_hash(entry->workerNodeKey.hostname, NAMEDATALEN); - hash = hash_combine(hash, hash_uint32(entry->workerNodeKey.port)); - hash = hash_combine(hash, hash_uint32(entry->database)); + SharedWorkerNodeDatabaseConnStatsHashKey *entry = + (SharedWorkerNodeDatabaseConnStatsHashKey *) key; + uint32 hash = string_hash(entry->workerNodeKey.hostname, NAMEDATALEN); + hash = hash_combine(hash, hash_uint32(entry->workerNodeKey.port)); + hash = hash_combine(hash, hash_uint32(entry->database)); - return hash; + return hash; } static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize) { - SharedWorkerNodeConnStatsHashKey *ca = (SharedWorkerNodeConnStatsHashKey *) a; - SharedWorkerNodeConnStatsHashKey *cb = (SharedWorkerNodeConnStatsHashKey *) b; + SharedWorkerNodeConnStatsHashKey *ca = (SharedWorkerNodeConnStatsHashKey *) a; + SharedWorkerNodeConnStatsHashKey *cb = (SharedWorkerNodeConnStatsHashKey *) b; - return strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || - ca->port != cb->port; + return strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || + ca->port != cb->port; } + static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize) { - SharedWorkerNodeDatabaseConnStatsHashKey *ca = (SharedWorkerNodeDatabaseConnStatsHashKey *) a; - SharedWorkerNodeDatabaseConnStatsHashKey *cb = (SharedWorkerNodeDatabaseConnStatsHashKey *) b; + SharedWorkerNodeDatabaseConnStatsHashKey *ca = + (SharedWorkerNodeDatabaseConnStatsHashKey *) a; + SharedWorkerNodeDatabaseConnStatsHashKey *cb = + (SharedWorkerNodeDatabaseConnStatsHashKey *) b; - return strncmp(ca->workerNodeKey.hostname, cb->workerNodeKey.hostname, MAX_NODE_LENGTH) != 0 || - ca->workerNodeKey.port != cb->workerNodeKey.port || - ca->database != cb->database; + return strncmp(ca->workerNodeKey.hostname, cb->workerNodeKey.hostname, + MAX_NODE_LENGTH) != 0 || + ca->workerNodeKey.port != cb->workerNodeKey.port || + ca->database != cb->database; } -static bool isConnectionThrottlingDisabled(uint32 externalFlags) + +static bool +isConnectionThrottlingDisabled(uint32 externalFlags) { - bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; - /* - * Do not call Get*PoolSize() functions here, since it may read from - * the catalog and we may be in the process exit handler. - */ - return maintenanceConnection - ? MaxMaintenanceSharedPoolSize <= 0 - : MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING; + bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; + + /* + * Do not call Get*PoolSize() functions here, since it may read from + * the catalog and we may be in the process exit handler. + */ + return maintenanceConnection + ? MaxMaintenanceSharedPoolSize <= 0 + : MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index d0f8dfbd6..f06acf009 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1993,17 +1993,17 @@ RegisterCitusConfigVariables(void) GUC_SUPERUSER_ONLY, NULL, NULL, MaxSharedPoolSizeGucShowHook); - DefineCustomIntVariable( - "citus.max_maintenance_shared_pool_size", - gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections " - "for maintenance operations only." - "Setting it to 0 or -1 disables maintenance connection throttling."), - NULL, - &MaxMaintenanceSharedPoolSize, - 5, -1, INT_MAX, - PGC_SIGHUP, - GUC_SUPERUSER_ONLY, - NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_maintenance_shared_pool_size", + gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections " + "for maintenance operations only." + "Setting it to 0 or -1 disables maintenance connection throttling."), + NULL, + &MaxMaintenanceSharedPoolSize, + 5, -1, INT_MAX, + PGC_SIGHUP, + GUC_SUPERUSER_ONLY, + NULL, NULL, NULL); DefineCustomIntVariable( "citus.max_worker_nodes_tracked", @@ -2024,19 +2024,19 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - DefineCustomIntVariable( - "citus.max_databases_per_worker_tracked", - gettext_noop("Sets the amount of databases per worker tracked."), - gettext_noop( - "This configuration value complements the citus.max_worker_nodes_tracked." - "It should be used when there are more then one database with Citus in cluster," - "and, effectively, limits the size of the hash table with connections per worker + database." - "Currently, it does not affect the connection management logic and serves only statistical purposes."), - &MaxDatabasesPerWorkerNodesTracked, - 1, 1, INT_MAX, - PGC_POSTMASTER, - GUC_STANDARD, - NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_databases_per_worker_tracked", + gettext_noop("Sets the amount of databases per worker tracked."), + gettext_noop( + "This configuration value complements the citus.max_worker_nodes_tracked." + "It should be used when there are more then one database with Citus in cluster," + "and, effectively, limits the size of the hash table with connections per worker + database." + "Currently, it does not affect the connection management logic and serves only statistical purposes."), + &MaxDatabasesPerWorkerNodesTracked, + 1, 1, INT_MAX, + PGC_POSTMASTER, + GUC_STANDARD, + NULL, NULL, NULL); DefineCustomIntVariable( "citus.metadata_sync_interval", diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 829f19850..5f868f548 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -1071,6 +1071,7 @@ citus_pid_for_gpid(PG_FUNCTION_ARGS) PG_RETURN_INT32(ExtractProcessIdFromGlobalPID(globalPID)); } + /* * ExtractGlobalPID extracts the global process id from the application name and returns it * if the application name is not compatible with Citus' application names returns 0. diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index bc960796d..8054d2927 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -159,7 +159,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) bool recoveryFailed = false; - int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION; + int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION; MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) { diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 117507c15..f329ae001 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -706,35 +706,35 @@ CitusMaintenanceDaemonMain(Datum main_arg) timeout = Min(timeout, Recover2PCInterval); } - /* - * Execute only on the maintenance database, if it configured, otherwise run from every daemon. - * The config value -1 disables the distributed deadlock detection - */ - if (DistributedDeadlockDetectionTimeoutFactor != -1.0) + /* + * Execute only on the maintenance database, if it configured, otherwise run from every daemon. + * The config value -1 disables the distributed deadlock detection + */ + if (DistributedDeadlockDetectionTimeoutFactor != -1.0) { - double deadlockTimeout = - DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout; + double deadlockTimeout = + DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout; - InvalidateMetadataSystemCache(); - StartTransactionCommand(); + InvalidateMetadataSystemCache(); + StartTransactionCommand(); - /* - * We skip the deadlock detection if citus extension - * is not accessible. - * - * Similarly, we skip to run the deadlock checks if - * there exists any version mismatch or the extension - * is not fully created yet. - */ - if (!LockCitusExtension()) - { - ereport(DEBUG1, (errmsg("could not lock the citus extension, " - "skipping deadlock detection"))); - } - else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) - { - foundDeadlock = CheckForDistributedDeadlocks(); - } + /* + * We skip the deadlock detection if citus extension + * is not accessible. + * + * Similarly, we skip to run the deadlock checks if + * there exists any version mismatch or the extension + * is not fully created yet. + */ + if (!LockCitusExtension()) + { + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping deadlock detection"))); + } + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + foundDeadlock = CheckForDistributedDeadlocks(); + } CommitTransactionCommand(); @@ -1233,4 +1233,3 @@ MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData) return metadataSyncTriggered; } - diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 91e1e9222..cc2fa3403 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -125,12 +125,12 @@ enum MultiConnectionMode */ REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8, - /* - * This flag specifies that connection is required for maintenance operations, e.g. - * transaction recovery, distributed deadlock detection. Such connections have - * a reserved quota of the MaxSharedPoolSize. - */ - REQUIRE_MAINTENANCE_CONNECTION = 1 << 9 + /* + * This flag specifies that connection is required for maintenance operations, e.g. + * transaction recovery, distributed deadlock detection. Such connections have + * a reserved quota of the MaxSharedPoolSize. + */ + REQUIRE_MAINTENANCE_CONNECTION = 1 << 9 }; @@ -230,8 +230,8 @@ typedef struct MultiConnection /* replication option */ bool requiresReplication; - /* See REQUIRE_MAINTENANCE_CONNECTION */ - bool useForMaintenanceOperations; + /* See REQUIRE_MAINTENANCE_CONNECTION */ + bool useForMaintenanceOperations; MultiConnectionStructInitializationState initializationState; } MultiConnection; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index d5901014a..ea8346769 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -18,10 +18,10 @@ enum SharedPoolCounterMode { - /* - * Use this flag to reserve a connection from a maintenance quota - */ - MAINTENANCE_CONNECTION = 1 << 0 + /* + * Use this flag to reserve a connection from a maintenance quota + */ + MAINTENANCE_CONNECTION = 1 << 0 }; extern int MaxSharedPoolSize; @@ -39,11 +39,14 @@ extern int GetMaxClientConnections(void); extern int GetMaxSharedPoolSize(void); extern int GetMaxMaintenanceSharedPoolSize(void); extern int GetLocalSharedPoolSize(void); -extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); +extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, + int port); extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port); -extern void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port); -extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port); -extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int - activeConnectionCount); +extern void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, + int port); +extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, + int port); +extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, + int activeConnectionCount); #endif /* SHARED_CONNECTION_STATS_H */