pull/7286/merge
Ivan Vyazmitinov 2025-02-11 20:22:19 +00:00 committed by GitHub
commit 8cdd42ffdd
30 changed files with 1853 additions and 314 deletions

View File

@@ -2160,6 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 	copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
 	RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
+	uint32 connectionFlags = 0;
 	/*
 	 * Colocated intermediate results do not honor citus.max_shared_pool_size,

@@ -2181,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 		 * and cannot switch to local execution (e.g., disabled by user),
 		 * COPY would fail hinting the user to change the relevant settiing.
 		 */
-		EnsureConnectionPossibilityForRemotePrimaryNodes();
+		EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags);
 	}
 	LocalCopyStatus localCopyStatus = GetLocalCopyStatus();

@@ -2211,7 +2212,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 		 */
 		if (ShardIntervalListHasLocalPlacements(shardIntervalList))
 		{
-			bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode();
+			bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(
+				connectionFlags);
 			copyDest->shouldUseLocalCopy = !reservedConnection;
 		}
 	}

@@ -3634,7 +3636,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
 		return connection;
 	}
-	if (IsReservationPossible())
+	if (IsReservationPossible(connectionFlags))
 	{
 		/*
 		 * Enforce the requirements for adaptive connection management

View File

@@ -61,8 +61,8 @@ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32
 static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
-static bool ShouldShutdownConnection(MultiConnection *connection, const int
-									 cachedConnectionCount);
+static bool ShouldShutdownConnection(MultiConnection *connection,
+									 const int cachedConnectionCount);
 static bool RemoteTransactionIdle(MultiConnection *connection);
 static int EventSetSizeForConnectionList(List *connections);

@@ -354,6 +354,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
 		if (connection)
 		{
+			if (flags & REQUIRE_MAINTENANCE_CONNECTION)
+			{
+				/* Maintenance database may have changed, so cached connection should be closed */
+				connection->forceCloseAtTransactionEnd = true;
+			}
 			return connection;
 		}
 	}

@@ -377,9 +382,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	/* these two flags are by nature cannot happen at the same time */
 	Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));
+	int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION)
+							 ? MAINTENANCE_CONNECTION
+							 : 0;
 	if (flags & WAIT_FOR_CONNECTION)
 	{
-		WaitLoopForSharedConnection(hostname, port);
+		WaitLoopForSharedConnection(sharedCounterFlags, hostname, port);
 	}
 	else if (flags & OPTIONAL_CONNECTION)
 	{

@@ -389,7 +397,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		 * cannot reserve the right to establish a connection, we prefer to
 		 * error out.
 		 */
-		if (!TryToIncrementSharedConnectionCounter(hostname, port))
+		if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port))
 		{
 			/* do not track the connection anymore */
 			dlist_delete(&connection->connectionNode);

@@ -409,7 +417,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		 *
 		 * Still, we keep track of the connection counter.
 		 */
-		IncrementSharedConnectionCounter(hostname, port);
+		IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port);
 	}

@@ -423,11 +431,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	ResetShardPlacementAssociation(connection);
-	if (flags & REQUIRE_METADATA_CONNECTION)
+	if ((flags & REQUIRE_METADATA_CONNECTION))
 	{
 		connection->useForMetadataOperations = true;
 	}
+	else if (flags & REQUIRE_MAINTENANCE_CONNECTION)
+	{
+		connection->useForMaintenanceOperations = true;
+		connection->forceCloseAtTransactionEnd = true;
+	}
 	/* fully initialized the connection, record it */
 	connection->initializationState = POOL_STATE_INITIALIZED;

@@ -495,6 +507,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
 			continue;
 		}
+		if ((flags & REQUIRE_MAINTENANCE_CONNECTION) &&
+			!connection->useForMaintenanceOperations)
+		{
+			continue;
+		}
 		if ((flags & REQUIRE_METADATA_CONNECTION) &&
 			!connection->useForMetadataOperations)
 		{

@@ -1191,7 +1209,11 @@ CitusPQFinish(MultiConnection *connection)
 	/* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
 	if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED)
 	{
-		DecrementSharedConnectionCounter(connection->hostname, connection->port);
+		int sharedCounterFlags = (connection->useForMaintenanceOperations)
+								 ? MAINTENANCE_CONNECTION
+								 : 0;
+		DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname,
+										 connection->port);
 		connection->initializationState = POOL_STATE_NOT_INITIALIZED;
 	}
 }
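Taken together, these changes make the maintenance quota opt-in purely through connection flags. A minimal caller sketch (mirroring the flag combination this patch uses in RecoverWorkerTransactions() and BuildGlobalWaitGraph() further down; the workerNode variable is assumed context):

	/* Sketch: ask for a connection accounted against the maintenance quota
	 * (citus.max_maintenance_shared_pool_size) rather than the regular
	 * citus.max_shared_pool_size. */
	int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
	MultiConnection *connection =
		GetNodeConnection(connectionFlags, workerNode->workerName,
						  workerNode->workerPort);
	/* per this patch, maintenance connections are force-closed at transaction end */
	Assert(connection->forceCloseAtTransactionEnd);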

View File

@@ -92,9 +92,11 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *
 															userId, Oid
 															databaseOid,
 															bool *found);
-static void EnsureConnectionPossibilityForNodeList(List *nodeList);
+static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32
+												   connectionFlags);
 static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode,
-											   bool waitForConnection);
+											   bool waitForConnection,
+											   uint32 connectionFlags);
 static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
 static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);

@@ -240,7 +242,9 @@ DeallocateReservedConnections(void)
 			 * We have not used this reservation, make sure to clean-up from
 			 * the shared memory as well.
 			 */
-			DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port);
+			int sharedCounterFlags = 0;
+			DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname,
+											 entry->key.port);
 			/* for completeness, set it to true */
 			entry->usedReservation = true;

@@ -295,7 +299,7 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId,
  * EnsureConnectionPossibilityForNodeList.
  */
 void
-EnsureConnectionPossibilityForRemotePrimaryNodes(void)
+EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags)
 {
 	/*
 	 * By using NoLock there is a tiny risk of that we miss to reserve a

@@ -304,7 +308,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
 	 * going to access would be on the new node.
 	 */
 	List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock);
-	EnsureConnectionPossibilityForNodeList(remoteNodeList);
+	EnsureConnectionPossibilityForNodeList(remoteNodeList, connectionFlags);
 }

@@ -314,7 +318,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
 * If not, the function returns false.
 */
 bool
-TryConnectionPossibilityForLocalPrimaryNode(void)
+TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags)
 {
 	bool nodeIsInMetadata = false;
 	WorkerNode *localNode =

@@ -330,7 +334,8 @@ TryConnectionPossibilityForLocalPrimaryNode(void)
 	}
 	bool waitForConnection = false;
-	return EnsureConnectionPossibilityForNode(localNode, waitForConnection);
+	return EnsureConnectionPossibilityForNode(localNode, waitForConnection,
+											  connectionFlags);
 }

@@ -344,7 +349,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void)
 * single reservation per backend)
 */
 static void
-EnsureConnectionPossibilityForNodeList(List *nodeList)
+EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags)
 {
 	/*
 	 * We sort the workerList because adaptive connection management

@@ -363,7 +368,8 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
 	foreach_ptr(workerNode, nodeList)
 	{
 		bool waitForConnection = true;
-		EnsureConnectionPossibilityForNode(workerNode, waitForConnection);
+		EnsureConnectionPossibilityForNode(workerNode, waitForConnection,
+										   connectionFlags);
 	}
 }

@@ -382,9 +388,10 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
 * return false.
 */
 static bool
-EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection)
+EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32
+								   connectionFlags)
 {
-	if (!IsReservationPossible())
+	if (!IsReservationPossible(connectionFlags))
 	{
 		return false;
 	}

@@ -439,12 +446,16 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
 		 * Increment the shared counter, we may need to wait if there are
 		 * no space left.
 		 */
-		WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort);
+		int sharedCounterFlags = 0;
+		WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName,
+									workerNode->workerPort);
 	}
 	else
 	{
-		bool incremented =
-			TryToIncrementSharedConnectionCounter(workerNode->workerName,
-												  workerNode->workerPort);
+		int sharedCounterFlags = 0;
+		bool incremented = TryToIncrementSharedConnectionCounter(
+			sharedCounterFlags,
+			workerNode->workerName,
+			workerNode->workerPort);
 		if (!incremented)
 		{

@@ -475,9 +486,13 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
 * session is eligible for shared connection reservation.
 */
 bool
-IsReservationPossible(void)
+IsReservationPossible(uint32 connectionFlags)
 {
-	if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
+	bool connectionThrottlingDisabled =
+		connectionFlags & REQUIRE_MAINTENANCE_CONNECTION
+		? GetMaxMaintenanceSharedPoolSize() <= 0
+		: GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING;
+	if (connectionThrottlingDisabled)
 	{
 		/* connection throttling disabled */
 		return false;
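A recurring idiom in this file and in connection_management.c: the connection-establishment flag (REQUIRE_MAINTENANCE_CONNECTION, a MultiConnectionMode bit) is translated into the shared-counter mode (MAINTENANCE_CONNECTION, a SharedPoolCounterMode bit) at every boundary into shared_connection_stats.c. A hedged sketch of that translation:

	/* Sketch: the two flag namespaces are distinct; translate before
	 * touching the shared counters. */
	int sharedCounterFlags = (connectionFlags & REQUIRE_MAINTENANCE_CONNECTION)
							 ? MAINTENANCE_CONNECTION
							 : 0;
	if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port))
	{
		/* no free slot in the selected pool: wait, error out, or fall back */
	}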

View File

@@ -13,12 +13,12 @@
 #include "postgres.h"
 #include "libpq-fe.h"
+#include "math.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "access/hash.h"
 #include "access/htup_details.h"
-#include "catalog/pg_authid.h"
 #include "commands/dbcommands.h"
 #include "common/hashfn.h"
 #include "storage/ipc.h"

@@ -27,19 +27,15 @@
 #include "pg_version_constants.h"
 #include "distributed/backend_data.h"
-#include "distributed/cancel_utils.h"
 #include "distributed/connection_management.h"
-#include "distributed/listutils.h"
 #include "distributed/locally_reserved_shared_connections.h"
 #include "distributed/metadata_cache.h"
-#include "distributed/multi_executor.h"
 #include "distributed/placement_connection.h"
 #include "distributed/shared_connection_stats.h"
 #include "distributed/time_constants.h"
 #include "distributed/tuplestore.h"
 #include "distributed/worker_manager.h"

 #define REMOTE_CONNECTION_STATS_COLUMNS 4
@@ -55,11 +51,18 @@ typedef struct ConnectionStatsSharedData
 	char *sharedConnectionHashTrancheName;
 	LWLock sharedConnectionHashLock;
-	ConditionVariable waitersConditionVariable;
+	ConditionVariable regularConnectionWaitersConditionVariable;
+	ConditionVariable maintenanceConnectionWaitersConditionVariable;
 } ConnectionStatsSharedData;

+/*
+ * There are two hash tables:
+ *
+ * 1. The first one tracks the connection count per worker node and is used for connection throttling
+ * 2. The second one tracks the connection count per database on a worker node and is used for statistics
+ */
-typedef struct SharedConnStatsHashKey
+typedef struct SharedWorkerNodeConnStatsHashKey
 {
 	/*
 	 * We keep the entries in the shared memory even after master_update_node()
@@ -68,21 +71,29 @@ typedef struct SharedConnStatsHashKey
 	 */
 	char hostname[MAX_NODE_LENGTH];
 	int32 port;
-
-	/*
-	 * Given that citus.shared_max_pool_size can be defined per database, we
-	 * should keep track of shared connections per database.
-	 */
-	Oid databaseOid;
-} SharedConnStatsHashKey;
+} SharedWorkerNodeConnStatsHashKey;
+
+typedef struct SharedWorkerNodeDatabaseConnStatsHashKey
+{
+	SharedWorkerNodeConnStatsHashKey workerNodeKey;
+	Oid database;
+} SharedWorkerNodeDatabaseConnStatsHashKey;

 /* hash entry for per worker stats */
-typedef struct SharedConnStatsHashEntry
+typedef struct SharedWorkerNodeConnStatsHashEntry
 {
-	SharedConnStatsHashKey key;
-
-	int connectionCount;
-} SharedConnStatsHashEntry;
+	SharedWorkerNodeConnStatsHashKey key;
+
+	int regularConnectionsCount;
+	int maintenanceConnectionsCount;
+} SharedWorkerNodeConnStatsHashEntry;
+
+/* hash entry for per database on worker stats */
+typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey key;
+	int count;
+} SharedWorkerNodeDatabaseConnStatsHashEntry;

@@ -93,6 +104,14 @@ typedef struct SharedConnStatsHashEntry
  */
 int MaxSharedPoolSize = 0;

+/*
+ * Controlled via a GUC, never access directly, use GetMaxMaintenanceSharedPoolSize().
+ * Pool size for maintenance connections exclusively.
+ * "0" or "-1" means do not apply connection throttling.
+ */
+int MaxMaintenanceSharedPoolSize = -1;
+int MaintenanceConnectionPoolTimeout = 30 * MS_PER_SECOND;
+
 /*
  * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
  * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections.
@@ -106,7 +125,8 @@ int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS;

 /* the following two structs are used for accessing shared memory */
-static HTAB *SharedConnStatsHash = NULL;
+static HTAB *SharedWorkerNodeConnStatsHash = NULL;
+static HTAB *SharedWorkerNodeDatabaseConnStatsHash = NULL;
 static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
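Because the per-database key embeds the per-worker key, both tables can be addressed from a single (hostname, port, database) triple. A sketch using the PrepareWorkerNode*HashKey helpers added later in this file (the hostname and port below are hypothetical):

	SharedWorkerNodeConnStatsHashKey workerKey =
		PrepareWorkerNodeHashKey("worker-1", 5432);        /* throttling table */
	SharedWorkerNodeDatabaseConnStatsHashKey databaseKey =
		PrepareWorkerNodeDatabaseHashKey("worker-1", 5432,
										 MyDatabaseId);    /* statistics table */
	Assert(databaseKey.workerNodeKey.port == workerKey.port);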
@@ -121,6 +141,25 @@ static void UnLockConnectionSharedMemory(void);
 static bool ShouldWaitForConnection(int currentConnectionCount);
 static uint32 SharedConnectionHashHash(const void *key, Size keysize);
 static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
+static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize);
+static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size
+											   keysize);
+static bool isConnectionThrottlingDisabled(uint32 externalFlags);
+static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool
+													 checkLimits, const char *hostname,
+													 int port,
+													 Oid database);
+static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int
+																 port);
+static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const
+																				 char *
+																				 hostname,
+																				 int port,
+																				 Oid
+																				 database);
+static void DecrementSharedConnectionCounterInternal(uint32 externalFlags, const
+													 char *hostname, int port, Oid
+													 database);

 PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
@@ -160,26 +199,29 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri
 	LockConnectionSharedMemory(LW_SHARED);

 	HASH_SEQ_STATUS status;
-	SharedConnStatsHashEntry *connectionEntry = NULL;
+	SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL;

-	hash_seq_init(&status, SharedConnStatsHash);
-	while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
+	hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash);
+	while ((connectionEntry =
+				(SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search(
+					&status)) != 0)
 	{
 		/* get ready for the next tuple */
 		memset(values, 0, sizeof(values));
 		memset(isNulls, false, sizeof(isNulls));

-		char *databaseName = get_database_name(connectionEntry->key.databaseOid);
+		char *databaseName = get_database_name(connectionEntry->key.database);
 		if (databaseName == NULL)
 		{
 			/* database might have been dropped */
 			continue;
 		}

-		values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname));
-		values[1] = Int32GetDatum(connectionEntry->key.port);
+		values[0] = PointerGetDatum(cstring_to_text(
+										connectionEntry->key.workerNodeKey.hostname));
+		values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port);
 		values[2] = PointerGetDatum(cstring_to_text(databaseName));
-		values[3] = Int32GetDatum(connectionEntry->connectionCount);
+		values[3] = Int32GetDatum(connectionEntry->count);

 		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
 	}
@@ -226,6 +268,13 @@ GetMaxSharedPoolSize(void)
 }

+int
+GetMaxMaintenanceSharedPoolSize(void)
+{
+	return MaxMaintenanceSharedPoolSize;
+}
+
+
 /*
  * GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is
  * controlled via a GUC.
@@ -248,18 +297,18 @@ GetLocalSharedPoolSize(void)
 /*
  * WaitLoopForSharedConnection tries to increment the shared connection
  * counter for the given hostname/port and the current database in
- * SharedConnStatsHash.
+ * SharedWorkerNodeConnStatsHash.
  *
  * The function implements a retry mechanism via a condition variable.
  */
 void
-WaitLoopForSharedConnection(const char *hostname, int port)
+WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port)
 {
-	while (!TryToIncrementSharedConnectionCounter(hostname, port))
+	while (!TryToIncrementSharedConnectionCounter(flags, hostname, port))
 	{
 		CHECK_FOR_INTERRUPTS();

-		WaitForSharedConnection();
+		WaitForSharedConnection(flags);
 	}

 	ConditionVariableCancelSleep();
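The counter protocol callers follow is: try to increment; if no slot is free, sleep on the matching condition variable and retry; and decrement with the same counter mode once the connection goes away. A hedged end-to-end sketch of that pairing:

	/* Sketch: acquire and release must use the same counter mode. */
	WaitLoopForSharedConnection(MAINTENANCE_CONNECTION, hostname, port);
	/* ... establish and use the maintenance connection ... */
	DecrementSharedConnectionCounter(MAINTENANCE_CONNECTION, hostname, port);
	/* per this patch, the decrement path also broadcasts on the maintenance
	 * waiters condition variable */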
@@ -269,32 +318,20 @@ WaitLoopForSharedConnection(const char *hostname, int port)
 /*
  * TryToIncrementSharedConnectionCounter tries to increment the shared
  * connection counter for the given nodeId and the current database in
- * SharedConnStatsHash.
+ * SharedWorkerNodeConnStatsHash.
  *
  * If the function returns true, the caller is allowed (and expected)
  * to establish a new connection to the given node. Else, the caller
  * is not allowed to establish a new connection.
  */
 bool
-TryToIncrementSharedConnectionCounter(const char *hostname, int port)
+TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
 {
-	if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
+	if (isConnectionThrottlingDisabled(flags))
 	{
-		/* connection throttling disabled */
 		return true;
 	}

-	bool counterIncremented = false;
-	SharedConnStatsHashKey connKey;
-
-	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
-	if (strlen(hostname) > MAX_NODE_LENGTH)
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("hostname exceeds the maximum length of %d",
-							   MAX_NODE_LENGTH)));
-	}
-
 	/*
 	 * The local session might already have some reserved connections to the given
 	 * node. In that case, we don't need to go through the shared memory.
@@ -307,35 +344,41 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
 		return true;
 	}

-	connKey.port = port;
-	connKey.databaseOid = MyDatabaseId;
-
-	/*
-	 * Handle adaptive connection management for the local node slightly different
-	 * as local node can failover to local execution.
-	 */
-	bool connectionToLocalNode = false;
-	int activeBackendCount = 0;
-	WorkerNode *workerNode = FindWorkerNode(hostname, port);
-	if (workerNode)
-	{
-		connectionToLocalNode = (workerNode->groupId == GetLocalGroupId());
-		if (connectionToLocalNode &&
-			GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES)
-		{
-			/*
-			 * This early return is required as LocalNodeParallelExecutionFactor
-			 * is ignored for the first connection below. This check makes the
-			 * user experience is more accurate and also makes it easy for
-			 * having regression tests which emulates the local node adaptive
-			 * connection management.
-			 */
-			return false;
-		}
-
-		activeBackendCount = GetExternalClientBackendCount();
-	}
-
+	return IncrementSharedConnectionCounterInternal(flags,
+													true,
+													hostname,
+													port,
+													MyDatabaseId);
+}
+
+
+/*
+ * IncrementSharedConnectionCounter increments the shared counter
+ * for the given hostname and port.
+ */
+void
+IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port)
+{
+	if (isConnectionThrottlingDisabled(flags))
+	{
+		return;
+	}
+
+	IncrementSharedConnectionCounterInternal(flags,
+											 false,
+											 hostname,
+											 port,
+											 MyDatabaseId);
+}
+
+
+static bool
+IncrementSharedConnectionCounterInternal(uint32 externalFlags,
+										 bool checkLimits,
+										 const char *hostname,
+										 int port,
+										 Oid database)
+{
 	LockConnectionSharedMemory(LW_EXCLUSIVE);

 	/*
@@ -344,30 +387,83 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
 	 * space in the shared memory. That's why we prefer continuing the execution
 	 * instead of throwing an error.
 	 */
-	bool entryFound = false;
-	SharedConnStatsHashEntry *connectionEntry =
-		hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
+	SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname,
+																			  port);
+	bool workerNodeEntryFound = false;
+	SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry =
+		hash_search(SharedWorkerNodeConnStatsHash,
+					&workerNodeKey,
+					HASH_ENTER_NULL,
+					&workerNodeEntryFound);

 	/*
 	 * It is possible to throw an error at this point, but that doesn't help us in anyway.
 	 * Instead, we try our best, let the connection establishment continue by-passing the
 	 * connection throttling.
 	 */
-	if (!connectionEntry)
+	if (!workerNodeConnectionEntry)
 	{
 		UnLockConnectionSharedMemory();
 		return true;
 	}

-	if (!entryFound)
+	if (!workerNodeEntryFound)
 	{
 		/* we successfully allocated the entry for the first time, so initialize it */
-		connectionEntry->connectionCount = 1;
-
-		counterIncremented = true;
+		workerNodeConnectionEntry->regularConnectionsCount = 0;
+		workerNodeConnectionEntry->maintenanceConnectionsCount = 0;
 	}
-	else if (connectionToLocalNode)
-	{
+
+	/* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */
+	SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey =
+		PrepareWorkerNodeDatabaseHashKey(hostname, port, database);
+	bool workerNodeDatabaseEntryFound = false;
+	SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry =
+		hash_search(SharedWorkerNodeDatabaseConnStatsHash,
+					&workerNodeDatabaseKey,
+					HASH_ENTER_NULL,
+					&workerNodeDatabaseEntryFound);
+
+	if (!workerNodeDatabaseEntry)
+	{
+		UnLockConnectionSharedMemory();
+		return true;
+	}
+
+	if (!workerNodeDatabaseEntryFound)
+	{
+		workerNodeDatabaseEntry->count = 0;
+	}
+
+	/* Increment counter if a slot available */
+	bool connectionSlotAvailable = true;
+	bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
+	if (checkLimits)
+	{
+		WorkerNode *workerNode = FindWorkerNode(hostname, port);
+		bool connectionToLocalNode = workerNode && (workerNode->groupId ==
+													GetLocalGroupId());
+		int currentConnectionsLimit;
+		int currentConnectionsCount;
+		if (maintenanceConnection)
+		{
+			currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize();
+			currentConnectionsCount =
+				workerNodeConnectionEntry->maintenanceConnectionsCount;
+		}
+		else
+		{
+			currentConnectionsLimit = connectionToLocalNode
+									  ? GetLocalSharedPoolSize()
+									  : GetMaxSharedPoolSize();
+			currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount;
+		}
+		bool currentConnectionsLimitExceeded = currentConnectionsCount + 1 >
+											   currentConnectionsLimit;
+
 		/*
 		 * For local nodes, solely relying on citus.max_shared_pool_size or
 		 * max_connections might not be sufficient. The former gives us
@@ -380,98 +476,49 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
 		 * a reasonable pace. The latter limit typically kicks in when the database
 		 * is issued lots of concurrent sessions at the same time, such as benchmarks.
 		 */
-		if (activeBackendCount + 1 > GetLocalSharedPoolSize())
-		{
-			counterIncremented = false;
-		}
-		else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize())
-		{
-			counterIncremented = false;
-		}
-		else
-		{
-			connectionEntry->connectionCount++;
-			counterIncremented = true;
-		}
-	}
-	else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
-	{
-		/* there is no space left for this connection */
-		counterIncremented = false;
-	}
-	else
-	{
-		connectionEntry->connectionCount++;
-		counterIncremented = true;
-	}
-
-	UnLockConnectionSharedMemory();
-
-	return counterIncremented;
-}
-
-
-/*
- * IncrementSharedConnectionCounter increments the shared counter
- * for the given hostname and port.
- */
-void
-IncrementSharedConnectionCounter(const char *hostname, int port)
-{
-	SharedConnStatsHashKey connKey;
-
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
-	{
-		/* connection throttling disabled */
-		return;
-	}
-
-	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
-	if (strlen(hostname) > MAX_NODE_LENGTH)
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("hostname exceeds the maximum length of %d",
-							   MAX_NODE_LENGTH)));
-	}
-
-	connKey.port = port;
-	connKey.databaseOid = MyDatabaseId;
-
-	LockConnectionSharedMemory(LW_EXCLUSIVE);
-
-	/*
-	 * As the hash map is allocated in shared memory, it doesn't rely on palloc for
-	 * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer
-	 * continuing the execution instead of throwing an error.
-	 */
-	bool entryFound = false;
-	SharedConnStatsHashEntry *connectionEntry =
-		hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
-
-	/*
-	 * It is possible to throw an error at this point, but that doesn't help us in anyway.
-	 * Instead, we try our best, let the connection establishment continue by-passing the
-	 * connection throttling.
-	 */
-	if (!connectionEntry)
-	{
-		UnLockConnectionSharedMemory();
-
-		ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing "
-								"connection counter", hostname, port)));
-
-		return;
-	}
-
-	if (!entryFound)
-	{
-		/* we successfully allocated the entry for the first time, so initialize it */
-		connectionEntry->connectionCount = 0;
-	}
-
-	connectionEntry->connectionCount += 1;
-
-	UnLockConnectionSharedMemory();
+		bool localNodeConnectionsLimitExceeded =
+			connectionToLocalNode &&
+			(GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES ||
+			 GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize());
+		if (currentConnectionsLimitExceeded || localNodeConnectionsLimitExceeded)
+		{
+			connectionSlotAvailable = false;
+		}
+	}
+
+	if (connectionSlotAvailable)
+	{
+		if (maintenanceConnection)
+		{
+			workerNodeConnectionEntry->maintenanceConnectionsCount += 1;
+		}
+		else
+		{
+			workerNodeConnectionEntry->regularConnectionsCount += 1;
+		}
+		workerNodeDatabaseEntry->count += 1;
+	}
+
+	if (IsLoggableLevel(DEBUG4))
+	{
+		ereport(DEBUG4, errmsg(
+					"Incrementing %s connection counter. "
+					"Current regular connections: %i, maintenance connections: %i. "
+					"Connection slot to %s:%i database %i is %s",
+					maintenanceConnection ? "maintenance" : "regular",
+					workerNodeConnectionEntry->regularConnectionsCount,
+					workerNodeConnectionEntry->maintenanceConnectionsCount,
+					hostname,
+					port,
+					database,
+					connectionSlotAvailable ? "available" : "not available"
+					));
+	}
+
+	UnLockConnectionSharedMemory();
+
+	return connectionSlotAvailable;
 }
@@ -480,77 +527,119 @@ IncrementSharedConnectionCounter(const char *hostname, int port)
  * for the given hostname and port for the given count.
  */
 void
-DecrementSharedConnectionCounter(const char *hostname, int port)
+DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port)
 {
-	SharedConnStatsHashKey connKey;
-
-	/*
-	 * Do not call GetMaxSharedPoolSize() here, since it may read from
-	 * the catalog and we may be in the process exit handler.
-	 */
-	if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING)
+	/* TODO: possible bug, remove this check? */
+	if (isConnectionThrottlingDisabled(externalFlags))
 	{
-		/* connection throttling disabled */
 		return;
 	}

-	strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
-	if (strlen(hostname) > MAX_NODE_LENGTH)
-	{
-		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						errmsg("hostname exceeds the maximum length of %d",
-							   MAX_NODE_LENGTH)));
-	}
-
-	connKey.port = port;
-	connKey.databaseOid = MyDatabaseId;
-
 	LockConnectionSharedMemory(LW_EXCLUSIVE);

-	bool entryFound = false;
-	SharedConnStatsHashEntry *connectionEntry =
-		hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
+	DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId);
+
+	UnLockConnectionSharedMemory();
+
+	WakeupWaiterBackendsForSharedConnection(externalFlags);
+}
+
+
+static void
+DecrementSharedConnectionCounterInternal(uint32 externalFlags,
+										 const char *hostname,
+										 int port,
+										 Oid database)
+{
+	bool workerNodeEntryFound = false;
+	SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname,
+																			  port);
+	SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry =
+		hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND,
+					&workerNodeEntryFound);

 	/* this worker node is removed or updated, no need to care */
-	if (!entryFound)
+	if (!workerNodeEntryFound)
 	{
-		UnLockConnectionSharedMemory();
-
-		/* wake up any waiters in case any backend is waiting for this node */
-		WakeupWaiterBackendsForSharedConnection();
-
 		ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing "
 								"connection counter", hostname, port)));
-
 		return;
 	}

 	/* we should never go below 0 */
-	Assert(connectionEntry->connectionCount > 0);
+	Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 ||
+		   workerNodeConnectionEntry->maintenanceConnectionsCount > 0);

-	connectionEntry->connectionCount -= 1;
-
-	if (connectionEntry->connectionCount == 0)
+	bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
+	if (maintenanceConnection)
 	{
+		workerNodeConnectionEntry->maintenanceConnectionsCount -= 1;
+	}
+	else
+	{
+		workerNodeConnectionEntry->regularConnectionsCount -= 1;
+	}
+
+	if (IsLoggableLevel(DEBUG4))
+	{
+		ereport(DEBUG4, errmsg(
+					"Decrementing %s connection counter. "
+					"Current regular connections: %i, maintenance connections: %i. "
+					"Connection slot to %s:%i database %i is released",
+					maintenanceConnection ? "maintenance" : "regular",
+					workerNodeConnectionEntry->regularConnectionsCount,
+					workerNodeConnectionEntry->maintenanceConnectionsCount,
+					hostname,
+					port,
+					database
+					));
+	}
+
 	/*
 	 * We don't have to remove at this point as the node might be still active
 	 * and will have new connections open to it. Still, this seems like a convenient
-	 * place to remove the entry, as connectionCount == 0 implies that the server is
+	 * place to remove the entry, as count == 0 implies that the server is
 	 * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
 	 * we're unlikely to trigger this often.
 	 */
-	hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound);
+	if (workerNodeConnectionEntry->regularConnectionsCount == 0 &&
+		workerNodeConnectionEntry->maintenanceConnectionsCount == 0)
+	{
+		hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL);
 	}

-	UnLockConnectionSharedMemory();
+	/*
+	 * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey
+	 */
+	SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey =
+		PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId);
+	bool workerNodeDatabaseEntryFound = false;
+	SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry =
+		hash_search(SharedWorkerNodeDatabaseConnStatsHash,
+					&workerNodeDatabaseKey,
+					HASH_FIND,
+					&workerNodeDatabaseEntryFound);

-	WakeupWaiterBackendsForSharedConnection();
+	if (!workerNodeDatabaseEntryFound)
+	{
+		return;
+	}
+
+	Assert(workerNodeDatabaseEntry->count > 0);
+
+	workerNodeDatabaseEntry->count -= 1;
+
+	if (workerNodeDatabaseEntry->count == 0)
+	{
+		hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey,
+					HASH_REMOVE, NULL);
+	}
 }


 /*
  * LockConnectionSharedMemory is a utility function that should be used when
- * accessing to the SharedConnStatsHash, which is in the shared memory.
+ * accessing to the SharedWorkerNodeConnStatsHash, which is in the shared memory.
  */
 static void
 LockConnectionSharedMemory(LWLockMode lockMode)
@@ -587,9 +676,18 @@ UnLockConnectionSharedMemory(void)
  * this function.
  */
 void
-WakeupWaiterBackendsForSharedConnection(void)
+WakeupWaiterBackendsForSharedConnection(uint32 flags)
 {
-	ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
+	if (flags & MAINTENANCE_CONNECTION)
+	{
+		ConditionVariableBroadcast(
+			&ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable);
+	}
+	else
+	{
+		ConditionVariableBroadcast(
+			&ConnectionStatsSharedState->regularConnectionWaitersConditionVariable);
+	}
 }
@@ -600,10 +698,30 @@ WakeupWaiterBackendsForSharedConnection(void)
  * WakeupWaiterBackendsForSharedConnection().
  */
 void
-WaitForSharedConnection(void)
+WaitForSharedConnection(uint32 flags)
 {
-	ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable,
-						   PG_WAIT_EXTENSION);
+	if (flags & MAINTENANCE_CONNECTION)
+	{
+		bool connectionSlotNotAcquired = ConditionVariableTimedSleep(
+			&ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable,
+			MaintenanceConnectionPoolTimeout,
+			PG_WAIT_EXTENSION);
+		if (connectionSlotNotAcquired)
+		{
+			ereport(ERROR, (errmsg("Failed to acquire maintenance connection for %i ms",
+								   MaintenanceConnectionPoolTimeout),
+							errhint(
+								"Try increasing citus.max_maintenance_shared_pool_size or "
+								"citus.maintenance_connection_pool_timeout"
+								)));
+		}
+	}
+	else
+	{
+		ConditionVariableSleep(
+			&ConnectionStatsSharedState->regularConnectionWaitersConditionVariable,
+			PG_WAIT_EXTENSION);
+	}
 }
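For background (PostgreSQL API, not part of this diff): ConditionVariableTimedSleep() returns true when it stopped waiting because the timeout elapsed, which is what drives the new error above. The canonical sleep idiom, sketched with a hypothetical cv pointer, timeoutMs value, and SlotIsAvailable() predicate:

	/* Sketch of the condition-variable idiom that WaitLoopForSharedConnection()
	 * and WaitForSharedConnection() follow. */
	ConditionVariablePrepareToSleep(cv);
	while (!SlotIsAvailable())    /* hypothetical predicate */
	{
		if (ConditionVariableTimedSleep(cv, timeoutMs, PG_WAIT_EXTENSION))
		{
			break;    /* true: the timeout elapsed without a wakeup */
		}
	}
	ConditionVariableCancelSleep();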
@@ -640,10 +758,18 @@ SharedConnectionStatsShmemSize(void)
 	size = add_size(size, sizeof(ConnectionStatsSharedData));

-	Size hashSize = hash_estimate_size(MaxWorkerNodesTracked,
-									   sizeof(SharedConnStatsHashEntry));
+	Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked,
+													 sizeof(
+														 SharedWorkerNodeConnStatsHashEntry));

-	size = add_size(size, hashSize);
+	size = add_size(size, workerNodeConnHashSize);
+
+	Size workerNodeDatabaseConnSize = hash_estimate_size(MaxWorkerNodesTracked *
+														 MaxDatabasesPerWorkerNodesTracked,
+														 sizeof(
+															 SharedWorkerNodeDatabaseConnStatsHashEntry));
+
+	size = add_size(size, workerNodeDatabaseConnSize);

 	return size;
 }
@@ -657,15 +783,6 @@ void
 SharedConnectionStatsShmemInit(void)
 {
 	bool alreadyInitialized = false;
-	HASHCTL info;
-
-	/* create (hostname, port, database) -> [counter] */
-	memset(&info, 0, sizeof(info));
-	info.keysize = sizeof(SharedConnStatsHashKey);
-	info.entrysize = sizeof(SharedConnStatsHashEntry);
-	info.hash = SharedConnectionHashHash;
-	info.match = SharedConnectionHashCompare;
-	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);

 	/*
 	 * Currently the lock isn't required because allocation only happens at
@@ -691,17 +808,54 @@ SharedConnectionStatsShmemInit(void)
 		LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
 						 ConnectionStatsSharedState->sharedConnectionHashTrancheId);

-		ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
+		ConditionVariableInit(
+			&ConnectionStatsSharedState->regularConnectionWaitersConditionVariable);
+		ConditionVariableInit(
+			&ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable);
 	}

-	/* allocate hash table */
-	SharedConnStatsHash =
-		ShmemInitHash("Shared Conn. Stats Hash", MaxWorkerNodesTracked,
-					  MaxWorkerNodesTracked, &info, hashFlags);
+	/* allocate hash tables */
+
+	/* create (hostname, port) -> [counter] */
+	HASHCTL sharedWorkerNodeConnStatsHashInfo;
+	memset(&sharedWorkerNodeConnStatsHashInfo, 0,
+		   sizeof(sharedWorkerNodeConnStatsHashInfo));
+	sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey);
+	sharedWorkerNodeConnStatsHashInfo.entrysize =
+		sizeof(SharedWorkerNodeConnStatsHashEntry);
+	sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash;
+	sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare;
+	SharedWorkerNodeConnStatsHash =
+		ShmemInitHash("Shared Conn. Stats Hash",
+					  MaxWorkerNodesTracked,
+					  MaxWorkerNodesTracked,
+					  &sharedWorkerNodeConnStatsHashInfo,
+					  (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE));
+
+	/* create (hostname, port, database) -> [counter] */
+	HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo;
+	memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0,
+		   sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo));
+	sharedWorkerNodeDatabaseConnStatsHashInfo.keysize =
+		sizeof(SharedWorkerNodeDatabaseConnStatsHashKey);
+	sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize =
+		sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry);
+	sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash;
+	sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare;
+	int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked *
+													MaxDatabasesPerWorkerNodesTracked;
+	SharedWorkerNodeDatabaseConnStatsHash =
+		ShmemInitHash("Shared Conn Per Database. Stats Hash",
+					  sharedWorkerNodeDatabaseConnStatsHashSize,
+					  sharedWorkerNodeDatabaseConnStatsHashSize,
+					  &sharedWorkerNodeDatabaseConnStatsHashInfo,
+					  (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE));

 	LWLockRelease(AddinShmemInitLock);

-	Assert(SharedConnStatsHash != NULL);
+	Assert(SharedWorkerNodeConnStatsHash != NULL);
+	Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL);
 	Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0);

 	if (prev_shmem_startup_hook != NULL)
@@ -800,14 +954,53 @@ ShouldWaitForConnection(int currentConnectionCount)
 }

+static SharedWorkerNodeConnStatsHashKey
+PrepareWorkerNodeHashKey(const char *hostname, int port)
+{
+	SharedWorkerNodeConnStatsHashKey key;
+	strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
+	if (strlen(hostname) > MAX_NODE_LENGTH)
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("hostname exceeds the maximum length of %d",
+							   MAX_NODE_LENGTH)));
+	}
+	key.port = port;
+	return key;
+}
+
+
+static SharedWorkerNodeDatabaseConnStatsHashKey
+PrepareWorkerNodeDatabaseHashKey(const char *hostname,
+								 int port,
+								 Oid database)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey;
+	workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
+	workerNodeDatabaseKey.database = database;
+	return workerNodeDatabaseKey;
+}
+
+
 static uint32
 SharedConnectionHashHash(const void *key, Size keysize)
 {
-	SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key;
+	SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key;

 	uint32 hash = string_hash(entry->hostname, NAMEDATALEN);
 	hash = hash_combine(hash, hash_uint32(entry->port));
-	hash = hash_combine(hash, hash_uint32(entry->databaseOid));
+
+	return hash;
+}
+
+
+static uint32
+SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey *entry =
+		(SharedWorkerNodeDatabaseConnStatsHashKey *) key;
+	uint32 hash = SharedConnectionHashHash(&(entry->workerNodeKey), keysize);
+	hash = hash_combine(hash, hash_uint32(entry->database));

 	return hash;
 }
@@ -816,17 +1009,39 @@ SharedConnectionHashHash(const void *key, Size keysize)
 static int
 SharedConnectionHashCompare(const void *a, const void *b, Size keysize)
 {
-	SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a;
-	SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b;
+	SharedWorkerNodeConnStatsHashKey *ca = (SharedWorkerNodeConnStatsHashKey *) a;
+	SharedWorkerNodeConnStatsHashKey *cb = (SharedWorkerNodeConnStatsHashKey *) b;

-	if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
-		ca->port != cb->port ||
-		ca->databaseOid != cb->databaseOid)
-	{
-		return 1;
-	}
-	else
-	{
-		return 0;
-	}
+	return strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
+		   ca->port != cb->port;
+}
+
+
+static int
+SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey *ca =
+		(SharedWorkerNodeDatabaseConnStatsHashKey *) a;
+	SharedWorkerNodeDatabaseConnStatsHashKey *cb =
+		(SharedWorkerNodeDatabaseConnStatsHashKey *) b;
+
+	int sharedConnectionHashCompare =
+		SharedConnectionHashCompare(&(ca->workerNodeKey), &(cb->workerNodeKey), keysize);
+
+	return sharedConnectionHashCompare ||
+		   ca->database != cb->database;
+}
+
+
+static bool
+isConnectionThrottlingDisabled(uint32 externalFlags)
+{
+	bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
+
+	/*
+	 * Do not call Get*PoolSize() functions here, since it may read from
+	 * the catalog and we may be in the process exit handler.
+	 */
+	return maintenanceConnection
+		   ? MaxMaintenanceSharedPoolSize <= 0
+		   : MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING;
 }

View File

@@ -39,6 +39,7 @@
 /* Config variables managed via guc.c */
 char *WorkerListFileName;
 int MaxWorkerNodesTracked = 2048;    /* determines worker node hash table size */
+int MaxDatabasesPerWorkerNodesTracked = 1;    /* determines database-per-worker hash table size */

 /* Local functions forward declarations */

View File

@@ -1844,6 +1844,19 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);

+	DefineCustomIntVariable(
+		"citus.maintenance_connection_pool_timeout",
+		gettext_noop(
+			"Timeout for acquiring a connection from the maintenance shared pool. "
+			"Applicable only when the maintenance pool is enabled via citus.max_maintenance_shared_pool_size. "
+			"Setting it to 0 or -1 disables the timeout."),
+		NULL,
+		&MaintenanceConnectionPoolTimeout,
+		30 * MS_PER_SECOND, -1, MS_PER_HOUR,
+		PGC_SIGHUP,
+		GUC_SUPERUSER_ONLY,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.max_adaptive_executor_pool_size",
 		gettext_noop("Sets the maximum number of connections per worker node used by "

@@ -1933,6 +1946,20 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);

+	DefineCustomIntVariable(
+		"citus.max_databases_per_worker_tracked",
+		gettext_noop("Sets the number of databases tracked per worker."),
+		gettext_noop(
+			"This configuration value complements citus.max_worker_nodes_tracked. "
+			"It should be used when there is more than one database with Citus in the cluster, "
+			"and, effectively, it limits the size of the hash table with connections per worker + database. "
+			"Currently, it does not affect the connection management logic and serves only statistical purposes."),
+		&MaxDatabasesPerWorkerNodesTracked,
+		1, 1, INT_MAX,
+		PGC_POSTMASTER,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.max_high_priority_background_processes",
 		gettext_noop("Sets the maximum number of background processes "

@@ -1958,6 +1985,18 @@ RegisterCitusConfigVariables(void)
 		GUC_UNIT_KB | GUC_STANDARD,
 		NULL, NULL, NULL);

+	DefineCustomIntVariable(
+		"citus.max_maintenance_shared_pool_size",
+		gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections "
+					 "for maintenance operations only. "
+					 "Setting it to 0 or -1 disables maintenance connection throttling."),
+		NULL,
+		&MaxMaintenanceSharedPoolSize,
+		-1, -1, INT_MAX,
+		PGC_SIGHUP,
+		GUC_SUPERUSER_ONLY,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.max_matview_size_to_auto_recreate",
 		gettext_noop("Sets the maximum size of materialized views in MB to "

View File

@@ -33,7 +33,8 @@ PG_FUNCTION_INFO_V1(set_max_shared_pool_size);
 Datum
 wake_up_connection_pool_waiters(PG_FUNCTION_ARGS)
 {
-	WakeupWaiterBackendsForSharedConnection();
+	WakeupWaiterBackendsForSharedConnection(0);
+	WakeupWaiterBackendsForSharedConnection(MAINTENANCE_CONNECTION);

 	PG_RETURN_VOID();
 }

View File

@@ -153,7 +153,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
 	{
 		const char *nodeName = workerNode->workerName;
 		int nodePort = workerNode->workerPort;
-		int connectionFlags = 0;
+		int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;

 		if (workerNode->groupId == localGroupId)
 		{

View File

@@ -160,7 +160,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 	bool recoveryFailed = false;

-	int connectionFlags = 0;
+	int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
 	MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
 	if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
 	{

View File

@@ -49,6 +49,7 @@
 #include "distributed/background_jobs.h"
 #include "distributed/citus_safe_lib.h"
+#include "distributed/connection_management.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/distributed_deadlock_detection.h"
 #include "distributed/maintenanced.h"

@@ -89,12 +90,14 @@ typedef struct MaintenanceDaemonDBData
 	Oid userOid;
 	pid_t workerPid;
 	bool daemonStarted;
+	bool daemonShuttingDown;
 	bool triggerNodeMetadataSync;
 	Latch *latch; /* pointer to the background worker's latch */
 } MaintenanceDaemonDBData;

 /* config variable for distributed deadlock detection timeout */
 double DistributedDeadlockDetectionTimeoutFactor = 2.0;
+char *MaintenanceManagementDatabase = "";

 int Recover2PCInterval = 60000;
 int DeferShardDeleteInterval = 15000;
 int BackgroundTaskQueueCheckInterval = 5000;

@@ -118,7 +121,7 @@ static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;

 /* set to true when becoming a maintenance daemon */
-static bool IsMaintenanceDaemon = false;
+bool IsMaintenanceDaemon = false;

 static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
 static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);

@@ -241,6 +244,14 @@ InitializeMaintenanceDaemonBackend(void)
 		return;
 	}

+	if (dbData->daemonShuttingDown)
+	{
+		elog(DEBUG1, "Another maintenance daemon for database %u is shutting down. "
+					 "Aborting current initialization", MyDatabaseId);
+		LWLockRelease(&MaintenanceDaemonControl->lock);
+		return;
+	}
+
 	if (IsMaintenanceDaemon)
 	{
 		/*

@@ -1056,20 +1067,11 @@ MaintenanceDaemonShmemExit(int code, Datum arg)
 	MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
 										hash_search(MaintenanceDaemonDBHash, &databaseOid,
-													HASH_FIND, NULL);
+													HASH_REMOVE, NULL);

-	/* myDbData is NULL after StopMaintenanceDaemon */
-	if (myDbData != NULL)
-	{
-		/*
-		 * Confirm that I am still the registered maintenance daemon before exiting.
-		 */
-		Assert(myDbData->workerPid == MyProcPid);
-
-		myDbData->daemonStarted = false;
-		myDbData->workerPid = 0;
-	}
+	/* Workaround for -Werror=unused-variable */
+	(void) myDbData;
+	Assert(myDbData->workerPid == MyProcPid);

 	LWLockRelease(&MaintenanceDaemonControl->lock);
 }

@@ -1168,11 +1170,12 @@ StopMaintenanceDaemon(Oid databaseId)
 	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
 		MaintenanceDaemonDBHash,
 		&databaseId,
-		HASH_REMOVE, &found);
+		HASH_FIND, &found);

 	if (found)
 	{
 		workerPid = dbData->workerPid;
+		dbData->daemonShuttingDown = true;
 	}

 	LWLockRelease(&MaintenanceDaemonControl->lock);
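The net effect is a shutdown handshake over the shared hash entry. A sketch of the intended interleaving (hypothetical sequence, derived from the hunks above):

	/*
	 * backend A: StopMaintenanceDaemon(dbOid)
	 *     HASH_FIND the entry, set dbData->daemonShuttingDown = true, signal daemon
	 * daemon:    MaintenanceDaemonShmemExit()
	 *     HASH_REMOVE its own entry (the daemon now owns removal, not the stopper)
	 * backend B: InitializeMaintenanceDaemonBackend()
	 *     sees daemonShuttingDown == true and aborts initialization instead of
	 *     reusing the entry of a daemon that is about to exit
	 */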

View File

@@ -123,7 +123,14 @@ enum MultiConnectionMode
 	 *
 	 * This is need to run 'CREATE_REPLICATION_SLOT' command.
 	 */
-	REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8
+	REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8,
+
+	/*
+	 * This flag specifies that connection is required for maintenance operations, e.g.
+	 * transaction recovery, distributed deadlock detection. Such connections have
+	 * a reserved quota of the MaxSharedPoolSize.
+	 */
+	REQUIRE_MAINTENANCE_CONNECTION = 1 << 9
 };

@@ -223,6 +230,9 @@ typedef struct MultiConnection
 	/* replication option */
 	bool requiresReplication;

+	/* See REQUIRE_MAINTENANCE_CONNECTION */
+	bool useForMaintenanceOperations;
+
 	MultiConnectionStructInitializationState initializationState;
 } MultiConnection;
View File
@ -20,8 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort,
extern void MarkReservedConnectionUsed(const char *hostName, int nodePort, extern void MarkReservedConnectionUsed(const char *hostName, int nodePort,
Oid userId, Oid databaseOid); Oid userId, Oid databaseOid);
extern void DeallocateReservedConnections(void); extern void DeallocateReservedConnections(void);
extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void); extern void EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags);
extern bool TryConnectionPossibilityForLocalPrimaryNode(void); extern bool TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags);
extern bool IsReservationPossible(void); extern bool IsReservationPossible(uint32 connectionFlags);
#endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */ #endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */
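A usage sketch for the reworked reservation API, under the assumption that zero flags select the regular (non-maintenance) pool:

/* reserve shared-pool slots up front for a multi-node operation */
uint32 connectionFlags = 0;    /* regular pool, no maintenance quota */

if (IsReservationPossible(connectionFlags))
{
    /* ensure each remote primary can hand out one connection later */
    EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags);
}

/* ... run the operation; unused reservations are released later ... */
DeallocateReservedConnections();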
View File
@ -12,6 +12,8 @@
#ifndef MAINTENANCED_H #ifndef MAINTENANCED_H
#define MAINTENANCED_H #define MAINTENANCED_H
#include "commands/dbcommands.h"
/* collect statistics every 24 hours */ /* collect statistics every 24 hours */
#define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000) #define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000)
View File
@ -16,25 +16,38 @@
#define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1 #define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1
#define ALLOW_ALL_EXTERNAL_CONNECTIONS -1 #define ALLOW_ALL_EXTERNAL_CONNECTIONS -1
enum SharedPoolCounterMode
{
/*
* Use this flag to reserve a connection from a maintenance quota
*/
MAINTENANCE_CONNECTION = 1 << 0
};
extern int MaxSharedPoolSize; extern int MaxSharedPoolSize;
extern int MaxMaintenanceSharedPoolSize;
extern int MaintenanceConnectionPoolTimeout;
extern int LocalSharedPoolSize; extern int LocalSharedPoolSize;
extern int MaxClientConnections; extern int MaxClientConnections;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void); extern void WaitForSharedConnection(uint32);
extern void WakeupWaiterBackendsForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(uint32);
extern size_t SharedConnectionStatsShmemSize(void); extern size_t SharedConnectionStatsShmemSize(void);
extern void SharedConnectionStatsShmemInit(void); extern void SharedConnectionStatsShmemInit(void);
extern int GetMaxClientConnections(void); extern int GetMaxClientConnections(void);
extern int GetMaxSharedPoolSize(void); extern int GetMaxSharedPoolSize(void);
extern int GetMaxMaintenanceSharedPoolSize(void);
extern int GetLocalSharedPoolSize(void); extern int GetLocalSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname,
extern void WaitLoopForSharedConnection(const char *hostname, int port); int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port); extern void WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port); extern void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname,
extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int int port);
activeConnectionCount); extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname,
int port);
extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode,
int activeConnectionCount);
#endif /* SHARED_CONNECTION_STATS_H */ #endif /* SHARED_CONNECTION_STATS_H */
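The maintenance quota is claimed and released through the same counters as the regular pool, with MAINTENANCE_CONNECTION set in the flags. A hypothetical sketch of the try-then-wait pattern, bounded by citus.max_maintenance_shared_pool_size:

/*
 * Sketch: claim one slot from the maintenance quota, do the work,
 * then release the slot.
 */
static void
WithMaintenancePoolSlot(const char *hostname, int port)
{
    uint32 flags = MAINTENANCE_CONNECTION;

    if (!TryToIncrementSharedConnectionCounter(flags, hostname, port))
    {
        /* quota exhausted: block until another backend frees a slot */
        WaitLoopForSharedConnection(flags, hostname, port);
    }

    /* ... maintenance work: 2PC recovery, deadlock detection ... */

    /* give the slot back; this also wakes up any waiting backends */
    DecrementSharedConnectionCounter(flags, hostname, port);
}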
View File
@ -59,6 +59,7 @@ typedef struct WorkerNode
/* Config variables managed via guc.c */ /* Config variables managed via guc.c */
extern int MaxWorkerNodesTracked; extern int MaxWorkerNodesTracked;
extern int MaxDatabasesPerWorkerNodesTracked;
extern char *WorkerListFileName; extern char *WorkerListFileName;
extern char *CurrentCluster; extern char *CurrentCluster;
View File
@ -48,8 +48,6 @@ get_guc_variables_compat(int *gucCount)
#define pgstat_fetch_stat_local_beentry(a) pgstat_get_local_beentry_by_index(a) #define pgstat_fetch_stat_local_beentry(a) pgstat_get_local_beentry_by_index(a)
#define have_createdb_privilege() have_createdb_privilege()
#else #else
#include "miscadmin.h" #include "miscadmin.h"
View File
@ -823,7 +823,7 @@ class Postgres(QueryRunner):
# of our tests # of our tests
pgconf.write("max_logical_replication_workers = 50\n") pgconf.write("max_logical_replication_workers = 50\n")
pgconf.write("max_wal_senders = 50\n") pgconf.write("max_wal_senders = 50\n")
pgconf.write("max_worker_processes = 50\n") pgconf.write("max_worker_processes = 150\n")
pgconf.write("max_replication_slots = 50\n") pgconf.write("max_replication_slots = 50\n")
# We need to make the log go to stderr so that the tests can # We need to make the log go to stderr so that the tests can
@ -846,6 +846,8 @@ class Postgres(QueryRunner):
# happened # happened
pgconf.write("restart_after_crash = off\n") pgconf.write("restart_after_crash = off\n")
# prevent tests from hanging
pgconf.write("statement_timeout= '5min'\n")
os.truncate(self.hba_path, 0) os.truncate(self.hba_path, 0)
self.ssl_access("all", "trust") self.ssl_access("all", "trust")
self.nossl_access("all", "trust") self.nossl_access("all", "trust")
@ -977,6 +979,14 @@ class Postgres(QueryRunner):
for config in configs: for config in configs:
self.sql(f"alter system set {config}") self.sql(f"alter system set {config}")
def reset_configuration(self, *configs):
"""Reset specific Postgres settings using ALTER SYSTEM RESET
NOTE: after configuring a call to reload or restart is needed for the
settings to become effective.
"""
for config in configs:
self.sql(f"alter system reset {config}")
def log_handle(self): def log_handle(self):
"""Returns the opened logfile at the current end of the log """Returns the opened logfile at the current end of the log
@ -1018,6 +1028,15 @@ class Postgres(QueryRunner):
self.databases.add(name) self.databases.add(name)
self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name))) self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))
def drop_database(self, name):
self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=name)
self.sql(
sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
sql.Identifier(name)
)
)
self.databases.remove(name)
def create_schema(self, name): def create_schema(self, name):
self.schemas.add(name) self.schemas.add(name)
self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name))) self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
@ -1047,9 +1066,13 @@ class Postgres(QueryRunner):
def cleanup_databases(self): def cleanup_databases(self):
for database in self.databases: for database in self.databases:
self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=database)
self.sql( self.sql(
sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database)) sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
sql.Identifier(database)
) )
)
self.databases.clear()
def cleanup_schemas(self): def cleanup_schemas(self):
for schema in self.schemas: for schema in self.schemas:
View File
@ -241,7 +241,8 @@ def run_python_test(test_name, args):
"pytest", "pytest",
"pytest", "pytest",
"--numprocesses", "--numprocesses",
"auto", # Tests may be heavy, so limit the concurrency
"2",
"--count", "--count",
str(args["repeat"]), str(args["repeat"]),
str(test_path), str(test_path),
View File
@ -62,7 +62,7 @@ def test_set_maindb(cluster_factory):
wait_until_maintenance_deamons_start(2, cluster) wait_until_maintenance_deamons_start(2, cluster)
cluster.coordinator.sql("DROP DATABASE mymaindb;") cluster.coordinator.drop_database("mymaindb")
wait_until_maintenance_deamons_start(1, cluster) wait_until_maintenance_deamons_start(1, cluster)
View File
@ -0,0 +1,149 @@
import asyncio
import pytest
from psycopg.errors import DeadlockDetected
# Every database is expected to run 2 deadlocked queries, so ~80 connections
# will be held by deadlocks. The maintenance daemons are expected to use
# another ~5, leaving ~15 available
DATABASES_NUMBER = 40
async def test_multiple_databases_distributed_deadlock_detection(cluster):
# Disable maintenance on all nodes
for node in cluster.nodes:
node.configure(
"citus.recover_2pc_interval = '-1'",
"citus.distributed_deadlock_detection_factor = '-1'",
"citus.max_maintenance_shared_pool_size = 5",
)
node.restart()
# Prepare database names for test
db_names = [f"db{db_index}" for db_index in range(1, DATABASES_NUMBER + 1)]
# Create and configure databases
for db_name in db_names:
nodes = cluster.workers + [cluster.coordinator]
for node in nodes:
node.create_database(f"{db_name}")
with node.cur(dbname=db_name) as node_cursor:
node_cursor.execute("CREATE EXTENSION citus;")
if node == cluster.coordinator:
for worker in cluster.workers:
node_cursor.execute(
"SELECT pg_catalog.citus_add_node(%s, %s)",
(worker.host, worker.port),
)
node_cursor.execute(
"""
CREATE TABLE public.deadlock_detection_test (user_id int UNIQUE, some_val int);
SELECT create_distributed_table('public.deadlock_detection_test', 'user_id');
INSERT INTO public.deadlock_detection_test SELECT i, i FROM generate_series(1,2) i;
"""
)
async def create_deadlock(db_name, run_on_coordinator):
"""Function to prepare a deadlock query in a given database"""
# Init connections and store for later commits
if run_on_coordinator:
first_connection = await cluster.coordinator.aconn(
dbname=db_name, autocommit=False
)
first_cursor = first_connection.cursor()
second_connection = await cluster.coordinator.aconn(
dbname=db_name, autocommit=False
)
second_cursor = second_connection.cursor()
else:
first_connection = await cluster.workers[0].aconn(
dbname=db_name, autocommit=False
)
first_cursor = first_connection.cursor()
second_connection = await cluster.workers[1].aconn(
dbname=db_name, autocommit=False
)
second_cursor = second_connection.cursor()
# initiate deadlock
await first_cursor.execute(
"UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 1;"
)
await second_cursor.execute(
"UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 2;"
)
# Test that deadlock is resolved by a maintenance daemon
with pytest.raises(DeadlockDetected):
async def run_deadlocked_queries():
await asyncio.gather(
second_cursor.execute(
"UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 1;"
),
first_cursor.execute(
"UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 2;"
),
)
await asyncio.wait_for(run_deadlocked_queries(), 300)
await first_connection.rollback()
await second_connection.rollback()
async def enable_maintenance_when_deadlocks_ready():
"""Function to enable maintenance daemons, when all the expected deadlock queries are ready"""
# Let deadlocks commence
await asyncio.sleep(2)
# Check that queries are deadlocked
databases_with_deadlock = set()
while len(databases_with_deadlock) < DATABASES_NUMBER:
for db_name in (db for db in db_names if db not in databases_with_deadlock):
for node in cluster.nodes:
async with node.acur(dbname=db_name) as cursor:
expected_lock_count = 4 if node == cluster.coordinator else 2
await cursor.execute(
"""
SELECT count(*) = %s AS deadlock_created
FROM pg_locks
INNER JOIN pg_class pc ON relation = oid
WHERE relname LIKE 'deadlock_detection_test%%'""",
(expected_lock_count,),
)
queries_deadlocked = await cursor.fetchone()
if queries_deadlocked[0]:
databases_with_deadlock.add(db_name)
# Re-enable maintenance
for node in cluster.nodes:
node.reset_configuration(
"citus.recover_2pc_interval",
"citus.distributed_deadlock_detection_factor",
)
node.reload()
# Distribute deadlocked queries among all nodes in the cluster
tasks = list()
for idx, db_name in enumerate(db_names):
run_on_coordinator = True if idx % 3 == 0 else False
tasks.append(
create_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
)
tasks.append(enable_maintenance_when_deadlocks_ready())
# Await the results
await asyncio.gather(*tasks)
# Check for "too many clients" on all nodes
for node in cluster.nodes:
with node.cur() as cursor:
cursor.execute(
"""
SELECT count(*) AS too_many_clients_errors_count
FROM regexp_split_to_table(pg_read_file(%s), E'\n') AS t(log_line)
WHERE log_line LIKE '%%sorry, too many clients already%%';""",
(node.log_path.as_posix(),),
)
too_many_clients_errors_count = cursor.fetchone()[0]
assert too_many_clients_errors_count == 0
View File
@ -949,10 +949,15 @@ select count(*) from pg_constraint where conname = 'fkey_test_drop';
-- verify we still preserve the child-parent hierarchy after all conversions -- verify we still preserve the child-parent hierarchy after all conversions
-- check the shard partition -- check the shard partition
select inhrelid::regclass from pg_inherits where (select inhparent::regclass::text) ~ '^parent_1_\d{7}$' order by 1; SELECT count(*) = 1 AS child_parent_hierarchy_test
inhrelid FROM pg_inherits
WHERE (SELECT inhparent::regclass::text) ~ '^parent_1_\d{7}$'
-- order of the child shard id is not guaranteed, but should be either 1904004 or 1904006
AND (inhrelid::regclass::text) IN ('parent_1_child_1_1904004', 'parent_1_child_1_1904006')
ORDER BY 1;
child_parent_hierarchy_test
--------------------------------------------------------------------- ---------------------------------------------------------------------
parent_1_child_1_1904006 t
(1 row) (1 row)
-- check the shell partition -- check the shell partition
View File
@ -144,6 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 3; SELECT * FROM select_test WHERE key = 3;
ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: connection not open ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: connection not open
COMMIT; COMMIT;
-- Maintenance connections are not cached
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -157,8 +158,11 @@ SELECT recover_prepared_transactions();
(1 row) (1 row)
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
ERROR: connection not open recover_prepared_transactions
CONTEXT: while executing command on localhost:xxxxx ---------------------------------------------------------------------
0
(1 row)
-- bug from https://github.com/citusdata/citus/issues/1926 -- bug from https://github.com/citusdata/citus/issues/1926
SET citus.max_cached_conns_per_worker TO 0; -- purge cache SET citus.max_cached_conns_per_worker TO 0; -- purge cache
DROP TABLE select_test; DROP TABLE select_test;
View File
@ -56,8 +56,8 @@ SELECT global_pid AS maintenance_daemon_gpid
FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id
WHERE application_name = 'Citus Maintenance Daemon' \gset WHERE application_name = 'Citus Maintenance Daemon' \gset
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
CREATE USER global_cancel_user; CREATE USER global_cancel_user NOSUPERUSER;
SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user NOSUPERUSER;');
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -66,6 +66,12 @@ SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
RESET client_min_messages; RESET client_min_messages;
\c - global_cancel_user - :master_port \c - global_cancel_user - :master_port
SELECT current_user;
current_user
---------------------------------------------------------------------
global_cancel_user
(1 row)
SELECT pg_typeof(:maintenance_daemon_gpid); SELECT pg_typeof(:maintenance_daemon_gpid);
pg_typeof pg_typeof
--------------------------------------------------------------------- ---------------------------------------------------------------------
View File
@ -0,0 +1,545 @@
-- This test verifies the behavior of the maintenance daemon in a multi-database environment.
-- It checks that distributed deadlock detection and 2PC transaction recovery respect citus.max_maintenance_shared_pool_size.
-- To do that, it creates 100 databases and synthetically generates distributed transactions in various states there.
SELECT $definition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
SELECT pg_reload_conf();
$definition$ AS turn_off_maintenance
\gset
SELECT $definition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
$definition$ AS turn_on_maintenance
\gset
SELECT $definition$
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
db_create_statement text;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('CREATE DATABASE %I', db_name)
INTO db_create_statement;
PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
db_create_statement);
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'CREATE EXTENSION citus;');
IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
FROM pg_dist_node
WHERE groupid != 0 AND isactive AND noderole = 'primary';
END IF;
END LOOP;
END;
$do$;
$definition$ AS create_databases
\gset
-- The code relies heavily on dblink for cross-db and cross-node queries
CREATE EXTENSION IF NOT EXISTS dblink;
-- Disable maintenance operations to prepare the environment
:turn_off_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
:turn_off_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
:turn_off_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Create databases
\c - - - :worker_1_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
count
---------------------------------------------------------------------
100
(1 row)
\c - - - :worker_2_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
count
---------------------------------------------------------------------
100
(1 row)
\c - - - :master_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
count
---------------------------------------------------------------------
100
(1 row)
-- Generate distributed transactions
\c - - - :master_port
DO
$do$
DECLARE
index int;
db_name text;
transaction_to_abort_name text;
transaction_to_commit_name text;
transaction_to_be_forgotten text;
coordinator_port int;
BEGIN
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('citus_0_1234_3_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_abort_name;
SELECT format('citus_0_1234_4_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_commit_name;
SELECT format('citus_0_should_be_forgotten_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_be_forgotten;
SELECT setting::int
FROM pg_settings
WHERE name = 'port'
INTO coordinator_port;
-- Prepare transactions on workers
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport),
format($worker_cmd$
BEGIN;
CREATE TABLE should_abort
(value int);
PREPARE TRANSACTION '%s';
BEGIN;
CREATE TABLE should_commit
(value int);
PREPARE TRANSACTION '%s';
$worker_cmd$, transaction_to_abort_name, transaction_to_commit_name))
FROM pg_dist_node
WHERE groupid != 0
AND isactive
AND noderole = 'primary';
-- Fill the pg_dist_transaction
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port),
format($coordinator_cmd$
INSERT INTO pg_dist_transaction
SELECT groupid, '%s' FROM pg_dist_node
UNION ALL
SELECT groupid, '%s' FROM pg_dist_node;
$coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten));
END LOOP;
END;
$do$;
-- Verify state before enabling maintenance
\c - - - :master_port
SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT groupid, gid
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%';
pg_dist_transaction_before_recovery_coordinator_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_before_recovery_coordinator_test
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_before_recover_worker_1_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_before_recovery_worker_1_test
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_before_recover_worker_2_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_before_recovery_worker_2_test
---------------------------------------------------------------------
t
(1 row)
-- Turn on the maintenance
\c - - - :master_port
:turn_on_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
:turn_on_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
:turn_on_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
-- Verify maintenance result
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
too_many_clients_test
---------------------------------------------------------------------
t
(1 row)
DO
$$
DECLARE
pg_dist_transaction_after_recovery_coordinator_test boolean;
cached_connections_after_recovery_coordinator_test boolean;
BEGIN
FOR i IN 0 .. 300
LOOP
IF i = 300 THEN RAISE 'Waited too long'; END IF;
SELECT count(*) = 0
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT groupid, gid
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%'
INTO pg_dist_transaction_after_recovery_coordinator_test;
SELECT count(*) = 0 AS cached_connections_after_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval
INTO cached_connections_after_recovery_coordinator_test;
IF (pg_dist_transaction_after_recovery_coordinator_test
AND cached_connections_after_recovery_coordinator_test) THEN
EXIT;
END IF;
PERFORM pg_sleep_for('1 SECOND'::interval);
END LOOP;
END
$$;
\c - - - :worker_1_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
too_many_clients_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_after_recover_worker_1_test
---------------------------------------------------------------------
t
(1 row)
DO
$$
BEGIN
FOR i IN 0 .. 300
LOOP
IF i = 300 THEN RAISE 'Waited too long'; END IF;
IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval) THEN
EXIT;
END IF;
PERFORM pg_sleep_for('1 SECOND'::interval);
END LOOP;
END
$$;
\c - - - :worker_2_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
too_many_clients_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_after_recover_worker_2_test
---------------------------------------------------------------------
t
(1 row)
DO
$$
BEGIN
FOR i IN 0 .. 300
LOOP
IF i = 300 THEN RAISE 'Waited too long'; END IF;
IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval) THEN
EXIT;
END IF;
PERFORM pg_sleep_for('1 SECOND'::interval);
END LOOP;
END
$$;
-- Cleanup
\c - - - :master_port
SELECT $definition$
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'DROP EXTENSION citus;');
END LOOP;
END;
$do$;
-- Dropping databases explicitly because ProcSignalBarrier prevents using dblink
DROP DATABASE db1 WITH (FORCE);
DROP DATABASE db2 WITH (FORCE);
DROP DATABASE db3 WITH (FORCE);
DROP DATABASE db4 WITH (FORCE);
DROP DATABASE db5 WITH (FORCE);
DROP DATABASE db6 WITH (FORCE);
DROP DATABASE db7 WITH (FORCE);
DROP DATABASE db8 WITH (FORCE);
DROP DATABASE db9 WITH (FORCE);
DROP DATABASE db10 WITH (FORCE);
DROP DATABASE db11 WITH (FORCE);
DROP DATABASE db12 WITH (FORCE);
DROP DATABASE db13 WITH (FORCE);
DROP DATABASE db14 WITH (FORCE);
DROP DATABASE db15 WITH (FORCE);
DROP DATABASE db16 WITH (FORCE);
DROP DATABASE db17 WITH (FORCE);
DROP DATABASE db18 WITH (FORCE);
DROP DATABASE db19 WITH (FORCE);
DROP DATABASE db20 WITH (FORCE);
DROP DATABASE db21 WITH (FORCE);
DROP DATABASE db22 WITH (FORCE);
DROP DATABASE db23 WITH (FORCE);
DROP DATABASE db24 WITH (FORCE);
DROP DATABASE db25 WITH (FORCE);
DROP DATABASE db26 WITH (FORCE);
DROP DATABASE db27 WITH (FORCE);
DROP DATABASE db28 WITH (FORCE);
DROP DATABASE db29 WITH (FORCE);
DROP DATABASE db30 WITH (FORCE);
DROP DATABASE db31 WITH (FORCE);
DROP DATABASE db32 WITH (FORCE);
DROP DATABASE db33 WITH (FORCE);
DROP DATABASE db34 WITH (FORCE);
DROP DATABASE db35 WITH (FORCE);
DROP DATABASE db36 WITH (FORCE);
DROP DATABASE db37 WITH (FORCE);
DROP DATABASE db38 WITH (FORCE);
DROP DATABASE db39 WITH (FORCE);
DROP DATABASE db40 WITH (FORCE);
DROP DATABASE db41 WITH (FORCE);
DROP DATABASE db42 WITH (FORCE);
DROP DATABASE db43 WITH (FORCE);
DROP DATABASE db44 WITH (FORCE);
DROP DATABASE db45 WITH (FORCE);
DROP DATABASE db46 WITH (FORCE);
DROP DATABASE db47 WITH (FORCE);
DROP DATABASE db48 WITH (FORCE);
DROP DATABASE db49 WITH (FORCE);
DROP DATABASE db50 WITH (FORCE);
DROP DATABASE db51 WITH (FORCE);
DROP DATABASE db52 WITH (FORCE);
DROP DATABASE db53 WITH (FORCE);
DROP DATABASE db54 WITH (FORCE);
DROP DATABASE db55 WITH (FORCE);
DROP DATABASE db56 WITH (FORCE);
DROP DATABASE db57 WITH (FORCE);
DROP DATABASE db58 WITH (FORCE);
DROP DATABASE db59 WITH (FORCE);
DROP DATABASE db60 WITH (FORCE);
DROP DATABASE db61 WITH (FORCE);
DROP DATABASE db62 WITH (FORCE);
DROP DATABASE db63 WITH (FORCE);
DROP DATABASE db64 WITH (FORCE);
DROP DATABASE db65 WITH (FORCE);
DROP DATABASE db66 WITH (FORCE);
DROP DATABASE db67 WITH (FORCE);
DROP DATABASE db68 WITH (FORCE);
DROP DATABASE db69 WITH (FORCE);
DROP DATABASE db70 WITH (FORCE);
DROP DATABASE db71 WITH (FORCE);
DROP DATABASE db72 WITH (FORCE);
DROP DATABASE db73 WITH (FORCE);
DROP DATABASE db74 WITH (FORCE);
DROP DATABASE db75 WITH (FORCE);
DROP DATABASE db76 WITH (FORCE);
DROP DATABASE db77 WITH (FORCE);
DROP DATABASE db78 WITH (FORCE);
DROP DATABASE db79 WITH (FORCE);
DROP DATABASE db80 WITH (FORCE);
DROP DATABASE db81 WITH (FORCE);
DROP DATABASE db82 WITH (FORCE);
DROP DATABASE db83 WITH (FORCE);
DROP DATABASE db84 WITH (FORCE);
DROP DATABASE db85 WITH (FORCE);
DROP DATABASE db86 WITH (FORCE);
DROP DATABASE db87 WITH (FORCE);
DROP DATABASE db88 WITH (FORCE);
DROP DATABASE db89 WITH (FORCE);
DROP DATABASE db90 WITH (FORCE);
DROP DATABASE db91 WITH (FORCE);
DROP DATABASE db92 WITH (FORCE);
DROP DATABASE db93 WITH (FORCE);
DROP DATABASE db94 WITH (FORCE);
DROP DATABASE db95 WITH (FORCE);
DROP DATABASE db96 WITH (FORCE);
DROP DATABASE db97 WITH (FORCE);
DROP DATABASE db98 WITH (FORCE);
DROP DATABASE db99 WITH (FORCE);
DROP DATABASE db100 WITH (FORCE);
SELECT count(*) = 0 as all_databases_dropped
FROM pg_database
WHERE datname LIKE 'db%';
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size;
SELECT pg_reload_conf();
$definition$ AS cleanup
\gset
:cleanup
all_databases_dropped
---------------------------------------------------------------------
t
(1 row)
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
:cleanup
all_databases_dropped
---------------------------------------------------------------------
t
(1 row)
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
:cleanup
all_databases_dropped
---------------------------------------------------------------------
t
(1 row)
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
DROP EXTENSION IF EXISTS dblink;
View File
@ -220,6 +220,8 @@ test: multi_generate_ddl_commands
test: multi_create_shards test: multi_create_shards
test: multi_transaction_recovery test: multi_transaction_recovery
test: multi_transaction_recovery_multiple_databases test: multi_transaction_recovery_multiple_databases
test: multi_maintenance_multiple_databases
#test: maintenance_connection_timeout
test: local_dist_join_modifications test: local_dist_join_modifications
test: local_table_join test: local_table_join
View File
@ -464,7 +464,7 @@ push(@pgOptions, "wal_retrieve_retry_interval=250");
push(@pgOptions, "max_logical_replication_workers=50"); push(@pgOptions, "max_logical_replication_workers=50");
push(@pgOptions, "max_wal_senders=50"); push(@pgOptions, "max_wal_senders=50");
push(@pgOptions, "max_worker_processes=50"); push(@pgOptions, "max_worker_processes=150");
if ($majorversion >= "14") { if ($majorversion >= "14") {
# disable compute_query_id so that we don't get Query Identifiers # disable compute_query_id so that we don't get Query Identifiers
View File
@ -581,7 +581,12 @@ alter table parent_1 drop constraint fkey_test_drop;
select count(*) from pg_constraint where conname = 'fkey_test_drop'; select count(*) from pg_constraint where conname = 'fkey_test_drop';
-- verify we still preserve the child-parent hierarchy after all conversions -- verify we still preserve the child-parent hierarchy after all conversions
-- check the shard partition -- check the shard partition
select inhrelid::regclass from pg_inherits where (select inhparent::regclass::text) ~ '^parent_1_\d{7}$' order by 1; SELECT count(*) = 1 AS child_parent_hierarchy_test
FROM pg_inherits
WHERE (SELECT inhparent::regclass::text) ~ '^parent_1_\d{7}$'
-- order of the child shard id is not guaranteed, but should be either 1904004 or 1904006
AND (inhrelid::regclass::text) IN ('parent_1_child_1_1904004', 'parent_1_child_1_1904006')
ORDER BY 1;
-- check the shell partition -- check the shell partition
select inhrelid::regclass from pg_inherits where inhparent='parent_1'::regclass order by 1; select inhrelid::regclass from pg_inherits where inhparent='parent_1'::regclass order by 1;
View File
@ -77,6 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 3; SELECT * FROM select_test WHERE key = 3;
COMMIT; COMMIT;
-- Maintenance connections are not cached
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()'); SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
View File
@ -39,11 +39,12 @@ FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = g
WHERE application_name = 'Citus Maintenance Daemon' \gset WHERE application_name = 'Citus Maintenance Daemon' \gset
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
CREATE USER global_cancel_user; CREATE USER global_cancel_user NOSUPERUSER;
SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user'); SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user NOSUPERUSER;');
RESET client_min_messages; RESET client_min_messages;
\c - global_cancel_user - :master_port \c - global_cancel_user - :master_port
SELECT current_user;
SELECT pg_typeof(:maintenance_daemon_gpid); SELECT pg_typeof(:maintenance_daemon_gpid);
View File
@ -0,0 +1,475 @@
-- This test verifies the behavior of the maintenance daemon in a multi-database environment.
-- It checks that distributed deadlock detection and 2PC transaction recovery respect citus.max_maintenance_shared_pool_size.
-- To do that, it creates 100 databases and synthetically generates distributed transactions in various states there.
SELECT $definition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
SELECT pg_reload_conf();
$definition$ AS turn_off_maintenance
\gset
SELECT $definition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
SELECT pg_reload_conf();
$definition$ AS turn_on_maintenance
\gset
SELECT $definition$
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
db_create_statement text;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('CREATE DATABASE %I', db_name)
INTO db_create_statement;
PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
db_create_statement);
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'CREATE EXTENSION citus;');
IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
FROM pg_dist_node
WHERE groupid != 0 AND isactive AND noderole = 'primary';
END IF;
END LOOP;
END;
$do$;
$definition$ AS create_databases
\gset
-- The code relies heavily on dblink for cross-db and cross-node queries
CREATE EXTENSION IF NOT EXISTS dblink;
-- Disable maintenance operations to prepare the environment
:turn_off_maintenance
\c - - - :worker_1_port
:turn_off_maintenance
\c - - - :worker_2_port
:turn_off_maintenance
-- Create databases
\c - - - :worker_1_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
\c - - - :worker_2_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
\c - - - :master_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
-- Generate distributed transactions
\c - - - :master_port
DO
$do$
DECLARE
index int;
db_name text;
transaction_to_abort_name text;
transaction_to_commit_name text;
transaction_to_be_forgotten text;
coordinator_port int;
BEGIN
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('citus_0_1234_3_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_abort_name;
SELECT format('citus_0_1234_4_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_commit_name;
SELECT format('citus_0_should_be_forgotten_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_be_forgotten;
SELECT setting::int
FROM pg_settings
WHERE name = 'port'
INTO coordinator_port;
-- Prepare transactions on workers
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport),
format($worker_cmd$
BEGIN;
CREATE TABLE should_abort
(value int);
PREPARE TRANSACTION '%s';
BEGIN;
CREATE TABLE should_commit
(value int);
PREPARE TRANSACTION '%s';
$worker_cmd$, transaction_to_abort_name, transaction_to_commit_name))
FROM pg_dist_node
WHERE groupid != 0
AND isactive
AND noderole = 'primary';
-- Fill the pg_dist_transaction
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port),
format($coordinator_cmd$
INSERT INTO pg_dist_transaction
SELECT groupid, '%s' FROM pg_dist_node
UNION ALL
SELECT groupid, '%s' FROM pg_dist_node;
$coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten));
END LOOP;
END;
$do$;
-- Verify state before enabling maintenance
\c - - - :master_port
SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT groupid, gid
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%';
SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
\c - - - :worker_1_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
\c - - - :worker_2_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
-- Turn on the maintenance
\c - - - :master_port
:turn_on_maintenance
\c - - - :worker_1_port
:turn_on_maintenance
\c - - - :worker_2_port
:turn_on_maintenance
\c - - - :master_port
-- Verify maintenance result
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
DO
$$
DECLARE
pg_dist_transaction_after_recovery_coordinator_test boolean;
cached_connections_after_recovery_coordinator_test boolean;
BEGIN
FOR i IN 0 .. 300
LOOP
IF i = 300 THEN RAISE 'Waited too long'; END IF;
SELECT count(*) = 0
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT groupid, gid
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%'
INTO pg_dist_transaction_after_recovery_coordinator_test;
SELECT count(*) = 0 AS cached_connections_after_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval
INTO cached_connections_after_recovery_coordinator_test;
IF (pg_dist_transaction_after_recovery_coordinator_test
AND cached_connections_after_recovery_coordinator_test) THEN
EXIT;
END IF;
PERFORM pg_sleep_for('1 SECOND'::interval);
END LOOP;
END
$$;
\c - - - :worker_1_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
DO
$$
BEGIN
FOR i IN 0 .. 300
LOOP
IF i = 300 THEN RAISE 'Waited too long'; END IF;
IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval) THEN
EXIT;
END IF;
PERFORM pg_sleep_for('1 SECOND'::interval);
END LOOP;
END
$$;
\c - - - :worker_2_port
SELECT count(*) = 0 AS too_many_clients_test
FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line)
WHERE log_line LIKE '%sorry, too many clients already%';
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
DO
$$
BEGIN
FOR i IN 0 .. 300
LOOP
IF i = 300 THEN RAISE 'Waited too long'; END IF;
IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval) THEN
EXIT;
END IF;
PERFORM pg_sleep_for('1 SECOND'::interval);
END LOOP;
END
$$;
-- Cleanup
\c - - - :master_port
SELECT $definition$
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'DROP EXTENSION citus;');
END LOOP;
END;
$do$;
-- Dropping databases explicitly because ProcSignalBarrier prevents using dblink
DROP DATABASE db1 WITH (FORCE);
DROP DATABASE db2 WITH (FORCE);
DROP DATABASE db3 WITH (FORCE);
DROP DATABASE db4 WITH (FORCE);
DROP DATABASE db5 WITH (FORCE);
DROP DATABASE db6 WITH (FORCE);
DROP DATABASE db7 WITH (FORCE);
DROP DATABASE db8 WITH (FORCE);
DROP DATABASE db9 WITH (FORCE);
DROP DATABASE db10 WITH (FORCE);
DROP DATABASE db11 WITH (FORCE);
DROP DATABASE db12 WITH (FORCE);
DROP DATABASE db13 WITH (FORCE);
DROP DATABASE db14 WITH (FORCE);
DROP DATABASE db15 WITH (FORCE);
DROP DATABASE db16 WITH (FORCE);
DROP DATABASE db17 WITH (FORCE);
DROP DATABASE db18 WITH (FORCE);
DROP DATABASE db19 WITH (FORCE);
DROP DATABASE db20 WITH (FORCE);
DROP DATABASE db21 WITH (FORCE);
DROP DATABASE db22 WITH (FORCE);
DROP DATABASE db23 WITH (FORCE);
DROP DATABASE db24 WITH (FORCE);
DROP DATABASE db25 WITH (FORCE);
DROP DATABASE db26 WITH (FORCE);
DROP DATABASE db27 WITH (FORCE);
DROP DATABASE db28 WITH (FORCE);
DROP DATABASE db29 WITH (FORCE);
DROP DATABASE db30 WITH (FORCE);
DROP DATABASE db31 WITH (FORCE);
DROP DATABASE db32 WITH (FORCE);
DROP DATABASE db33 WITH (FORCE);
DROP DATABASE db34 WITH (FORCE);
DROP DATABASE db35 WITH (FORCE);
DROP DATABASE db36 WITH (FORCE);
DROP DATABASE db37 WITH (FORCE);
DROP DATABASE db38 WITH (FORCE);
DROP DATABASE db39 WITH (FORCE);
DROP DATABASE db40 WITH (FORCE);
DROP DATABASE db41 WITH (FORCE);
DROP DATABASE db42 WITH (FORCE);
DROP DATABASE db43 WITH (FORCE);
DROP DATABASE db44 WITH (FORCE);
DROP DATABASE db45 WITH (FORCE);
DROP DATABASE db46 WITH (FORCE);
DROP DATABASE db47 WITH (FORCE);
DROP DATABASE db48 WITH (FORCE);
DROP DATABASE db49 WITH (FORCE);
DROP DATABASE db50 WITH (FORCE);
DROP DATABASE db51 WITH (FORCE);
DROP DATABASE db52 WITH (FORCE);
DROP DATABASE db53 WITH (FORCE);
DROP DATABASE db54 WITH (FORCE);
DROP DATABASE db55 WITH (FORCE);
DROP DATABASE db56 WITH (FORCE);
DROP DATABASE db57 WITH (FORCE);
DROP DATABASE db58 WITH (FORCE);
DROP DATABASE db59 WITH (FORCE);
DROP DATABASE db60 WITH (FORCE);
DROP DATABASE db61 WITH (FORCE);
DROP DATABASE db62 WITH (FORCE);
DROP DATABASE db63 WITH (FORCE);
DROP DATABASE db64 WITH (FORCE);
DROP DATABASE db65 WITH (FORCE);
DROP DATABASE db66 WITH (FORCE);
DROP DATABASE db67 WITH (FORCE);
DROP DATABASE db68 WITH (FORCE);
DROP DATABASE db69 WITH (FORCE);
DROP DATABASE db70 WITH (FORCE);
DROP DATABASE db71 WITH (FORCE);
DROP DATABASE db72 WITH (FORCE);
DROP DATABASE db73 WITH (FORCE);
DROP DATABASE db74 WITH (FORCE);
DROP DATABASE db75 WITH (FORCE);
DROP DATABASE db76 WITH (FORCE);
DROP DATABASE db77 WITH (FORCE);
DROP DATABASE db78 WITH (FORCE);
DROP DATABASE db79 WITH (FORCE);
DROP DATABASE db80 WITH (FORCE);
DROP DATABASE db81 WITH (FORCE);
DROP DATABASE db82 WITH (FORCE);
DROP DATABASE db83 WITH (FORCE);
DROP DATABASE db84 WITH (FORCE);
DROP DATABASE db85 WITH (FORCE);
DROP DATABASE db86 WITH (FORCE);
DROP DATABASE db87 WITH (FORCE);
DROP DATABASE db88 WITH (FORCE);
DROP DATABASE db89 WITH (FORCE);
DROP DATABASE db90 WITH (FORCE);
DROP DATABASE db91 WITH (FORCE);
DROP DATABASE db92 WITH (FORCE);
DROP DATABASE db93 WITH (FORCE);
DROP DATABASE db94 WITH (FORCE);
DROP DATABASE db95 WITH (FORCE);
DROP DATABASE db96 WITH (FORCE);
DROP DATABASE db97 WITH (FORCE);
DROP DATABASE db98 WITH (FORCE);
DROP DATABASE db99 WITH (FORCE);
DROP DATABASE db100 WITH (FORCE);
SELECT count(*) = 0 as all_databases_dropped
FROM pg_database
WHERE datname LIKE 'db%';
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size;
SELECT pg_reload_conf();
$definition$ AS cleanup
\gset
:cleanup
\c - - - :worker_1_port
:cleanup
\c - - - :worker_2_port
:cleanup
\c - - - :master_port
DROP EXTENSION IF EXISTS dblink;