Run make reindent

pull/7286/head
Jelte Fennema-Nio 2024-01-18 12:36:13 +01:00 committed by ivyazmitinov
parent 115ed00c06
commit 61714862a6
10 changed files with 516 additions and 462 deletions

View File

@ -2160,7 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext); copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
uint32 connectionFlags = 0; uint32 connectionFlags = 0;
/* /*
* Colocated intermediate results do not honor citus.max_shared_pool_size, * Colocated intermediate results do not honor citus.max_shared_pool_size,
@ -2212,7 +2212,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
*/ */
if (ShardIntervalListHasLocalPlacements(shardIntervalList)) if (ShardIntervalListHasLocalPlacements(shardIntervalList))
{ {
bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(connectionFlags); bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(
connectionFlags);
copyDest->shouldUseLocalCopy = !reservedConnection; copyDest->shouldUseLocalCopy = !reservedConnection;
} }
} }
@ -3635,7 +3636,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
return connection; return connection;
} }
if (IsReservationPossible(connectionFlags)) if (IsReservationPossible(connectionFlags))
{ {
/* /*
* Enforce the requirements for adaptive connection management * Enforce the requirements for adaptive connection management

View File

@ -68,7 +68,7 @@ static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, static bool ShouldShutdownConnection(MultiConnection *connection,
const int cachedConnectionCount); const int cachedConnectionCount);
static bool RemoteTransactionIdle(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections); static int EventSetSizeForConnectionList(List *connections);
@ -360,11 +360,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
MultiConnection *connection = FindAvailableConnection(entry->connections, flags); MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection) if (connection)
{ {
if (flags & REQUIRE_MAINTENANCE_CONNECTION) if (flags & REQUIRE_MAINTENANCE_CONNECTION)
{ {
// Maintenance database may have changed, so cached connection should be closed /* Maintenance database may have changed, so cached connection should be closed */
connection->forceCloseAtTransactionEnd = true; connection->forceCloseAtTransactionEnd = true;
} }
return connection; return connection;
} }
} }
@ -388,12 +388,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
/* these two flags, by their nature, cannot be set at the same time */ /* these two flags, by their nature, cannot be set at the same time */
Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION))); Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));
int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION)
? MAINTENANCE_CONNECTION ? MAINTENANCE_CONNECTION
: 0; : 0;
if (flags & WAIT_FOR_CONNECTION) if (flags & WAIT_FOR_CONNECTION)
{ {
WaitLoopForSharedConnection(sharedCounterFlags, hostname, port); WaitLoopForSharedConnection(sharedCounterFlags, hostname, port);
} }
else if (flags & OPTIONAL_CONNECTION) else if (flags & OPTIONAL_CONNECTION)
{ {
@ -403,7 +403,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* cannot reserve the right to establish a connection, we prefer to * cannot reserve the right to establish a connection, we prefer to
* error out. * error out.
*/ */
if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port)) if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port))
{ {
/* do not track the connection anymore */ /* do not track the connection anymore */
dlist_delete(&connection->connectionNode); dlist_delete(&connection->connectionNode);
@ -423,7 +423,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* *
* Still, we keep track of the connection counter. * Still, we keep track of the connection counter.
*/ */
IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port); IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port);
} }
@ -437,15 +437,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
if (flags & REQUIRE_METADATA_CONNECTION) if (flags & REQUIRE_METADATA_CONNECTION)
{ {
connection->useForMetadataOperations = true; connection->useForMetadataOperations = true;
} }
else if (flags & REQUIRE_MAINTENANCE_CONNECTION) else if (flags & REQUIRE_MAINTENANCE_CONNECTION)
{ {
connection->useForMaintenanceOperations = true; connection->useForMaintenanceOperations = true;
connection->forceCloseAtTransactionEnd = true; connection->forceCloseAtTransactionEnd = true;
} }
/* fully initialized the connection, record it */ /* fully initialized the connection, record it */
connection->initializationState = POOL_STATE_INITIALIZED; connection->initializationState = POOL_STATE_INITIALIZED;
@ -513,11 +513,11 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
continue; continue;
} }
if ((flags & REQUIRE_MAINTENANCE_CONNECTION) && if ((flags & REQUIRE_MAINTENANCE_CONNECTION) &&
!connection->useForMaintenanceOperations) !connection->useForMaintenanceOperations)
{ {
continue; continue;
} }
if ((flags & REQUIRE_METADATA_CONNECTION) && if ((flags & REQUIRE_METADATA_CONNECTION) &&
!connection->useForMetadataOperations) !connection->useForMetadataOperations)
@ -1215,10 +1215,11 @@ CitusPQFinish(MultiConnection *connection)
/* behave idempotently, there is no guarantee that CitusPQFinish() is called once */ /* behave idempotently, there is no guarantee that CitusPQFinish() is called once */
if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED) if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED)
{ {
int sharedCounterFlags = (connection->useForMaintenanceOperations) int sharedCounterFlags = (connection->useForMaintenanceOperations)
? MAINTENANCE_CONNECTION ? MAINTENANCE_CONNECTION
: 0; : 0;
DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname, connection->port); DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname,
connection->port);
connection->initializationState = POOL_STATE_NOT_INITIALIZED; connection->initializationState = POOL_STATE_NOT_INITIALIZED;
} }
} }
@ -1515,16 +1516,17 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
* escalating the number of cached connections. We can recognize such backends * escalating the number of cached connections. We can recognize such backends
* from their application name. * from their application name.
*/ */
return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) || return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) ||
connection->initializationState != POOL_STATE_INITIALIZED || connection->initializationState != POOL_STATE_INITIALIZED ||
cachedConnectionCount >= MaxCachedConnectionsPerWorker || cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd || connection->forceCloseAtTransactionEnd ||
PQstatus(connection->pgConn) != CONNECTION_OK || PQstatus(connection->pgConn) != CONNECTION_OK ||
!RemoteTransactionIdle(connection) || !RemoteTransactionIdle(connection) ||
connection->requiresReplication || connection->requiresReplication ||
connection->isReplicationOriginSessionSetup || connection->isReplicationOriginSessionSetup ||
(MaxCachedConnectionLifetime >= 0 && (MaxCachedConnectionLifetime >= 0 &&
MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0); MillisecondsToTimeout(connection->connectionEstablishmentStart,
MaxCachedConnectionLifetime) <= 0);
} }

View File

@ -92,10 +92,11 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *
userId, Oid userId, Oid
databaseOid, databaseOid,
bool *found); bool *found);
static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags); static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32
connectionFlags);
static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode,
bool waitForConnection, bool waitForConnection,
uint32 connectionFlags); uint32 connectionFlags);
static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);
@ -241,8 +242,9 @@ DeallocateReservedConnections(void)
* We have not used this reservation, make sure to clean-up from * We have not used this reservation, make sure to clean-up from
* the shared memory as well. * the shared memory as well.
*/ */
int sharedCounterFlags = 0; int sharedCounterFlags = 0;
DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname, entry->key.port); DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname,
entry->key.port);
/* for completeness, set it to true */ /* for completeness, set it to true */
entry->usedReservation = true; entry->usedReservation = true;
@ -332,7 +334,8 @@ TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags)
} }
bool waitForConnection = false; bool waitForConnection = false;
return EnsureConnectionPossibilityForNode(localNode, waitForConnection, connectionFlags); return EnsureConnectionPossibilityForNode(localNode, waitForConnection,
connectionFlags);
} }
@ -365,7 +368,8 @@ EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags)
foreach_ptr(workerNode, nodeList) foreach_ptr(workerNode, nodeList)
{ {
bool waitForConnection = true; bool waitForConnection = true;
EnsureConnectionPossibilityForNode(workerNode, waitForConnection, connectionFlags); EnsureConnectionPossibilityForNode(workerNode, waitForConnection,
connectionFlags);
} }
} }
@ -384,9 +388,10 @@ EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags)
* return false. * return false.
*/ */
static bool static bool
EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32 connectionFlags) EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32
connectionFlags)
{ {
if (!IsReservationPossible(connectionFlags)) if (!IsReservationPossible(connectionFlags))
{ {
return false; return false;
} }
@ -441,16 +446,17 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
* Increment the shared counter, we may need to wait if there is * Increment the shared counter, we may need to wait if there is
* no space left. * no space left.
*/ */
int sharedCounterFlags = 0; int sharedCounterFlags = 0;
WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName, workerNode->workerPort); WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName,
workerNode->workerPort);
} }
else else
{ {
int sharedCounterFlags = 0; int sharedCounterFlags = 0;
bool incremented = TryToIncrementSharedConnectionCounter( bool incremented = TryToIncrementSharedConnectionCounter(
sharedCounterFlags, sharedCounterFlags,
workerNode->workerName, workerNode->workerName,
workerNode->workerPort); workerNode->workerPort);
if (!incremented) if (!incremented)
{ {
/* /*
@ -482,11 +488,11 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
bool bool
IsReservationPossible(uint32 connectionFlags) IsReservationPossible(uint32 connectionFlags)
{ {
bool connectionThrottlingDisabled = bool connectionThrottlingDisabled =
connectionFlags & REQUIRE_MAINTENANCE_CONNECTION connectionFlags & REQUIRE_MAINTENANCE_CONNECTION
? GetMaxMaintenanceSharedPoolSize() <= 0 ? GetMaxMaintenanceSharedPoolSize() <= 0
: GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING; : GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING;
if (connectionThrottlingDisabled) if (connectionThrottlingDisabled)
{ {
/* connection throttling disabled */ /* connection throttling disabled */
return false; return false;

View File

@ -75,24 +75,24 @@ typedef struct SharedWorkerNodeConnStatsHashKey
typedef struct SharedWorkerNodeDatabaseConnStatsHashKey typedef struct SharedWorkerNodeDatabaseConnStatsHashKey
{ {
SharedWorkerNodeConnStatsHashKey workerNodeKey; SharedWorkerNodeConnStatsHashKey workerNodeKey;
Oid database; Oid database;
} SharedWorkerNodeDatabaseConnStatsHashKey; } SharedWorkerNodeDatabaseConnStatsHashKey;
/* hash entry for per worker stats */ /* hash entry for per worker stats */
typedef struct SharedWorkerNodeConnStatsHashEntry typedef struct SharedWorkerNodeConnStatsHashEntry
{ {
SharedWorkerNodeConnStatsHashKey key; SharedWorkerNodeConnStatsHashKey key;
int regularConnectionsCount; int regularConnectionsCount;
int maintenanceConnectionsCount; int maintenanceConnectionsCount;
} SharedWorkerNodeConnStatsHashEntry; } SharedWorkerNodeConnStatsHashEntry;
/* hash entry for per database on worker stats */ /* hash entry for per database on worker stats */
typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry
{ {
SharedWorkerNodeDatabaseConnStatsHashKey key; SharedWorkerNodeDatabaseConnStatsHashKey key;
int count; int count;
} SharedWorkerNodeDatabaseConnStatsHashEntry; } SharedWorkerNodeDatabaseConnStatsHashEntry;
@ -141,17 +141,24 @@ static bool ShouldWaitForConnection(int currentConnectionCount);
static uint32 SharedConnectionHashHash(const void *key, Size keysize); static uint32 SharedConnectionHashHash(const void *key, Size keysize);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize); static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size
keysize);
static bool isConnectionThrottlingDisabled(uint32 externalFlags); static bool isConnectionThrottlingDisabled(uint32 externalFlags);
static bool static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool
IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool checkLimits, const char *hostname, int port, checkLimits, const char *hostname,
Oid database); int port,
static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port); Oid database);
static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname, static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int
int port, port);
Oid database); static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const
static void char *
DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port, Oid database); hostname,
int port,
Oid
database);
static void DecrementSharedConnectionCounterInternal(uint32 externalFlags, const
char *hostname, int port, Oid
database);
PG_FUNCTION_INFO_V1(citus_remote_connection_stats); PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
@ -192,26 +199,29 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri
LockConnectionSharedMemory(LW_SHARED); LockConnectionSharedMemory(LW_SHARED);
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL; SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL;
hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash); hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash);
while ((connectionEntry = (SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search(&status)) != 0) while ((connectionEntry =
(SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search(
&status)) != 0)
{ {
/* get ready for the next tuple */ /* get ready for the next tuple */
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
char *databaseName = get_database_name(connectionEntry->key.database); char *databaseName = get_database_name(connectionEntry->key.database);
if (databaseName == NULL) if (databaseName == NULL)
{ {
/* database might have been dropped */ /* database might have been dropped */
continue; continue;
} }
values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.workerNodeKey.hostname)); values[0] = PointerGetDatum(cstring_to_text(
values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port); connectionEntry->key.workerNodeKey.hostname));
values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port);
values[2] = PointerGetDatum(cstring_to_text(databaseName)); values[2] = PointerGetDatum(cstring_to_text(databaseName));
values[3] = Int32GetDatum(connectionEntry->count); values[3] = Int32GetDatum(connectionEntry->count);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
} }
@ -257,10 +267,11 @@ GetMaxSharedPoolSize(void)
return MaxSharedPoolSize; return MaxSharedPoolSize;
} }
int int
GetMaxMaintenanceSharedPoolSize(void) GetMaxMaintenanceSharedPoolSize(void)
{ {
return MaxMaintenanceSharedPoolSize; return MaxMaintenanceSharedPoolSize;
} }
@ -293,7 +304,7 @@ GetLocalSharedPoolSize(void)
void void
WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
{ {
while (!TryToIncrementSharedConnectionCounter(flags, hostname, port)) while (!TryToIncrementSharedConnectionCounter(flags, hostname, port))
{ {
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
@ -303,6 +314,7 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
ConditionVariableCancelSleep(); ConditionVariableCancelSleep();
} }
/* /*
* TryToIncrementSharedConnectionCounter tries to increment the shared * TryToIncrementSharedConnectionCounter tries to increment the shared
* connection counter for the given nodeId and the current database in * connection counter for the given nodeId and the current database in
@ -315,28 +327,28 @@ WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
bool bool
TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
{ {
if (isConnectionThrottlingDisabled(flags)) if (isConnectionThrottlingDisabled(flags))
{ {
return true; return true;
} }
/* /*
* The local session might already have some reserved connections to the given * The local session might already have some reserved connections to the given
* node. In that case, we don't need to go through the shared memory. * node. In that case, we don't need to go through the shared memory.
*/ */
Oid userId = GetUserId(); Oid userId = GetUserId();
if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId)) if (CanUseReservedConnection(hostname, port, userId, MyDatabaseId))
{ {
MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId); MarkReservedConnectionUsed(hostname, port, userId, MyDatabaseId);
return true; return true;
} }
return IncrementSharedConnectionCounterInternal(flags, return IncrementSharedConnectionCounterInternal(flags,
true, true,
hostname, hostname,
port, port,
MyDatabaseId); MyDatabaseId);
} }
@ -347,161 +359,165 @@ TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int po
void void
IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
{ {
if (isConnectionThrottlingDisabled(flags)) if (isConnectionThrottlingDisabled(flags))
{ {
return; return;
} }
IncrementSharedConnectionCounterInternal(flags, IncrementSharedConnectionCounterInternal(flags,
false, false,
hostname, hostname,
port, port,
MyDatabaseId); MyDatabaseId);
} }
static bool static bool
IncrementSharedConnectionCounterInternal(uint32 externalFlags, IncrementSharedConnectionCounterInternal(uint32 externalFlags,
bool checkLimits, bool checkLimits,
const char *hostname, const char *hostname,
int port, int port,
Oid database) Oid database)
{ {
LockConnectionSharedMemory(LW_EXCLUSIVE); LockConnectionSharedMemory(LW_EXCLUSIVE);
/* /*
* As the hash map is allocated in shared memory, it doesn't rely on palloc for * As the hash map is allocated in shared memory, it doesn't rely on palloc for
* memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no * memory allocation, so we could get NULL via HASH_ENTER_NULL when there is no
* space in the shared memory. That's why we prefer continuing the execution * space in the shared memory. That's why we prefer continuing the execution
* instead of throwing an error. * instead of throwing an error.
*/ */
SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname,
bool workerNodeEntryFound = false; port);
SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = bool workerNodeEntryFound = false;
hash_search(SharedWorkerNodeConnStatsHash, SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry =
&workerNodeKey, hash_search(SharedWorkerNodeConnStatsHash,
HASH_ENTER_NULL, &workerNodeKey,
&workerNodeEntryFound); HASH_ENTER_NULL,
&workerNodeEntryFound);
/* /*
* It is possible to throw an error at this point, but that doesn't help us in any way. * It is possible to throw an error at this point, but that doesn't help us in any way.
* Instead, we try our best, let the connection establishment continue by-passing the * Instead, we try our best, let the connection establishment continue by-passing the
* connection throttling. * connection throttling.
*/ */
if (!workerNodeConnectionEntry) if (!workerNodeConnectionEntry)
{ {
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
return true; return true;
} }
if (!workerNodeEntryFound) if (!workerNodeEntryFound)
{ {
/* we successfully allocated the entry for the first time, so initialize it */ /* we successfully allocated the entry for the first time, so initialize it */
workerNodeConnectionEntry->regularConnectionsCount = 0; workerNodeConnectionEntry->regularConnectionsCount = 0;
workerNodeConnectionEntry->maintenanceConnectionsCount = 0; workerNodeConnectionEntry->maintenanceConnectionsCount = 0;
} }
/* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */ /* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */
SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey =
PrepareWorkerNodeDatabaseHashKey(hostname, port, database); PrepareWorkerNodeDatabaseHashKey(hostname, port, database);
bool workerNodeDatabaseEntryFound = false; bool workerNodeDatabaseEntryFound = false;
SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry =
hash_search(SharedWorkerNodeDatabaseConnStatsHash, hash_search(SharedWorkerNodeDatabaseConnStatsHash,
&workerNodeDatabaseKey, &workerNodeDatabaseKey,
HASH_ENTER_NULL, HASH_ENTER_NULL,
&workerNodeDatabaseEntryFound); &workerNodeDatabaseEntryFound);
if (!workerNodeDatabaseEntry) if (!workerNodeDatabaseEntry)
{ {
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
return true; return true;
} }
if (!workerNodeDatabaseEntryFound) if (!workerNodeDatabaseEntryFound)
{ {
workerNodeDatabaseEntry->count = 0; workerNodeDatabaseEntry->count = 0;
} }
/* Increment counter if a slot is available */ /* Increment counter if a slot is available */
bool connectionSlotAvailable = true; bool connectionSlotAvailable = true;
bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
if (checkLimits) if (checkLimits)
{ {
WorkerNode *workerNode = FindWorkerNode(hostname, port); WorkerNode *workerNode = FindWorkerNode(hostname, port);
bool connectionToLocalNode = workerNode && (workerNode->groupId == GetLocalGroupId()); bool connectionToLocalNode = workerNode && (workerNode->groupId ==
GetLocalGroupId());
int currentConnectionsLimit; int currentConnectionsLimit;
int currentConnectionsCount; int currentConnectionsCount;
if (maintenanceConnection) if (maintenanceConnection)
{ {
currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize(); currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize();
currentConnectionsCount = workerNodeConnectionEntry->maintenanceConnectionsCount; currentConnectionsCount =
} workerNodeConnectionEntry->maintenanceConnectionsCount;
else }
{ else
currentConnectionsLimit = connectionToLocalNode {
? GetLocalSharedPoolSize() currentConnectionsLimit = connectionToLocalNode
: GetMaxSharedPoolSize(); ? GetLocalSharedPoolSize()
currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount; : GetMaxSharedPoolSize();
} currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount;
}
bool remoteNodeLimitExceeded = currentConnectionsCount + 1 > currentConnectionsLimit; bool remoteNodeLimitExceeded = currentConnectionsCount + 1 >
currentConnectionsLimit;
/* /*
* For local nodes, solely relying on citus.max_shared_pool_size or * For local nodes, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us * max_connections might not be sufficient. The former gives us
* a preview of the future (e.g., we let the new connections to establish, * a preview of the future (e.g., we let the new connections to establish,
* but they are not established yet). The latter gives us the close to * but they are not established yet). The latter gives us the close to
* precise view of the past (e.g., the active number of client backends). * precise view of the past (e.g., the active number of client backends).
* *
* Overall, we want to limit both of the metrics. The former limit typically * Overall, we want to limit both of the metrics. The former limit typically
* kicks in under regular loads, where the load of the database increases in * kicks in under regular loads, where the load of the database increases in
* a reasonable pace. The latter limit typically kicks in when the database * a reasonable pace. The latter limit typically kicks in when the database
* is issued lots of concurrent sessions at the same time, such as benchmarks. * is issued lots of concurrent sessions at the same time, such as benchmarks.
*/ */
bool localNodeLimitExceeded = bool localNodeLimitExceeded =
connectionToLocalNode && connectionToLocalNode &&
(GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES || (GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES ||
GetExternalClientBackendCount() + 1 > currentConnectionsLimit); GetExternalClientBackendCount() + 1 > currentConnectionsLimit);
if (remoteNodeLimitExceeded || localNodeLimitExceeded) if (remoteNodeLimitExceeded || localNodeLimitExceeded)
{ {
connectionSlotAvailable = false; connectionSlotAvailable = false;
} }
} }
if (connectionSlotAvailable) if (connectionSlotAvailable)
{ {
if (maintenanceConnection) if (maintenanceConnection)
{ {
workerNodeConnectionEntry->maintenanceConnectionsCount += 1; workerNodeConnectionEntry->maintenanceConnectionsCount += 1;
} }
else else
{ {
workerNodeConnectionEntry->regularConnectionsCount += 1; workerNodeConnectionEntry->regularConnectionsCount += 1;
} }
workerNodeDatabaseEntry->count += 1; workerNodeDatabaseEntry->count += 1;
} }
if (IsLoggableLevel(DEBUG4)) if (IsLoggableLevel(DEBUG4))
{ {
ereport(DEBUG4, errmsg( ereport(DEBUG4, errmsg(
"Incrementing connection counter. " "Incrementing connection counter. "
"Current regular connections: %i, maintenance connections: %i. " "Current regular connections: %i, maintenance connections: %i. "
"Connection slot to %s:%i database %i is %s", "Connection slot to %s:%i database %i is %s",
workerNodeConnectionEntry->regularConnectionsCount, workerNodeConnectionEntry->regularConnectionsCount,
workerNodeConnectionEntry->maintenanceConnectionsCount, workerNodeConnectionEntry->maintenanceConnectionsCount,
hostname, hostname,
port, port,
database, database,
connectionSlotAvailable ? "available" : "not available" connectionSlotAvailable ? "available" : "not available"
)); ));
} }
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
return connectionSlotAvailable; return connectionSlotAvailable;
} }
@ -512,105 +528,109 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
void void
DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port) DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port)
{ {
// TODO: possible bug, remove this check? /* TODO: possible bug, remove this check? */
if (isConnectionThrottlingDisabled(externalFlags)) if (isConnectionThrottlingDisabled(externalFlags))
{ {
return; return;
} }
LockConnectionSharedMemory(LW_EXCLUSIVE); LockConnectionSharedMemory(LW_EXCLUSIVE);
DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId); DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId);
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection(); WakeupWaiterBackendsForSharedConnection();
} }
static void static void
DecrementSharedConnectionCounterInternal(uint32 externalFlags, DecrementSharedConnectionCounterInternal(uint32 externalFlags,
const char *hostname, const char *hostname,
int port, int port,
Oid database) Oid database)
{ {
bool workerNodeEntryFound = false; bool workerNodeEntryFound = false;
SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname,
SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = port);
hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound); SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry =
hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND,
&workerNodeEntryFound);
/* this worker node is removed or updated, no need to care */ /* this worker node is removed or updated, no need to care */
if (!workerNodeEntryFound) if (!workerNodeEntryFound)
{ {
ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing "
"connection counter", hostname, port))); "connection counter", hostname, port)));
return; return;
} }
/* we should never go below 0 */ /* we should never go below 0 */
Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 || Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 ||
workerNodeConnectionEntry->maintenanceConnectionsCount > 0); workerNodeConnectionEntry->maintenanceConnectionsCount > 0);
if (externalFlags & MAINTENANCE_CONNECTION) if (externalFlags & MAINTENANCE_CONNECTION)
{ {
workerNodeConnectionEntry->maintenanceConnectionsCount -= 1; workerNodeConnectionEntry->maintenanceConnectionsCount -= 1;
} }
else else
{ {
workerNodeConnectionEntry->regularConnectionsCount -= 1; workerNodeConnectionEntry->regularConnectionsCount -= 1;
} }
if (IsLoggableLevel(DEBUG4)) if (IsLoggableLevel(DEBUG4))
{ {
ereport(DEBUG4, errmsg( ereport(DEBUG4, errmsg(
"Decrementing connection counter. " "Decrementing connection counter. "
"Current regular connections: %i, maintenance connections: %i. " "Current regular connections: %i, maintenance connections: %i. "
"Connection slot to %s:%i database %i is released", "Connection slot to %s:%i database %i is released",
workerNodeConnectionEntry->regularConnectionsCount, workerNodeConnectionEntry->regularConnectionsCount,
workerNodeConnectionEntry->maintenanceConnectionsCount, workerNodeConnectionEntry->maintenanceConnectionsCount,
hostname, hostname,
port, port,
database database
)); ));
} }
/* /*
* We don't have to remove at this point as the node might be still active * We don't have to remove at this point as the node might be still active
* and will have new connections open to it. Still, this seems like a convenient * and will have new connections open to it. Still, this seems like a convenient
* place to remove the entry, as count == 0 implies that the server is * place to remove the entry, as count == 0 implies that the server is
* not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
* we're unlikely to trigger this often. * we're unlikely to trigger this often.
*/ */
if (workerNodeConnectionEntry->regularConnectionsCount == 0 && if (workerNodeConnectionEntry->regularConnectionsCount == 0 &&
workerNodeConnectionEntry->maintenanceConnectionsCount == 0) workerNodeConnectionEntry->maintenanceConnectionsCount == 0)
{ {
hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL);
} }
/* /*
* Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey
*/ */
SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey =
PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId); PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId);
bool workerNodeDatabaseEntryFound = false; bool workerNodeDatabaseEntryFound = false;
SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry =
hash_search(SharedWorkerNodeDatabaseConnStatsHash, hash_search(SharedWorkerNodeDatabaseConnStatsHash,
&workerNodeDatabaseKey, &workerNodeDatabaseKey,
HASH_FIND, HASH_FIND,
&workerNodeDatabaseEntryFound); &workerNodeDatabaseEntryFound);
if (!workerNodeDatabaseEntryFound) if (!workerNodeDatabaseEntryFound)
{ {
return; return;
} }
Assert(workerNodeDatabaseEntry->count > 0); Assert(workerNodeDatabaseEntry->count > 0);
workerNodeDatabaseEntry->count -= 1; workerNodeDatabaseEntry->count -= 1;
if (workerNodeDatabaseEntry->count == 0) if (workerNodeDatabaseEntry->count == 0)
{ {
hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey, HASH_REMOVE, NULL); hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey,
} HASH_REMOVE, NULL);
}
} }
@ -621,7 +641,7 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags,
/*
 * LockConnectionSharedMemory acquires the lock stored in
 * ConnectionStatsSharedState->sharedConnectionHashLock in the given mode.
 */
static void
LockConnectionSharedMemory(LWLockMode lockMode)
{
	LWLock *sharedConnectionHashLock =
		&ConnectionStatsSharedState->sharedConnectionHashLock;

	LWLockAcquire(sharedConnectionHashLock, lockMode);
}
@ -706,17 +726,20 @@ SharedConnectionStatsShmemSize(void)
size = add_size(size, sizeof(ConnectionStatsSharedData)); size = add_size(size, sizeof(ConnectionStatsSharedData));
Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked, Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked,
sizeof(SharedWorkerNodeConnStatsHashEntry)); sizeof(
SharedWorkerNodeConnStatsHashEntry));
size = add_size(size, workerNodeConnHashSize); size = add_size(size, workerNodeConnHashSize);
Size workerNodeDatabaseConnSize = hash_estimate_size(MaxWorkerNodesTracked * MaxDatabasesPerWorkerNodesTracked, Size workerNodeDatabaseConnSize = hash_estimate_size(MaxWorkerNodesTracked *
sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry)); MaxDatabasesPerWorkerNodesTracked,
sizeof(
SharedWorkerNodeDatabaseConnStatsHashEntry));
size = add_size(size, workerNodeDatabaseConnSize); size = add_size(size, workerNodeDatabaseConnSize);
return size; return size;
} }
@ -756,42 +779,48 @@ SharedConnectionStatsShmemInit(void)
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
} }
/* allocate hash tables */ /* allocate hash tables */
/* create (hostname, port) -> [counter] */ /* create (hostname, port) -> [counter] */
HASHCTL sharedWorkerNodeConnStatsHashInfo; HASHCTL sharedWorkerNodeConnStatsHashInfo;
memset(&sharedWorkerNodeConnStatsHashInfo, 0, sizeof(sharedWorkerNodeConnStatsHashInfo)); memset(&sharedWorkerNodeConnStatsHashInfo, 0,
sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey); sizeof(sharedWorkerNodeConnStatsHashInfo));
sharedWorkerNodeConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeConnStatsHashEntry); sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey);
sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash; sharedWorkerNodeConnStatsHashInfo.entrysize =
sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare; sizeof(SharedWorkerNodeConnStatsHashEntry);
SharedWorkerNodeConnStatsHash = sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash;
ShmemInitHash("Shared Conn. Stats Hash", sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare;
MaxWorkerNodesTracked, SharedWorkerNodeConnStatsHash =
MaxWorkerNodesTracked, ShmemInitHash("Shared Conn. Stats Hash",
&sharedWorkerNodeConnStatsHashInfo, MaxWorkerNodesTracked,
(HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); MaxWorkerNodesTracked,
&sharedWorkerNodeConnStatsHashInfo,
(HASH_ELEM | HASH_FUNCTION | HASH_COMPARE));
/* create (hostname, port, database) -> [counter] */ /* create (hostname, port, database) -> [counter] */
HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo; HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo;
memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0, sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo)); memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0,
sharedWorkerNodeDatabaseConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashKey); sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo));
sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize = sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry); sharedWorkerNodeDatabaseConnStatsHashInfo.keysize =
sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash; sizeof(SharedWorkerNodeDatabaseConnStatsHashKey);
sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare; sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize =
sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry);
sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash;
sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare;
int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked * MaxDatabasesPerWorkerNodesTracked; int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked *
SharedWorkerNodeDatabaseConnStatsHash = MaxDatabasesPerWorkerNodesTracked;
ShmemInitHash("Shared Conn Per Database. Stats Hash", SharedWorkerNodeDatabaseConnStatsHash =
sharedWorkerNodeDatabaseConnStatsHashSize, ShmemInitHash("Shared Conn Per Database. Stats Hash",
sharedWorkerNodeDatabaseConnStatsHashSize, sharedWorkerNodeDatabaseConnStatsHashSize,
&sharedWorkerNodeDatabaseConnStatsHashInfo, sharedWorkerNodeDatabaseConnStatsHashSize,
(HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); &sharedWorkerNodeDatabaseConnStatsHashInfo,
(HASH_ELEM | HASH_FUNCTION | HASH_COMPARE));
LWLockRelease(AddinShmemInitLock); LWLockRelease(AddinShmemInitLock);
Assert(SharedWorkerNodeConnStatsHash != NULL); Assert(SharedWorkerNodeConnStatsHash != NULL);
Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL); Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL);
Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0); Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0);
if (prev_shmem_startup_hook != NULL) if (prev_shmem_startup_hook != NULL)
@ -889,35 +918,39 @@ ShouldWaitForConnection(int currentConnectionCount)
return false; return false;
} }
static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int port)
static SharedWorkerNodeConnStatsHashKey
PrepareWorkerNodeHashKey(const char *hostname, int port)
{ {
SharedWorkerNodeConnStatsHashKey key; SharedWorkerNodeConnStatsHashKey key;
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH) if (strlen(hostname) > MAX_NODE_LENGTH)
{ {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d", errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH))); MAX_NODE_LENGTH)));
} }
key.port = port; key.port = port;
return key; return key;
} }
static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const char *hostname,
int port, static SharedWorkerNodeDatabaseConnStatsHashKey
Oid database) PrepareWorkerNodeDatabaseHashKey(const char *hostname,
int port,
Oid database)
{ {
SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey;
workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
workerNodeDatabaseKey.database = database; workerNodeDatabaseKey.database = database;
return workerNodeDatabaseKey; return workerNodeDatabaseKey;
} }
static uint32 static uint32
SharedConnectionHashHash(const void *key, Size keysize) SharedConnectionHashHash(const void *key, Size keysize)
{ {
SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key; SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key;
uint32 hash = string_hash(entry->hostname, NAMEDATALEN); uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
hash = hash_combine(hash, hash_uint32(entry->port)); hash = hash_combine(hash, hash_uint32(entry->port));
@ -925,47 +958,56 @@ SharedConnectionHashHash(const void *key, Size keysize)
return hash; return hash;
} }
/*
 * SharedWorkerNodeDatabaseHashHash is the hash callback of
 * SharedWorkerNodeDatabaseConnStatsHash. It combines the hashes of the
 * hostname, the port and the database Oid stored in the key.
 *
 * keysize is part of the callback signature and is not used.
 */
static uint32
SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize)
{
	const SharedWorkerNodeDatabaseConnStatsHashKey *entry =
		(const SharedWorkerNodeDatabaseConnStatsHashKey *) key;

	/*
	 * string_hash() hashes at most (size - 1) bytes of the string. Bound it
	 * with MAX_NODE_LENGTH, the same limit used when the key is filled and
	 * compared, so that hostnames that only differ after the first
	 * NAMEDATALEN - 1 characters do not all produce the same hostname hash.
	 */
	uint32 hash = string_hash(entry->workerNodeKey.hostname, MAX_NODE_LENGTH);

	hash = hash_combine(hash, hash_uint32(entry->workerNodeKey.port));
	hash = hash_combine(hash, hash_uint32(entry->database));

	return hash;
}
/*
 * SharedConnectionHashCompare is the match callback of
 * SharedWorkerNodeConnStatsHash. It returns 0 when both keys carry the same
 * hostname (compared over at most MAX_NODE_LENGTH characters) and the same
 * port, and 1 otherwise.
 *
 * keysize is part of the callback signature and is not used.
 */
static int
SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
{
	const SharedWorkerNodeConnStatsHashKey *leftKey = a;
	const SharedWorkerNodeConnStatsHashKey *rightKey = b;

	bool sameHostname =
		strncmp(leftKey->hostname, rightKey->hostname, MAX_NODE_LENGTH) == 0;
	bool samePort = (leftKey->port == rightKey->port);

	return (sameHostname && samePort) ? 0 : 1;
}
/*
 * SharedWorkerNodeDatabaseHashCompare is the match callback of
 * SharedWorkerNodeDatabaseConnStatsHash. It returns 0 when both keys carry
 * the same hostname (compared over at most MAX_NODE_LENGTH characters), the
 * same port and the same database, and 1 otherwise.
 *
 * keysize is part of the callback signature and is not used.
 */
static int
SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
{
	const SharedWorkerNodeDatabaseConnStatsHashKey *leftKey = a;
	const SharedWorkerNodeDatabaseConnStatsHashKey *rightKey = b;

	bool sameHostname = strncmp(leftKey->workerNodeKey.hostname,
								rightKey->workerNodeKey.hostname,
								MAX_NODE_LENGTH) == 0;
	bool samePort = (leftKey->workerNodeKey.port == rightKey->workerNodeKey.port);
	bool sameDatabase = (leftKey->database == rightKey->database);

	return (sameHostname && samePort && sameDatabase) ? 0 : 1;
}
/*
 * isConnectionThrottlingDisabled tells whether shared connection throttling
 * is switched off for the kind of connection described by externalFlags.
 *
 * For maintenance connections (MAINTENANCE_CONNECTION set) throttling is
 * disabled when MaxMaintenanceSharedPoolSize is zero or negative; for all
 * other connections it is disabled when MaxSharedPoolSize equals
 * DISABLE_CONNECTION_THROTTLING.
 */
static bool
isConnectionThrottlingDisabled(uint32 externalFlags)
{
	/*
	 * Do not call Get*PoolSize() functions here, since it may read from
	 * the catalog and we may be in the process exit handler.
	 */
	if ((externalFlags & MAINTENANCE_CONNECTION) != 0)
	{
		return MaxMaintenanceSharedPoolSize <= 0;
	}

	return MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING;
}

View File

@ -1993,17 +1993,17 @@ RegisterCitusConfigVariables(void)
GUC_SUPERUSER_ONLY, GUC_SUPERUSER_ONLY,
NULL, NULL, MaxSharedPoolSizeGucShowHook); NULL, NULL, MaxSharedPoolSizeGucShowHook);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_maintenance_shared_pool_size", "citus.max_maintenance_shared_pool_size",
gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections " gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections "
"for maintenance operations only." "for maintenance operations only."
"Setting it to 0 or -1 disables maintenance connection throttling."), "Setting it to 0 or -1 disables maintenance connection throttling."),
NULL, NULL,
&MaxMaintenanceSharedPoolSize, &MaxMaintenanceSharedPoolSize,
5, -1, INT_MAX, 5, -1, INT_MAX,
PGC_SIGHUP, PGC_SIGHUP,
GUC_SUPERUSER_ONLY, GUC_SUPERUSER_ONLY,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_worker_nodes_tracked", "citus.max_worker_nodes_tracked",
@ -2024,19 +2024,19 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_databases_per_worker_tracked", "citus.max_databases_per_worker_tracked",
gettext_noop("Sets the amount of databases per worker tracked."), gettext_noop("Sets the amount of databases per worker tracked."),
gettext_noop( gettext_noop(
"This configuration value complements the citus.max_worker_nodes_tracked." "This configuration value complements the citus.max_worker_nodes_tracked."
"It should be used when there are more then one database with Citus in cluster," "It should be used when there are more then one database with Citus in cluster,"
"and, effectively, limits the size of the hash table with connections per worker + database." "and, effectively, limits the size of the hash table with connections per worker + database."
"Currently, it does not affect the connection management logic and serves only statistical purposes."), "Currently, it does not affect the connection management logic and serves only statistical purposes."),
&MaxDatabasesPerWorkerNodesTracked, &MaxDatabasesPerWorkerNodesTracked,
1, 1, INT_MAX, 1, 1, INT_MAX,
PGC_POSTMASTER, PGC_POSTMASTER,
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.metadata_sync_interval", "citus.metadata_sync_interval",

View File

@ -1071,6 +1071,7 @@ citus_pid_for_gpid(PG_FUNCTION_ARGS)
PG_RETURN_INT32(ExtractProcessIdFromGlobalPID(globalPID)); PG_RETURN_INT32(ExtractProcessIdFromGlobalPID(globalPID));
} }
/* /*
* ExtractGlobalPID extracts the global process id from the application name and returns it * ExtractGlobalPID extracts the global process id from the application name and returns it
* if the application name is not compatible with Citus' application names returns 0. * if the application name is not compatible with Citus' application names returns 0.

View File

@ -159,7 +159,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
bool recoveryFailed = false; bool recoveryFailed = false;
int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION; int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
{ {

View File

@ -706,35 +706,35 @@ CitusMaintenanceDaemonMain(Datum main_arg)
timeout = Min(timeout, Recover2PCInterval); timeout = Min(timeout, Recover2PCInterval);
} }
/* /*
* Execute only on the maintenance database, if it is configured, otherwise run from every daemon. * Execute only on the maintenance database, if it is configured, otherwise run from every daemon.
* The config value -1 disables the distributed deadlock detection * The config value -1 disables the distributed deadlock detection
*/ */
if (DistributedDeadlockDetectionTimeoutFactor != -1.0) if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
{ {
double deadlockTimeout = double deadlockTimeout =
DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout; DistributedDeadlockDetectionTimeoutFactor * (double) DeadlockTimeout;
InvalidateMetadataSystemCache(); InvalidateMetadataSystemCache();
StartTransactionCommand(); StartTransactionCommand();
/* /*
* We skip the deadlock detection if citus extension * We skip the deadlock detection if citus extension
* is not accessible. * is not accessible.
* *
* Similarly, we skip to run the deadlock checks if * Similarly, we skip to run the deadlock checks if
* there exists any version mismatch or the extension * there exists any version mismatch or the extension
* is not fully created yet. * is not fully created yet.
*/ */
if (!LockCitusExtension()) if (!LockCitusExtension())
{ {
ereport(DEBUG1, (errmsg("could not lock the citus extension, " ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping deadlock detection"))); "skipping deadlock detection")));
} }
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{ {
foundDeadlock = CheckForDistributedDeadlocks(); foundDeadlock = CheckForDistributedDeadlocks();
} }
CommitTransactionCommand(); CommitTransactionCommand();
@ -1233,4 +1233,3 @@ MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData)
return metadataSyncTriggered; return metadataSyncTriggered;
} }

View File

@ -125,12 +125,12 @@ enum MultiConnectionMode
*/ */
REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8, REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8,
/* /*
* This flag specifies that connection is required for maintenance operations, e.g. * This flag specifies that connection is required for maintenance operations, e.g.
* transaction recovery, distributed deadlock detection. Such connections have * transaction recovery, distributed deadlock detection. Such connections have
* a reserved quota of the MaxSharedPoolSize. * a reserved quota of the MaxSharedPoolSize.
*/ */
REQUIRE_MAINTENANCE_CONNECTION = 1 << 9 REQUIRE_MAINTENANCE_CONNECTION = 1 << 9
}; };
@ -230,8 +230,8 @@ typedef struct MultiConnection
/* replication option */ /* replication option */
bool requiresReplication; bool requiresReplication;
/* See REQUIRE_MAINTENANCE_CONNECTION */ /* See REQUIRE_MAINTENANCE_CONNECTION */
bool useForMaintenanceOperations; bool useForMaintenanceOperations;
MultiConnectionStructInitializationState initializationState; MultiConnectionStructInitializationState initializationState;
} MultiConnection; } MultiConnection;

View File

@ -18,10 +18,10 @@
/*
 * Bit flags understood by the shared connection counter functions.
 */
enum SharedPoolCounterMode
{
	/*
	 * Reserve (or release) the connection from the maintenance quota
	 * instead of the regular one.
	 */
	MAINTENANCE_CONNECTION = 1 << 0
};
extern int MaxSharedPoolSize; extern int MaxSharedPoolSize;
@ -39,11 +39,14 @@ extern int GetMaxClientConnections(void);
/* pool size accessors */
extern int GetMaxSharedPoolSize(void);
extern int GetMaxMaintenanceSharedPoolSize(void);
extern int GetLocalSharedPoolSize(void);

/*
 * Shared connection counter API. The first argument is a bitmask;
 * presumably built from SharedPoolCounterMode values -- TODO confirm for
 * the functions whose definitions are not in view.
 */
extern bool TryToIncrementSharedConnectionCounter(uint32 flags,
												  const char *hostname,
												  int port);
extern void WaitLoopForSharedConnection(uint32 flags,
										const char *hostname,
										int port);
extern void DecrementSharedConnectionCounter(uint32 externalFlags,
											 const char *hostname,
											 int port);
extern void IncrementSharedConnectionCounter(uint32 flags,
											 const char *hostname,
											 int port);

extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode,
											int activeConnectionCount);
#endif /* SHARED_CONNECTION_STATS_H */ #endif /* SHARED_CONNECTION_STATS_H */