diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index cb64ef7f5..d8a222387 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2160,6 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext); RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); + uint32 connectionFlags = 0; /* * Colocated intermediate results do not honor citus.max_shared_pool_size, @@ -2181,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, * and cannot switch to local execution (e.g., disabled by user), * COPY would fail hinting the user to change the relevant settiing. */ - EnsureConnectionPossibilityForRemotePrimaryNodes(); + EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags); } LocalCopyStatus localCopyStatus = GetLocalCopyStatus(); @@ -2211,7 +2212,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, */ if (ShardIntervalListHasLocalPlacements(shardIntervalList)) { - bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(); + bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode( + connectionFlags); copyDest->shouldUseLocalCopy = !reservedConnection; } } @@ -3634,7 +3636,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, return connection; } - if (IsReservationPossible()) + if (IsReservationPossible(connectionFlags)) { /* * Enforce the requirements for adaptive connection management diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f8e4816ed..1d3eee0c4 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -61,8 +61,8 @@ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); -static bool ShouldShutdownConnection(MultiConnection *connection, const int - cachedConnectionCount); +static bool ShouldShutdownConnection(MultiConnection *connection, + const int cachedConnectionCount); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -354,6 +354,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { + if (flags & REQUIRE_MAINTENANCE_CONNECTION) + { + /* Maintenance database may have changed, so cached connection should be closed */ + connection->forceCloseAtTransactionEnd = true; + } return connection; } } @@ -377,9 +382,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, /* these two flags are by nature cannot happen at the same time */ Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION))); + int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION) + ? 
MAINTENANCE_CONNECTION + : 0; if (flags & WAIT_FOR_CONNECTION) { - WaitLoopForSharedConnection(hostname, port); + WaitLoopForSharedConnection(sharedCounterFlags, hostname, port); } else if (flags & OPTIONAL_CONNECTION) { @@ -389,7 +397,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * cannot reserve the right to establish a connection, we prefer to * error out. */ - if (!TryToIncrementSharedConnectionCounter(hostname, port)) + if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port)) { /* do not track the connection anymore */ dlist_delete(&connection->connectionNode); @@ -409,7 +417,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * * Still, we keep track of the connection counter. */ - IncrementSharedConnectionCounter(hostname, port); + IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port); } @@ -423,11 +431,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, ResetShardPlacementAssociation(connection); - - if ((flags & REQUIRE_METADATA_CONNECTION)) + if (flags & REQUIRE_METADATA_CONNECTION) { connection->useForMetadataOperations = true; } + else if (flags & REQUIRE_MAINTENANCE_CONNECTION) + { + connection->useForMaintenanceOperations = true; + connection->forceCloseAtTransactionEnd = true; + } /* fully initialized the connection, record it */ connection->initializationState = POOL_STATE_INITIALIZED; @@ -495,6 +507,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) continue; } + if ((flags & REQUIRE_MAINTENANCE_CONNECTION) && + !connection->useForMaintenanceOperations) + { + continue; + } + if ((flags & REQUIRE_METADATA_CONNECTION) && !connection->useForMetadataOperations) { @@ -1191,7 +1209,11 @@ CitusPQFinish(MultiConnection *connection) /* behave idempotently, there is no gurantee that CitusPQFinish() is called once */ if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED) { - DecrementSharedConnectionCounter(connection->hostname, connection->port); + int sharedCounterFlags = (connection->useForMaintenanceOperations) + ? MAINTENANCE_CONNECTION + : 0; + DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname, + connection->port); connection->initializationState = POOL_STATE_NOT_INITIALIZED; } } diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index a64930b32..cbe125dda 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -92,9 +92,11 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char * userId, Oid databaseOid, bool *found); -static void EnsureConnectionPossibilityForNodeList(List *nodeList); +static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 + connectionFlags); static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, - bool waitForConnection); + bool waitForConnection, + uint32 connectionFlags); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); @@ -240,7 +242,9 @@ DeallocateReservedConnections(void) * We have not used this reservation, make sure to clean-up from * the shared memory as well. 
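+			 * (In this patch, reservations are only ever taken for regular,
+			 * non-maintenance connections, hence the zero sharedCounterFlags
+			 * passed below.)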
*/ - DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port); + int sharedCounterFlags = 0; + DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname, + entry->key.port); /* for completeness, set it to true */ entry->usedReservation = true; @@ -295,7 +299,7 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, * EnsureConnectionPossibilityForNodeList. */ void -EnsureConnectionPossibilityForRemotePrimaryNodes(void) +EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags) { /* * By using NoLock there is a tiny risk of that we miss to reserve a @@ -304,7 +308,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void) * going to access would be on the new node. */ List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock); - EnsureConnectionPossibilityForNodeList(remoteNodeList); + EnsureConnectionPossibilityForNodeList(remoteNodeList, connectionFlags); } @@ -314,7 +318,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void) * If not, the function returns false. */ bool -TryConnectionPossibilityForLocalPrimaryNode(void) +TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags) { bool nodeIsInMetadata = false; WorkerNode *localNode = @@ -330,7 +334,8 @@ TryConnectionPossibilityForLocalPrimaryNode(void) } bool waitForConnection = false; - return EnsureConnectionPossibilityForNode(localNode, waitForConnection); + return EnsureConnectionPossibilityForNode(localNode, waitForConnection, + connectionFlags); } @@ -344,7 +349,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void) * single reservation per backend) */ static void -EnsureConnectionPossibilityForNodeList(List *nodeList) +EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags) { /* * We sort the workerList because adaptive connection management @@ -363,7 +368,8 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) foreach_ptr(workerNode, nodeList) { bool waitForConnection = true; - EnsureConnectionPossibilityForNode(workerNode, waitForConnection); + EnsureConnectionPossibilityForNode(workerNode, waitForConnection, + connectionFlags); } } @@ -382,9 +388,10 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) * return false. */ static bool -EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection) +EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32 + connectionFlags) { - if (!IsReservationPossible()) + if (!IsReservationPossible(connectionFlags)) { return false; } @@ -439,13 +446,17 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio * Increment the shared counter, we may need to wait if there are * no space left. */ - WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); + int sharedCounterFlags = 0; + WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName, + workerNode->workerPort); } else { - bool incremented = - TryToIncrementSharedConnectionCounter(workerNode->workerName, - workerNode->workerPort); + int sharedCounterFlags = 0; + bool incremented = TryToIncrementSharedConnectionCounter( + sharedCounterFlags, + workerNode->workerName, + workerNode->workerPort); if (!incremented) { /* @@ -475,9 +486,13 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio * session is eligible for shared connection reservation. 
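+ * For maintenance connections (REQUIRE_MAINTENANCE_CONNECTION), the decision
+ * is made against citus.max_maintenance_shared_pool_size; for regular
+ * connections it is made against citus.max_shared_pool_size.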
 */
bool
-IsReservationPossible(void)
+IsReservationPossible(uint32 connectionFlags)
 {
-	if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
+	bool connectionThrottlingDisabled =
+		connectionFlags & REQUIRE_MAINTENANCE_CONNECTION
+		? GetMaxMaintenanceSharedPoolSize() <= 0
+		: GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING;
+	if (connectionThrottlingDisabled)
 	{
 		/* connection throttling disabled */
 		return false;
diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c
index 26598b465..4338a7860 100644
--- a/src/backend/distributed/connection/shared_connection_stats.c
+++ b/src/backend/distributed/connection/shared_connection_stats.c
@@ -13,12 +13,12 @@
 #include "postgres.h"
 
 #include "libpq-fe.h"
+#include "math.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 
 #include "access/hash.h"
 #include "access/htup_details.h"
-#include "catalog/pg_authid.h"
 #include "commands/dbcommands.h"
 #include "common/hashfn.h"
 #include "storage/ipc.h"
@@ -27,19 +27,15 @@
 #include "pg_version_constants.h"
 
 #include "distributed/backend_data.h"
-#include "distributed/cancel_utils.h"
 #include "distributed/connection_management.h"
-#include "distributed/listutils.h"
 #include "distributed/locally_reserved_shared_connections.h"
 #include "distributed/metadata_cache.h"
-#include "distributed/multi_executor.h"
 #include "distributed/placement_connection.h"
 #include "distributed/shared_connection_stats.h"
 #include "distributed/time_constants.h"
 #include "distributed/tuplestore.h"
 #include "distributed/worker_manager.h"
-
 
 #define REMOTE_CONNECTION_STATS_COLUMNS 4
@@ -55,11 +51,18 @@ typedef struct ConnectionStatsSharedData
 	char *sharedConnectionHashTrancheName;
 	LWLock sharedConnectionHashLock;
-	ConditionVariable waitersConditionVariable;
+	ConditionVariable regularConnectionWaitersConditionVariable;
+	ConditionVariable maintenanceConnectionWaitersConditionVariable;
 } ConnectionStatsSharedData;
-
-typedef struct SharedConnStatsHashKey
+/*
+ * There are two hash tables:
+ *
+ * 1. The first one tracks the connection count per worker node and is used for connection throttling
+ * 2. The second one tracks the connection count per database on a worker node and is used for statistics
+ *
+ */
+typedef struct SharedWorkerNodeConnStatsHashKey
 {
 	/*
 	 * We keep the entries in the shared memory even after master_update_node()
@@ -68,21 +71,29 @@ typedef struct SharedConnStatsHashKey
 	 */
 	char hostname[MAX_NODE_LENGTH];
 	int32 port;
+} SharedWorkerNodeConnStatsHashKey;
-	/*
-	 * Given that citus.shared_max_pool_size can be defined per database, we
-	 * should keep track of shared connections per database.
- */ - Oid databaseOid; -} SharedConnStatsHashKey; +typedef struct SharedWorkerNodeDatabaseConnStatsHashKey +{ + SharedWorkerNodeConnStatsHashKey workerNodeKey; + Oid database; +} SharedWorkerNodeDatabaseConnStatsHashKey; /* hash entry for per worker stats */ -typedef struct SharedConnStatsHashEntry +typedef struct SharedWorkerNodeConnStatsHashEntry { - SharedConnStatsHashKey key; + SharedWorkerNodeConnStatsHashKey key; - int connectionCount; -} SharedConnStatsHashEntry; + int regularConnectionsCount; + int maintenanceConnectionsCount; +} SharedWorkerNodeConnStatsHashEntry; + +/* hash entry for per database on worker stats */ +typedef struct SharedWorkerNodeDatabaseConnStatsHashEntry +{ + SharedWorkerNodeDatabaseConnStatsHashKey key; + int count; +} SharedWorkerNodeDatabaseConnStatsHashEntry; /* @@ -93,6 +104,14 @@ typedef struct SharedConnStatsHashEntry */ int MaxSharedPoolSize = 0; +/* + * Controlled via a GUC, never access directly, use GetMaxMaintenanceSharedPoolSize(). + * Pool size for maintenance connections exclusively + * "0" or "-1" means do not apply connection throttling + */ +int MaxMaintenanceSharedPoolSize = -1; +int MaintenanceConnectionPoolTimeout = 30 * MS_PER_SECOND; + /* * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections. @@ -106,7 +125,8 @@ int MaxClientConnections = ALLOW_ALL_EXTERNAL_CONNECTIONS; /* the following two structs are used for accessing shared memory */ -static HTAB *SharedConnStatsHash = NULL; +static HTAB *SharedWorkerNodeConnStatsHash = NULL; +static HTAB *SharedWorkerNodeDatabaseConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -121,6 +141,25 @@ static void UnLockConnectionSharedMemory(void); static bool ShouldWaitForConnection(int currentConnectionCount); static uint32 SharedConnectionHashHash(const void *key, Size keysize); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); +static uint32 SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize); +static int SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size + keysize); +static bool isConnectionThrottlingDisabled(uint32 externalFlags); +static bool IncrementSharedConnectionCounterInternal(uint32 externalFlags, bool + checkLimits, const char *hostname, + int port, + Oid database); +static SharedWorkerNodeConnStatsHashKey PrepareWorkerNodeHashKey(const char *hostname, int + port); +static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey(const + char * + hostname, + int port, + Oid + database); +static void DecrementSharedConnectionCounterInternal(uint32 externalFlags, const + char *hostname, int port, Oid + database); PG_FUNCTION_INFO_V1(citus_remote_connection_stats); @@ -160,26 +199,29 @@ StoreAllRemoteConnectionStats(Tuplestorestate *tupleStore, TupleDesc tupleDescri LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; - SharedConnStatsHashEntry *connectionEntry = NULL; + SharedWorkerNodeDatabaseConnStatsHashEntry *connectionEntry = NULL; - hash_seq_init(&status, SharedConnStatsHash); - while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0) + hash_seq_init(&status, SharedWorkerNodeDatabaseConnStatsHash); + while ((connectionEntry = + (SharedWorkerNodeDatabaseConnStatsHashEntry *) hash_seq_search( + &status)) != 0) { /* get ready for the next tuple */ memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - char 
*databaseName = get_database_name(connectionEntry->key.databaseOid); + char *databaseName = get_database_name(connectionEntry->key.database); if (databaseName == NULL) { /* database might have been dropped */ continue; } - values[0] = PointerGetDatum(cstring_to_text(connectionEntry->key.hostname)); - values[1] = Int32GetDatum(connectionEntry->key.port); + values[0] = PointerGetDatum(cstring_to_text( + connectionEntry->key.workerNodeKey.hostname)); + values[1] = Int32GetDatum(connectionEntry->key.workerNodeKey.port); values[2] = PointerGetDatum(cstring_to_text(databaseName)); - values[3] = Int32GetDatum(connectionEntry->connectionCount); + values[3] = Int32GetDatum(connectionEntry->count); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -226,6 +268,13 @@ GetMaxSharedPoolSize(void) } +int +GetMaxMaintenanceSharedPoolSize(void) +{ + return MaxMaintenanceSharedPoolSize; +} + + /* * GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is * controlled via a GUC. @@ -248,18 +297,18 @@ GetLocalSharedPoolSize(void) /* * WaitLoopForSharedConnection tries to increment the shared connection * counter for the given hostname/port and the current database in - * SharedConnStatsHash. + * SharedWorkerNodeConnStatsHash. * * The function implements a retry mechanism via a condition variable. */ void -WaitLoopForSharedConnection(const char *hostname, int port) +WaitLoopForSharedConnection(uint32 flags, const char *hostname, int port) { - while (!TryToIncrementSharedConnectionCounter(hostname, port)) + while (!TryToIncrementSharedConnectionCounter(flags, hostname, port)) { CHECK_FOR_INTERRUPTS(); - WaitForSharedConnection(); + WaitForSharedConnection(flags); } ConditionVariableCancelSleep(); @@ -269,32 +318,20 @@ WaitLoopForSharedConnection(const char *hostname, int port) /* * TryToIncrementSharedConnectionCounter tries to increment the shared * connection counter for the given nodeId and the current database in - * SharedConnStatsHash. + * SharedWorkerNodeConnStatsHash. * * If the function returns true, the caller is allowed (and expected) * to establish a new connection to the given node. Else, the caller * is not allowed to establish a new connection. */ bool -TryToIncrementSharedConnectionCounter(const char *hostname, int port) +TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) { - if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING) + if (isConnectionThrottlingDisabled(flags)) { - /* connection throttling disabled */ return true; } - bool counterIncremented = false; - SharedConnStatsHashKey connKey; - - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - /* * The local session might already have some reserved connections to the given * node. In that case, we don't need to go through the shared memory. @@ -307,35 +344,41 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) return true; } - connKey.port = port; - connKey.databaseOid = MyDatabaseId; + return IncrementSharedConnectionCounterInternal(flags, + true, + hostname, + port, + MyDatabaseId); +} - /* - * Handle adaptive connection management for the local node slightly different - * as local node can failover to local execution. 
- */ - bool connectionToLocalNode = false; - int activeBackendCount = 0; - WorkerNode *workerNode = FindWorkerNode(hostname, port); - if (workerNode) + +/* + * IncrementSharedConnectionCounter increments the shared counter + * for the given hostname and port. + */ +void +IncrementSharedConnectionCounter(uint32 flags, const char *hostname, int port) +{ + if (isConnectionThrottlingDisabled(flags)) { - connectionToLocalNode = (workerNode->groupId == GetLocalGroupId()); - if (connectionToLocalNode && - GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES) - { - /* - * This early return is required as LocalNodeParallelExecutionFactor - * is ignored for the first connection below. This check makes the - * user experience is more accurate and also makes it easy for - * having regression tests which emulates the local node adaptive - * connection management. - */ - return false; - } - - activeBackendCount = GetExternalClientBackendCount(); + return; } + IncrementSharedConnectionCounterInternal(flags, + false, + hostname, + port, + MyDatabaseId); +} + + +static bool +IncrementSharedConnectionCounterInternal(uint32 externalFlags, + bool checkLimits, + const char *hostname, + int port, + Oid database) +{ LockConnectionSharedMemory(LW_EXCLUSIVE); /* @@ -344,30 +387,83 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) * space in the shared memory. That's why we prefer continuing the execution * instead of throwing an error. */ - bool entryFound = false; - SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); + SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, + port); + bool workerNodeEntryFound = false; + SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = + hash_search(SharedWorkerNodeConnStatsHash, + &workerNodeKey, + HASH_ENTER_NULL, + &workerNodeEntryFound); /* * It is possible to throw an error at this point, but that doesn't help us in anyway. * Instead, we try our best, let the connection establishment continue by-passing the * connection throttling. 
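+	 * (With HASH_ENTER_NULL, hash_search returns NULL instead of throwing
+	 * an error when no shared memory is left for a new entry.)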
*/ - if (!connectionEntry) + if (!workerNodeConnectionEntry) { UnLockConnectionSharedMemory(); return true; } - if (!entryFound) + if (!workerNodeEntryFound) { /* we successfully allocated the entry for the first time, so initialize it */ - connectionEntry->connectionCount = 1; - - counterIncremented = true; + workerNodeConnectionEntry->regularConnectionsCount = 0; + workerNodeConnectionEntry->maintenanceConnectionsCount = 0; } - else if (connectionToLocalNode) + + /* Initialize SharedWorkerNodeDatabaseConnStatsHash the same way */ + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = + PrepareWorkerNodeDatabaseHashKey(hostname, port, database); + bool workerNodeDatabaseEntryFound = false; + SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = + hash_search(SharedWorkerNodeDatabaseConnStatsHash, + &workerNodeDatabaseKey, + HASH_ENTER_NULL, + &workerNodeDatabaseEntryFound); + + if (!workerNodeDatabaseEntry) { + UnLockConnectionSharedMemory(); + return true; + } + + if (!workerNodeDatabaseEntryFound) + { + workerNodeDatabaseEntry->count = 0; + } + + /* Increment counter if a slot available */ + bool connectionSlotAvailable = true; + + bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; + if (checkLimits) + { + WorkerNode *workerNode = FindWorkerNode(hostname, port); + bool connectionToLocalNode = workerNode && (workerNode->groupId == + GetLocalGroupId()); + + int currentConnectionsLimit; + int currentConnectionsCount; + if (maintenanceConnection) + { + currentConnectionsLimit = GetMaxMaintenanceSharedPoolSize(); + currentConnectionsCount = + workerNodeConnectionEntry->maintenanceConnectionsCount; + } + else + { + currentConnectionsLimit = connectionToLocalNode + ? GetLocalSharedPoolSize() + : GetMaxSharedPoolSize(); + currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount; + } + + bool currentConnectionsLimitExceeded = currentConnectionsCount + 1 > + currentConnectionsLimit; + /* * For local nodes, solely relying on citus.max_shared_pool_size or * max_connections might not be sufficient. The former gives us @@ -380,98 +476,49 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) * a reasonable pace. The latter limit typically kicks in when the database * is issued lots of concurrent sessions at the same time, such as benchmarks. */ - if (activeBackendCount + 1 > GetLocalSharedPoolSize()) + bool localNodeConnectionsLimitExceeded = + connectionToLocalNode && + (GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES || + GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize()); + if (currentConnectionsLimitExceeded || localNodeConnectionsLimitExceeded) { - counterIncremented = false; + connectionSlotAvailable = false; } - else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize()) + } + + if (connectionSlotAvailable) + { + if (maintenanceConnection) { - counterIncremented = false; + workerNodeConnectionEntry->maintenanceConnectionsCount += 1; } else { - connectionEntry->connectionCount++; - counterIncremented = true; + workerNodeConnectionEntry->regularConnectionsCount += 1; } + workerNodeDatabaseEntry->count += 1; } - else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize()) + + if (IsLoggableLevel(DEBUG4)) { - /* there is no space left for this connection */ - counterIncremented = false; - } - else - { - connectionEntry->connectionCount++; - counterIncremented = true; + ereport(DEBUG4, errmsg( + "Incrementing %s connection counter. 
" + "Current regular connections: %i, maintenance connections: %i. " + "Connection slot to %s:%i database %i is %s", + maintenanceConnection ? "maintenance" : "regular", + workerNodeConnectionEntry->regularConnectionsCount, + workerNodeConnectionEntry->maintenanceConnectionsCount, + hostname, + port, + database, + connectionSlotAvailable ? "available" : "not available" + )); } UnLockConnectionSharedMemory(); - return counterIncremented; -} - -/* - * IncrementSharedConnectionCounter increments the shared counter - * for the given hostname and port. - */ -void -IncrementSharedConnectionCounter(const char *hostname, int port) -{ - SharedConnStatsHashKey connKey; - - if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING) - { - /* connection throttling disabled */ - return; - } - - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - - connKey.port = port; - connKey.databaseOid = MyDatabaseId; - - LockConnectionSharedMemory(LW_EXCLUSIVE); - - /* - * As the hash map is allocated in shared memory, it doesn't rely on palloc for - * memory allocation, so we could get NULL via HASH_ENTER_NULL. That's why we prefer - * continuing the execution instead of throwing an error. - */ - bool entryFound = false; - SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); - - /* - * It is possible to throw an error at this point, but that doesn't help us in anyway. - * Instead, we try our best, let the connection establishment continue by-passing the - * connection throttling. - */ - if (!connectionEntry) - { - UnLockConnectionSharedMemory(); - - ereport(DEBUG4, (errmsg("No entry found for node %s:%d while incrementing " - "connection counter", hostname, port))); - - return; - } - - if (!entryFound) - { - /* we successfully allocated the entry for the first time, so initialize it */ - connectionEntry->connectionCount = 0; - } - - connectionEntry->connectionCount += 1; - - UnLockConnectionSharedMemory(); + return connectionSlotAvailable; } @@ -480,77 +527,119 @@ IncrementSharedConnectionCounter(const char *hostname, int port) * for the given hostname and port for the given count. */ void -DecrementSharedConnectionCounter(const char *hostname, int port) +DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int port) { - SharedConnStatsHashKey connKey; - - /* - * Do not call GetMaxSharedPoolSize() here, since it may read from - * the catalog and we may be in the process exit handler. - */ - if (MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING) + /* TODO: possible bug, remove this check? 
*/ + if (isConnectionThrottlingDisabled(externalFlags)) { - /* connection throttling disabled */ return; } - strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); - if (strlen(hostname) > MAX_NODE_LENGTH) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); - } - - connKey.port = port; - connKey.databaseOid = MyDatabaseId; - LockConnectionSharedMemory(LW_EXCLUSIVE); - bool entryFound = false; - SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId); + + UnLockConnectionSharedMemory(); + WakeupWaiterBackendsForSharedConnection(externalFlags); +} + + +static void +DecrementSharedConnectionCounterInternal(uint32 externalFlags, + const char *hostname, + int port, + Oid database) +{ + bool workerNodeEntryFound = false; + SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, + port); + SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = + hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, + &workerNodeEntryFound); /* this worker node is removed or updated, no need to care */ - if (!entryFound) + if (!workerNodeEntryFound) { - UnLockConnectionSharedMemory(); - - /* wake up any waiters in case any backend is waiting for this node */ - WakeupWaiterBackendsForSharedConnection(); - ereport(DEBUG4, (errmsg("No entry found for node %s:%d while decrementing " "connection counter", hostname, port))); - return; } /* we should never go below 0 */ - Assert(connectionEntry->connectionCount > 0); + Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 || + workerNodeConnectionEntry->maintenanceConnectionsCount > 0); - connectionEntry->connectionCount -= 1; - - if (connectionEntry->connectionCount == 0) + bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION; + if (maintenanceConnection) { - /* - * We don't have to remove at this point as the node might be still active - * and will have new connections open to it. Still, this seems like a convenient - * place to remove the entry, as connectionCount == 0 implies that the server is - * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, - * we're unlikely to trigger this often. - */ - hash_search(SharedConnStatsHash, &connKey, HASH_REMOVE, &entryFound); + workerNodeConnectionEntry->maintenanceConnectionsCount -= 1; + } + else + { + workerNodeConnectionEntry->regularConnectionsCount -= 1; } - UnLockConnectionSharedMemory(); + if (IsLoggableLevel(DEBUG4)) + { + ereport(DEBUG4, errmsg( + "Decrementing %s connection counter. " + "Current regular connections: %i, maintenance connections: %i. " + "Connection slot to %s:%i database %i is released", + maintenanceConnection ? "maintenance" : "regular", + workerNodeConnectionEntry->regularConnectionsCount, + workerNodeConnectionEntry->maintenanceConnectionsCount, + hostname, + port, + database + )); + } - WakeupWaiterBackendsForSharedConnection(); + /* + * We don't have to remove at this point as the node might be still active + * and will have new connections open to it. Still, this seems like a convenient + * place to remove the entry, as count == 0 implies that the server is + * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, + * we're unlikely to trigger this often. 
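+	 * The per-database entry below is cleaned up eagerly for the same reason.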
+ */ + if (workerNodeConnectionEntry->regularConnectionsCount == 0 && + workerNodeConnectionEntry->maintenanceConnectionsCount == 0) + { + hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); + } + + /* + * Perform the same with SharedWorkerNodeDatabaseConnStatsHashKey + */ + + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey = + PrepareWorkerNodeDatabaseHashKey(hostname, port, MyDatabaseId); + bool workerNodeDatabaseEntryFound = false; + SharedWorkerNodeDatabaseConnStatsHashEntry *workerNodeDatabaseEntry = + hash_search(SharedWorkerNodeDatabaseConnStatsHash, + &workerNodeDatabaseKey, + HASH_FIND, + &workerNodeDatabaseEntryFound); + + if (!workerNodeDatabaseEntryFound) + { + return; + } + + Assert(workerNodeDatabaseEntry->count > 0); + + workerNodeDatabaseEntry->count -= 1; + + if (workerNodeDatabaseEntry->count == 0) + { + hash_search(SharedWorkerNodeDatabaseConnStatsHash, &workerNodeDatabaseKey, + HASH_REMOVE, NULL); + } } /* * LockConnectionSharedMemory is a utility function that should be used when - * accessing to the SharedConnStatsHash, which is in the shared memory. + * accessing to the SharedWorkerNodeConnStatsHash, which is in the shared memory. */ static void LockConnectionSharedMemory(LWLockMode lockMode) @@ -587,9 +676,18 @@ UnLockConnectionSharedMemory(void) * this function. */ void -WakeupWaiterBackendsForSharedConnection(void) +WakeupWaiterBackendsForSharedConnection(uint32 flags) { - ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable); + if (flags & MAINTENANCE_CONNECTION) + { + ConditionVariableBroadcast( + &ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable); + } + else + { + ConditionVariableBroadcast( + &ConnectionStatsSharedState->regularConnectionWaitersConditionVariable); + } } @@ -600,10 +698,30 @@ WakeupWaiterBackendsForSharedConnection(void) * WakeupWaiterBackendsForSharedConnection(). 
*/ void -WaitForSharedConnection(void) +WaitForSharedConnection(uint32 flags) { - ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, - PG_WAIT_EXTENSION); + if (flags & MAINTENANCE_CONNECTION) + { + bool connectionSlotNotAcquired = ConditionVariableTimedSleep( + &ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable, + MaintenanceConnectionPoolTimeout, + PG_WAIT_EXTENSION); + if (connectionSlotNotAcquired) + { + ereport(ERROR, (errmsg("Failed to acquire maintenance connection for %i ms", + MaintenanceConnectionPoolTimeout), + errhint( + "Try increasing citus.max_maintenance_shared_pool_size or " + "citus.maintenance_connection_pool_timeout" + ))); + } + } + else + { + ConditionVariableSleep( + &ConnectionStatsSharedState->regularConnectionWaitersConditionVariable, + PG_WAIT_EXTENSION); + } } @@ -640,10 +758,18 @@ SharedConnectionStatsShmemSize(void) size = add_size(size, sizeof(ConnectionStatsSharedData)); - Size hashSize = hash_estimate_size(MaxWorkerNodesTracked, - sizeof(SharedConnStatsHashEntry)); + Size workerNodeConnHashSize = hash_estimate_size(MaxWorkerNodesTracked, + sizeof( + SharedWorkerNodeConnStatsHashEntry)); - size = add_size(size, hashSize); + size = add_size(size, workerNodeConnHashSize); + + Size workerNodeDatabaseConnSize = hash_estimate_size(MaxWorkerNodesTracked * + MaxDatabasesPerWorkerNodesTracked, + sizeof( + SharedWorkerNodeDatabaseConnStatsHashEntry)); + + size = add_size(size, workerNodeDatabaseConnSize); return size; } @@ -657,15 +783,6 @@ void SharedConnectionStatsShmemInit(void) { bool alreadyInitialized = false; - HASHCTL info; - - /* create (hostname, port, database) -> [counter] */ - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(SharedConnStatsHashKey); - info.entrysize = sizeof(SharedConnStatsHashEntry); - info.hash = SharedConnectionHashHash; - info.match = SharedConnectionHashCompare; - uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE); /* * Currently the lock isn't required because allocation only happens at @@ -691,17 +808,54 @@ SharedConnectionStatsShmemInit(void) LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, ConnectionStatsSharedState->sharedConnectionHashTrancheId); - ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); + ConditionVariableInit( + &ConnectionStatsSharedState->regularConnectionWaitersConditionVariable); + ConditionVariableInit( + &ConnectionStatsSharedState->maintenanceConnectionWaitersConditionVariable); } - /* allocate hash table */ - SharedConnStatsHash = - ShmemInitHash("Shared Conn. Stats Hash", MaxWorkerNodesTracked, - MaxWorkerNodesTracked, &info, hashFlags); + /* allocate hash tables */ + + /* create (hostname, port) -> [counter] */ + HASHCTL sharedWorkerNodeConnStatsHashInfo; + memset(&sharedWorkerNodeConnStatsHashInfo, 0, + sizeof(sharedWorkerNodeConnStatsHashInfo)); + sharedWorkerNodeConnStatsHashInfo.keysize = sizeof(SharedWorkerNodeConnStatsHashKey); + sharedWorkerNodeConnStatsHashInfo.entrysize = + sizeof(SharedWorkerNodeConnStatsHashEntry); + sharedWorkerNodeConnStatsHashInfo.hash = SharedConnectionHashHash; + sharedWorkerNodeConnStatsHashInfo.match = SharedConnectionHashCompare; + SharedWorkerNodeConnStatsHash = + ShmemInitHash("Shared Conn. 
Stats Hash", + MaxWorkerNodesTracked, + MaxWorkerNodesTracked, + &sharedWorkerNodeConnStatsHashInfo, + (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); + + /* create (hostname, port, database) -> [counter] */ + HASHCTL sharedWorkerNodeDatabaseConnStatsHashInfo; + memset(&sharedWorkerNodeDatabaseConnStatsHashInfo, 0, + sizeof(sharedWorkerNodeDatabaseConnStatsHashInfo)); + sharedWorkerNodeDatabaseConnStatsHashInfo.keysize = + sizeof(SharedWorkerNodeDatabaseConnStatsHashKey); + sharedWorkerNodeDatabaseConnStatsHashInfo.entrysize = + sizeof(SharedWorkerNodeDatabaseConnStatsHashEntry); + sharedWorkerNodeDatabaseConnStatsHashInfo.hash = SharedWorkerNodeDatabaseHashHash; + sharedWorkerNodeDatabaseConnStatsHashInfo.match = SharedWorkerNodeDatabaseHashCompare; + + int sharedWorkerNodeDatabaseConnStatsHashSize = MaxWorkerNodesTracked * + MaxDatabasesPerWorkerNodesTracked; + SharedWorkerNodeDatabaseConnStatsHash = + ShmemInitHash("Shared Conn Per Database. Stats Hash", + sharedWorkerNodeDatabaseConnStatsHashSize, + sharedWorkerNodeDatabaseConnStatsHashSize, + &sharedWorkerNodeDatabaseConnStatsHashInfo, + (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE)); LWLockRelease(AddinShmemInitLock); - Assert(SharedConnStatsHash != NULL); + Assert(SharedWorkerNodeConnStatsHash != NULL); + Assert(SharedWorkerNodeDatabaseConnStatsHash != NULL); Assert(ConnectionStatsSharedState->sharedConnectionHashTrancheId != 0); if (prev_shmem_startup_hook != NULL) @@ -800,14 +954,53 @@ ShouldWaitForConnection(int currentConnectionCount) } +static SharedWorkerNodeConnStatsHashKey +PrepareWorkerNodeHashKey(const char *hostname, int port) +{ + SharedWorkerNodeConnStatsHashKey key; + strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + key.port = port; + return key; +} + + +static SharedWorkerNodeDatabaseConnStatsHashKey +PrepareWorkerNodeDatabaseHashKey(const char *hostname, + int port, + Oid database) +{ + SharedWorkerNodeDatabaseConnStatsHashKey workerNodeDatabaseKey; + workerNodeDatabaseKey.workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); + workerNodeDatabaseKey.database = database; + return workerNodeDatabaseKey; +} + + static uint32 SharedConnectionHashHash(const void *key, Size keysize) { - SharedConnStatsHashKey *entry = (SharedConnStatsHashKey *) key; + SharedWorkerNodeConnStatsHashKey *entry = (SharedWorkerNodeConnStatsHashKey *) key; uint32 hash = string_hash(entry->hostname, NAMEDATALEN); hash = hash_combine(hash, hash_uint32(entry->port)); - hash = hash_combine(hash, hash_uint32(entry->databaseOid)); + + return hash; +} + + +static uint32 +SharedWorkerNodeDatabaseHashHash(const void *key, Size keysize) +{ + SharedWorkerNodeDatabaseConnStatsHashKey *entry = + (SharedWorkerNodeDatabaseConnStatsHashKey *) key; + uint32 hash = SharedConnectionHashHash(&(entry->workerNodeKey), keysize); + hash = hash_combine(hash, hash_uint32(entry->database)); return hash; } @@ -816,17 +1009,39 @@ SharedConnectionHashHash(const void *key, Size keysize) static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize) { - SharedConnStatsHashKey *ca = (SharedConnStatsHashKey *) a; - SharedConnStatsHashKey *cb = (SharedConnStatsHashKey *) b; + SharedWorkerNodeConnStatsHashKey *ca = (SharedWorkerNodeConnStatsHashKey *) a; + SharedWorkerNodeConnStatsHashKey *cb = (SharedWorkerNodeConnStatsHashKey *) b; - if (strncmp(ca->hostname, cb->hostname, 
MAX_NODE_LENGTH) != 0 ||
-		ca->port != cb->port ||
-		ca->databaseOid != cb->databaseOid)
-	{
-		return 1;
-	}
-	else
-	{
-		return 0;
-	}
+	return strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 ||
+		   ca->port != cb->port;
+}
+
+
+static int
+SharedWorkerNodeDatabaseHashCompare(const void *a, const void *b, Size keysize)
+{
+	SharedWorkerNodeDatabaseConnStatsHashKey *ca =
+		(SharedWorkerNodeDatabaseConnStatsHashKey *) a;
+	SharedWorkerNodeDatabaseConnStatsHashKey *cb =
+		(SharedWorkerNodeDatabaseConnStatsHashKey *) b;
+
+	int sharedConnectionHashCompare =
+		SharedConnectionHashCompare(&(ca->workerNodeKey), &(cb->workerNodeKey), keysize);
+	return sharedConnectionHashCompare ||
+		   ca->database != cb->database;
+}
+
+
+static bool
+isConnectionThrottlingDisabled(uint32 externalFlags)
+{
+	bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
+
+	/*
+	 * Do not call Get*PoolSize() functions here, since they may read from
+	 * the catalog and we may be in the process exit handler.
+	 */
+	return maintenanceConnection
+		   ? MaxMaintenanceSharedPoolSize <= 0
+		   : MaxSharedPoolSize == DISABLE_CONNECTION_THROTTLING;
+}
diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c
index ba622e4d7..53109e324 100644
--- a/src/backend/distributed/operations/worker_node_manager.c
+++ b/src/backend/distributed/operations/worker_node_manager.c
@@ -39,6 +39,7 @@
 /* Config variables managed via guc.c */
 char *WorkerListFileName;
 int MaxWorkerNodesTracked = 2048;    /* determines worker node hash table size */
+int MaxDatabasesPerWorkerNodesTracked = 1; /* determines database per worker hash table size */
 
 /* Local functions forward declarations */
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 6d26b802f..b1e0a7f88 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -1844,6 +1844,19 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
+	DefineCustomIntVariable(
+		"citus.maintenance_connection_pool_timeout",
+		gettext_noop(
+			"Timeout for acquiring a connection from the maintenance shared pool. "
+			"Applicable only when the maintenance pool is enabled via citus.max_maintenance_shared_pool_size. "
+			"Setting it to 0 or -1 disables the timeout."),
+		NULL,
+		&MaintenanceConnectionPoolTimeout,
+		30 * MS_PER_SECOND, -1, MS_PER_HOUR,
+		PGC_SIGHUP,
+		GUC_SUPERUSER_ONLY,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.max_adaptive_executor_pool_size",
 		gettext_noop("Sets the maximum number of connections per worker node used by "
@@ -1933,6 +1946,20 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
+	DefineCustomIntVariable(
+		"citus.max_databases_per_worker_tracked",
+		gettext_noop("Sets the number of databases tracked per worker."),
+		gettext_noop(
+			"This configuration value complements citus.max_worker_nodes_tracked. "
+			"It should be used when there is more than one database with Citus in the cluster, "
+			"and, effectively, limits the size of the hash table with connections per worker + database. "
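+			"Requires a restart to take effect, as it determines the size of a shared memory hash table. "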
+ "Currently, it does not affect the connection management logic and serves only statistical purposes."), + &MaxDatabasesPerWorkerNodesTracked, + 1, 1, INT_MAX, + PGC_POSTMASTER, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_high_priority_background_processes", gettext_noop("Sets the maximum number of background processes " @@ -1958,6 +1985,18 @@ RegisterCitusConfigVariables(void) GUC_UNIT_KB | GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.max_maintenance_shared_pool_size", + gettext_noop("Similar to citus.max_shared_pool_size, but applies to connections " + "for maintenance operations only. " + "Setting it to 0 or -1 disables maintenance connection throttling."), + NULL, + &MaxMaintenanceSharedPoolSize, + -1, -1, INT_MAX, + PGC_SIGHUP, + GUC_SUPERUSER_ONLY, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_matview_size_to_auto_recreate", gettext_noop("Sets the maximum size of materialized views in MB to " diff --git a/src/backend/distributed/test/shared_connection_counters.c b/src/backend/distributed/test/shared_connection_counters.c index c59602887..917ffc19d 100644 --- a/src/backend/distributed/test/shared_connection_counters.c +++ b/src/backend/distributed/test/shared_connection_counters.c @@ -33,7 +33,8 @@ PG_FUNCTION_INFO_V1(set_max_shared_pool_size); Datum wake_up_connection_pool_waiters(PG_FUNCTION_ARGS) { - WakeupWaiterBackendsForSharedConnection(); + WakeupWaiterBackendsForSharedConnection(0); + WakeupWaiterBackendsForSharedConnection(MAINTENANCE_CONNECTION); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 695df2bf4..1d6bbeef8 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -153,7 +153,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) { const char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; - int connectionFlags = 0; + int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION; if (workerNode->groupId == localGroupId) { diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index c31dc85a2..8f3e65059 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -160,7 +160,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) bool recoveryFailed = false; - int connectionFlags = 0; + int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION; MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) { diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 9cef13539..cc808eb89 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -49,6 +49,7 @@ #include "distributed/background_jobs.h" #include "distributed/citus_safe_lib.h" +#include "distributed/connection_management.h" #include "distributed/coordinator_protocol.h" #include "distributed/distributed_deadlock_detection.h" #include "distributed/maintenanced.h" @@ -89,12 +90,14 @@ typedef struct MaintenanceDaemonDBData Oid userOid; pid_t workerPid; bool daemonStarted; + bool daemonShuttingDown; bool triggerNodeMetadataSync; Latch *latch; /* pointer to the background worker's latch 
*/ } MaintenanceDaemonDBData; /* config variable for distributed deadlock detection timeout */ double DistributedDeadlockDetectionTimeoutFactor = 2.0; +char *MaintenanceManagementDatabase = ""; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; @@ -118,7 +121,7 @@ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; /* set to true when becoming a maintenance daemon */ -static bool IsMaintenanceDaemon = false; +bool IsMaintenanceDaemon = false; static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); @@ -241,6 +244,14 @@ InitializeMaintenanceDaemonBackend(void) return; } + if (dbData->daemonShuttingDown) + { + elog(DEBUG1, "Another maintenance daemon for database %u is shutting down. " + "Aborting current initialization", MyDatabaseId); + LWLockRelease(&MaintenanceDaemonControl->lock); + return; + } + if (IsMaintenanceDaemon) { /* @@ -1056,20 +1067,11 @@ MaintenanceDaemonShmemExit(int code, Datum arg) MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash, &databaseOid, - HASH_FIND, NULL); - - /* myDbData is NULL after StopMaintenanceDaemon */ - if (myDbData != NULL) - { - /* - * Confirm that I am still the registered maintenance daemon before exiting. - */ - Assert(myDbData->workerPid == MyProcPid); - - myDbData->daemonStarted = false; - myDbData->workerPid = 0; - } + HASH_REMOVE, NULL); + /* Workaround for -Werror=unused-variable */ + (void) myDbData; + Assert(myDbData->workerPid == MyProcPid); LWLockRelease(&MaintenanceDaemonControl->lock); } @@ -1168,11 +1170,12 @@ StopMaintenanceDaemon(Oid databaseId) MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search( MaintenanceDaemonDBHash, &databaseId, - HASH_REMOVE, &found); + HASH_FIND, &found); if (found) { workerPid = dbData->workerPid; + dbData->daemonShuttingDown = true; } LWLockRelease(&MaintenanceDaemonControl->lock); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index d93e4483a..cc2fa3403 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -123,7 +123,14 @@ enum MultiConnectionMode * * This is need to run 'CREATE_REPLICATION_SLOT' command. */ - REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8 + REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8, + + /* + * This flag specifies that connection is required for maintenance operations, e.g. + * transaction recovery, distributed deadlock detection. Such connections have + * a reserved quota of the MaxSharedPoolSize. 
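+	 * The quota is governed by citus.max_maintenance_shared_pool_size and is
+	 * tracked separately from regular connections.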
+ */ + REQUIRE_MAINTENANCE_CONNECTION = 1 << 9 }; @@ -223,6 +230,9 @@ typedef struct MultiConnection /* replication option */ bool requiresReplication; + /* See REQUIRE_MAINTENANCE_CONNECTION */ + bool useForMaintenanceOperations; + MultiConnectionStructInitializationState initializationState; } MultiConnection; diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h index adec8c9c4..637cbce47 100644 --- a/src/include/distributed/locally_reserved_shared_connections.h +++ b/src/include/distributed/locally_reserved_shared_connections.h @@ -20,8 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort, extern void MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, Oid databaseOid); extern void DeallocateReservedConnections(void); -extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void); -extern bool TryConnectionPossibilityForLocalPrimaryNode(void); -extern bool IsReservationPossible(void); +extern void EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags); +extern bool TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags); +extern bool IsReservationPossible(uint32 connectionFlags); #endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */ diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index 07387a7fd..7e7a1d474 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -12,6 +12,8 @@ #ifndef MAINTENANCED_H #define MAINTENANCED_H +#include "commands/dbcommands.h" + /* collect statistics every 24 hours */ #define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000) diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 007691e16..681e5bf5b 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -16,25 +16,38 @@ #define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1 #define ALLOW_ALL_EXTERNAL_CONNECTIONS -1 +enum SharedPoolCounterMode +{ + /* + * Use this flag to reserve a connection from a maintenance quota + */ + MAINTENANCE_CONNECTION = 1 << 0 +}; extern int MaxSharedPoolSize; +extern int MaxMaintenanceSharedPoolSize; +extern int MaintenanceConnectionPoolTimeout; extern int LocalSharedPoolSize; extern int MaxClientConnections; extern void InitializeSharedConnectionStats(void); -extern void WaitForSharedConnection(void); -extern void WakeupWaiterBackendsForSharedConnection(void); +extern void WaitForSharedConnection(uint32); +extern void WakeupWaiterBackendsForSharedConnection(uint32); extern size_t SharedConnectionStatsShmemSize(void); extern void SharedConnectionStatsShmemInit(void); extern int GetMaxClientConnections(void); extern int GetMaxSharedPoolSize(void); +extern int GetMaxMaintenanceSharedPoolSize(void); extern int GetLocalSharedPoolSize(void); -extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); -extern void WaitLoopForSharedConnection(const char *hostname, int port); -extern void DecrementSharedConnectionCounter(const char *hostname, int port); -extern void IncrementSharedConnectionCounter(const char *hostname, int port); -extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int - activeConnectionCount); +extern bool TryToIncrementSharedConnectionCounter(uint32 flags, const char *hostname, + int port); +extern void WaitLoopForSharedConnection(uint32 
flags, const char *hostname, int port); +extern void DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, + int port); +extern void IncrementSharedConnectionCounter(uint32 flags, const char *hostname, + int port); +extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, + int activeConnectionCount); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 02a43fe0b..4e1287e07 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -59,6 +59,7 @@ typedef struct WorkerNode /* Config variables managed via guc.c */ extern int MaxWorkerNodesTracked; +extern int MaxDatabasesPerWorkerNodesTracked; extern char *WorkerListFileName; extern char *CurrentCluster; diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 665cd30c2..255ff804c 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -48,8 +48,6 @@ get_guc_variables_compat(int *gucCount) #define pgstat_fetch_stat_local_beentry(a) pgstat_get_local_beentry_by_index(a) -#define have_createdb_privilege() have_createdb_privilege() - #else #include "miscadmin.h" diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 6c09e0b38..420ddb5c0 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -823,7 +823,7 @@ class Postgres(QueryRunner): # of our tests pgconf.write("max_logical_replication_workers = 50\n") pgconf.write("max_wal_senders = 50\n") - pgconf.write("max_worker_processes = 50\n") + pgconf.write("max_worker_processes = 150\n") pgconf.write("max_replication_slots = 50\n") # We need to make the log go to stderr so that the tests can @@ -846,6 +846,8 @@ class Postgres(QueryRunner): # happened pgconf.write("restart_after_crash = off\n") + # prevent tests from hanging + pgconf.write("statement_timeout= '5min'\n") os.truncate(self.hba_path, 0) self.ssl_access("all", "trust") self.nossl_access("all", "trust") @@ -977,6 +979,14 @@ class Postgres(QueryRunner): for config in configs: self.sql(f"alter system set {config}") + def reset_configuration(self, *configs): + """Reset specific Postgres settings using ALTER SYSTEM RESET + NOTE: after configuring a call to reload or restart is needed for the + settings to become effective. 
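+
+        Example:
+            node.reset_configuration("citus.recover_2pc_interval")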
+ """ + for config in configs: + self.sql(f"alter system reset {config}") + def log_handle(self): """Returns the opened logfile at the current end of the log @@ -1018,6 +1028,15 @@ class Postgres(QueryRunner): self.databases.add(name) self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name))) + def drop_database(self, name): + self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=name) + self.sql( + sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format( + sql.Identifier(name) + ) + ) + self.databases.remove(name) + def create_schema(self, name): self.schemas.add(name) self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name))) @@ -1047,9 +1066,13 @@ class Postgres(QueryRunner): def cleanup_databases(self): for database in self.databases: + self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=database) self.sql( - sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database)) + sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format( + sql.Identifier(database) + ) ) + self.databases.clear() def cleanup_schemas(self): for schema in self.schemas: diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 9a648c0ab..fa430fa0a 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -241,7 +241,8 @@ def run_python_test(test_name, args): "pytest", "pytest", "--numprocesses", - "auto", + # Tests may be heavy, so limit the concurrency + "2", "--count", str(args["repeat"]), str(test_path), diff --git a/src/test/regress/citus_tests/test/test_maintenancedeamon.py b/src/test/regress/citus_tests/test/test_maintenancedeamon.py index 3f6cb501e..1eb4e28c9 100644 --- a/src/test/regress/citus_tests/test/test_maintenancedeamon.py +++ b/src/test/regress/citus_tests/test/test_maintenancedeamon.py @@ -62,7 +62,7 @@ def test_set_maindb(cluster_factory): wait_until_maintenance_deamons_start(2, cluster) - cluster.coordinator.sql("DROP DATABASE mymaindb;") + cluster.coordinator.drop_database("mymaindb") wait_until_maintenance_deamons_start(1, cluster) diff --git a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py new file mode 100644 index 000000000..6df95b4f7 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py @@ -0,0 +1,149 @@ +import asyncio + +import pytest +from psycopg.errors import DeadlockDetected + +# For every database there is expected to be 2 queries, +# so ~80 connections will be held by deadlocks. 
+DATABASES_NUMBER = 40
+
+
+async def test_multiple_databases_distributed_deadlock_detection(cluster):
+
+    # Disable maintenance on all nodes
+    for node in cluster.nodes:
+        node.configure(
+            "citus.recover_2pc_interval = '-1'",
+            "citus.distributed_deadlock_detection_factor = '-1'",
+            "citus.max_maintenance_shared_pool_size = 5",
+        )
+        node.restart()
+
+    # Prepare database names for test
+    db_names = [f"db{db_index}" for db_index in range(1, DATABASES_NUMBER + 1)]
+
+    # Create and configure databases
+    for db_name in db_names:
+        nodes = cluster.workers + [cluster.coordinator]
+        for node in nodes:
+            node.create_database(db_name)
+            with node.cur(dbname=db_name) as node_cursor:
+                node_cursor.execute("CREATE EXTENSION citus;")
+                if node == cluster.coordinator:
+                    for worker in cluster.workers:
+                        node_cursor.execute(
+                            "SELECT pg_catalog.citus_add_node(%s, %s)",
+                            (worker.host, worker.port),
+                        )
+                    node_cursor.execute(
+                        """
+                        CREATE TABLE public.deadlock_detection_test (user_id int UNIQUE, some_val int);
+                        SELECT create_distributed_table('public.deadlock_detection_test', 'user_id');
+                        INSERT INTO public.deadlock_detection_test SELECT i, i FROM generate_series(1,2) i;
+                        """
+                    )
+
+    async def create_deadlock(db_name, run_on_coordinator):
+        """Prepare a deadlock between two transactions in the given database"""
+        # Init connections and store for later commits
+        if run_on_coordinator:
+            first_connection = await cluster.coordinator.aconn(
+                dbname=db_name, autocommit=False
+            )
+            first_cursor = first_connection.cursor()
+            second_connection = await cluster.coordinator.aconn(
+                dbname=db_name, autocommit=False
+            )
+            second_cursor = second_connection.cursor()
+        else:
+            first_connection = await cluster.workers[0].aconn(
+                dbname=db_name, autocommit=False
+            )
+            first_cursor = first_connection.cursor()
+            second_connection = await cluster.workers[1].aconn(
+                dbname=db_name, autocommit=False
+            )
+            second_cursor = second_connection.cursor()
+
+        # Initiate the deadlock
+        await first_cursor.execute(
+            "UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 1;"
+        )
+        await second_cursor.execute(
+            "UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 2;"
+        )
+
+        # Test that the deadlock is resolved by the maintenance daemon
+        with pytest.raises(DeadlockDetected):
+
+            async def run_deadlocked_queries():
+                await asyncio.gather(
+                    second_cursor.execute(
+                        "UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 1;"
+                    ),
+                    first_cursor.execute(
+                        "UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 2;"
+                    ),
+                )

+            await asyncio.wait_for(run_deadlocked_queries(), 300)
+
+        await first_connection.rollback()
+        await second_connection.rollback()
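Note that the two final UPDATEs in `create_deadlock` are issued through `asyncio.gather`: the lock cycle only forms if both statements are in flight at once; issued sequentially, the first would simply block forever. A self-contained, editorial sketch of the same AB-BA cycle using asyncio locks in place of row locks (all names here are illustrative, not part of the test harness):

    import asyncio

    async def worker(first, second, name):
        # Each worker takes its locks in the opposite order -> AB-BA cycle
        async with first:
            await asyncio.sleep(0.1)  # let the other worker grab its first lock
            async with second:
                print(f"{name} finished")  # never reached

    async def main():
        lock_a, lock_b = asyncio.Lock(), asyncio.Lock()
        try:
            # Without an external detector this would hang forever, which is
            # why the test relies on Citus to raise DeadlockDetected instead.
            await asyncio.wait_for(
                asyncio.gather(
                    worker(lock_a, lock_b, "one"), worker(lock_b, lock_a, "two")
                ),
                timeout=2,
            )
        except asyncio.TimeoutError:
            print("deadlocked, as expected")

    asyncio.run(main())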
+    async def enable_maintenance_when_deadlocks_ready():
+        """Enable maintenance daemons once all the expected deadlock queries are ready"""
+        # Let deadlocks commence
+        await asyncio.sleep(2)
+        # Check that queries are deadlocked
+        databases_with_deadlock = set()
+        while len(databases_with_deadlock) < DATABASES_NUMBER:
+            for db_name in (db for db in db_names if db not in databases_with_deadlock):
+                for node in cluster.nodes:
+                    async with node.acur(dbname=db_name) as cursor:
+                        expected_lock_count = 4 if node == cluster.coordinator else 2
+                        await cursor.execute(
+                            """
+                            SELECT count(*) = %s AS deadlock_created
+                            FROM pg_locks
+                            INNER JOIN pg_class pc ON relation = oid
+                            WHERE relname LIKE 'deadlock_detection_test%%'""",
+                            (expected_lock_count,),
+                        )
+                        queries_deadlocked = await cursor.fetchone()
+                        if queries_deadlocked[0]:
+                            databases_with_deadlock.add(db_name)
+        # Re-enable maintenance
+        for node in cluster.nodes:
+            node.reset_configuration(
+                "citus.recover_2pc_interval",
+                "citus.distributed_deadlock_detection_factor",
+            )
+            node.reload()
+
+    # Distribute deadlocked queries among all nodes in the cluster
+    tasks = []
+    for idx, db_name in enumerate(db_names):
+        run_on_coordinator = idx % 3 == 0
+        tasks.append(
+            create_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
+        )
+
+    tasks.append(enable_maintenance_when_deadlocks_ready())
+
+    # Await the results
+    await asyncio.gather(*tasks)
+
+    # Check for "too many clients" errors on all nodes
+    for node in cluster.nodes:
+        with node.cur() as cursor:
+            cursor.execute(
+                """
+                SELECT count(*) AS too_many_clients_errors_count
+                FROM regexp_split_to_table(pg_read_file(%s), E'\n') AS t(log_line)
+                WHERE log_line LIKE '%%sorry, too many clients already%%';""",
+                (node.log_path.as_posix(),),
+            )
+            too_many_clients_errors_count = cursor.fetchone()[0]
+            assert too_many_clients_errors_count == 0
diff --git a/src/test/regress/expected/citus_local_tables.out b/src/test/regress/expected/citus_local_tables.out
index 4f3053094..08784f789 100644
--- a/src/test/regress/expected/citus_local_tables.out
+++ b/src/test/regress/expected/citus_local_tables.out
@@ -949,10 +949,15 @@ select count(*) from pg_constraint where conname = 'fkey_test_drop';
 -- verify we still preserve the child-parent hierarchy after all conversions
 -- check the shard partition
-select inhrelid::regclass from pg_inherits where (select inhparent::regclass::text) ~ '^parent_1_\d{7}$' order by 1;
-  inhrelid
+SELECT count(*) = 1 AS child_parent_hierarchy_test
+FROM pg_inherits
+WHERE (SELECT inhparent::regclass::text) ~ '^parent_1_\d{7}$'
+  -- the child shard id is not deterministic, but should be either 1904004 or 1904006
+  AND (inhrelid::regclass::text) IN ('parent_1_child_1_1904004', 'parent_1_child_1_1904006')
+ORDER BY 1;
+ child_parent_hierarchy_test
 ---------------------------------------------------------------------
- parent_1_child_1_1904006
+ t
 (1 row)

 -- check the shell partition
diff --git a/src/test/regress/expected/failure_single_select.out b/src/test/regress/expected/failure_single_select.out
index 586dd4756..76edf6e17 100644
--- a/src/test/regress/expected/failure_single_select.out
+++ b/src/test/regress/expected/failure_single_select.out
@@ -144,6 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
 SELECT * FROM select_test WHERE key = 3;
 ERROR:  connection to the remote node postgres@localhost:xxxxx failed with the following error: connection not open
 COMMIT;
+-- Maintenance connections are not cached
 SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
  mitmproxy
 ---------------------------------------------------------------------

 (1 row)

@@ -157,8 +158,11 @@ SELECT recover_prepared_transactions();
 (1 row)

 SELECT recover_prepared_transactions();
-ERROR:  connection not open
-CONTEXT:  while executing command on localhost:xxxxx
+ recover_prepared_transactions
+---------------------------------------------------------------------
+ 0
+(1 row)
+
 -- bug from https://github.com/citusdata/citus/issues/1926
 SET citus.max_cached_conns_per_worker TO 0; -- purge cache
 DROP TABLE select_test;
diff --git a/src/test/regress/expected/global_cancel.out b/src/test/regress/expected/global_cancel.out
index e5ce4fbc6..3687871d6 100644
--- a/src/test/regress/expected/global_cancel.out
+++ b/src/test/regress/expected/global_cancel.out
@@ -56,8 +56,8 @@ SELECT global_pid AS maintenance_daemon_gpid
 FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = gaat.process_id
 WHERE application_name = 'Citus Maintenance Daemon' \gset
 SET client_min_messages TO ERROR;
-CREATE USER global_cancel_user;
-SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
+CREATE USER global_cancel_user NOSUPERUSER;
+SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user NOSUPERUSER;');
  ?column?
 ---------------------------------------------------------------------
         1
@@ -66,6 +66,12 @@ SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
 RESET client_min_messages;
 \c - global_cancel_user - :master_port
+SELECT current_user;
+    current_user
+---------------------------------------------------------------------
+ global_cancel_user
+(1 row)
+
 SELECT pg_typeof(:maintenance_daemon_gpid);
  pg_typeof
 ---------------------------------------------------------------------
diff --git a/src/test/regress/expected/multi_maintenance_multiple_databases.out b/src/test/regress/expected/multi_maintenance_multiple_databases.out
new file mode 100644
index 000000000..1b0a65046
--- /dev/null
+++ b/src/test/regress/expected/multi_maintenance_multiple_databases.out
@@ -0,0 +1,545 @@
+-- This test verifies the behavior of the maintenance daemon in a multi-database environment.
+-- It checks that distributed deadlock detection and 2PC transaction recovery respect
+-- citus.max_maintenance_shared_pool_size.
+-- To do that, it creates 100 databases and synthetically generates distributed transactions in various states there.
+SELECT $definition$
+    ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+    ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
+    SELECT pg_reload_conf();
+    $definition$ AS turn_off_maintenance
+\gset
+SELECT $definition$
+ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
+ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+SELECT pg_reload_conf();
+$definition$ AS turn_on_maintenance
+\gset
+SELECT $definition$
+    DO
+    $do$
+        DECLARE
+            index int;
+            db_name text;
+            current_port int;
+            db_create_statement text;
+        BEGIN
+            SELECT setting::int FROM pg_settings WHERE name = 'port'
+            INTO current_port;
+            FOR index IN 1..100
+                LOOP
+                    SELECT format('db%s', index)
+                    INTO db_name;
+
+                    SELECT format('CREATE DATABASE %I', db_name)
+                    INTO db_create_statement;
+
+                    PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
+                                   db_create_statement);
+                    PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                   'CREATE EXTENSION citus;');
+                    IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
+                        PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                            format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
+                        FROM pg_dist_node
+                        WHERE groupid != 0 AND isactive AND noderole = 'primary';
+                    END IF;
+                END LOOP;
+        END;
+    $do$;
+    $definition$ AS create_databases
+\gset
+-- The code relies heavily on dblink for cross-database and cross-node queries
+CREATE EXTENSION IF NOT EXISTS dblink;
+-- Disable maintenance operations to prepare the environment
+:turn_off_maintenance
+ pg_reload_conf
+--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +:turn_off_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +:turn_off_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- Create databases +\c - - - :worker_1_port +:create_databases +SELECT count(*) +FROM pg_database +WHERE datname LIKE 'db%'; + count +--------------------------------------------------------------------- + 100 +(1 row) + +\c - - - :worker_2_port +:create_databases +SELECT count(*) +FROM pg_database +WHERE datname LIKE 'db%'; + count +--------------------------------------------------------------------- + 100 +(1 row) + +\c - - - :master_port +:create_databases +SELECT count(*) +FROM pg_database +WHERE datname LIKE 'db%'; + count +--------------------------------------------------------------------- + 100 +(1 row) + +-- Generate distributed transactions +\c - - - :master_port +DO +$do$ + DECLARE + index int; + db_name text; + transaction_to_abort_name text; + transaction_to_commit_name text; + transaction_to_be_forgotten text; + coordinator_port int; + BEGIN + FOR index IN 1..100 + LOOP + SELECT format('db%s', index) + INTO db_name; + + SELECT format('citus_0_1234_3_0_%s', oid) + FROM pg_database + WHERE datname = db_name + INTO transaction_to_abort_name; + + SELECT format('citus_0_1234_4_0_%s', oid) + FROM pg_database + WHERE datname = db_name + INTO transaction_to_commit_name; + + SELECT format('citus_0_should_be_forgotten_%s', oid) + FROM pg_database + WHERE datname = db_name + INTO transaction_to_be_forgotten; + + SELECT setting::int + FROM pg_settings + WHERE name = 'port' + INTO coordinator_port; + + -- Prepare transactions on workers + PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport), + format($worker_cmd$ + BEGIN; + CREATE TABLE should_abort + (value int); + PREPARE TRANSACTION '%s'; + + BEGIN; + CREATE TABLE should_commit + (value int); + PREPARE TRANSACTION '%s'; + $worker_cmd$, transaction_to_abort_name, transaction_to_commit_name)) + FROM pg_dist_node + WHERE groupid != 0 + AND isactive + AND noderole = 'primary'; + + -- Fill the pg_dist_transaction + PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port), + format($coordinator_cmd$ + INSERT INTO pg_dist_transaction + SELECT groupid, '%s' FROM pg_dist_node + UNION ALL + SELECT groupid, '%s' FROM pg_dist_node; + $coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten)); + END LOOP; + END; +$do$; +-- Verify state before enabling maintenance +\c - - - :master_port +SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test +FROM pg_database, + dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, + (SELECT setting::int FROM pg_settings WHERE name = 'port')), + $statement$ + SELECT groupid, gid + FROM pg_dist_transaction + WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%' + $statement$) AS t(groupid integer, gid text) +WHERE datname LIKE 'db%'; + pg_dist_transaction_before_recovery_coordinator_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + 
cached_connections_before_recovery_coordinator_test +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_before_recover_worker_1_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_before_recovery_worker_1_test +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_before_recover_worker_2_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_before_recovery_worker_2_test +--------------------------------------------------------------------- + t +(1 row) + +-- Turn on the maintenance +\c - - - :master_port +:turn_on_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +:turn_on_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +:turn_on_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :master_port +-- Verify maintenance result +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + too_many_clients_test +--------------------------------------------------------------------- + t +(1 row) + +DO +$$ + DECLARE + pg_dist_transaction_after_recovery_coordinator_test boolean; + cached_connections_after_recovery_coordinator_test boolean; + BEGIN + FOR i IN 0 .. 
300 + LOOP + IF i = 300 THEN RAISE 'Waited too long'; END IF; + SELECT count(*) = 0 + FROM pg_database, + dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, + (SELECT setting::int FROM pg_settings WHERE name = 'port')), + $statement$ + SELECT groupid, gid + FROM pg_dist_transaction + WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%' + $statement$) AS t(groupid integer, gid text) + WHERE datname LIKE 'db%' + INTO pg_dist_transaction_after_recovery_coordinator_test; + + SELECT count(*) = 0 AS cached_connections_after_recovery_coordinator_test + FROM pg_stat_activity + WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval + INTO cached_connections_after_recovery_coordinator_test; + + IF (pg_dist_transaction_after_recovery_coordinator_test + AND cached_connections_after_recovery_coordinator_test) THEN + EXIT; + END IF; + + PERFORM pg_sleep_for('1 SECOND'::interval); + END LOOP; + END +$$; +\c - - - :worker_1_port +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + too_many_clients_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_after_recover_worker_1_test +--------------------------------------------------------------------- + t +(1 row) + +DO +$$ + BEGIN + FOR i IN 0 .. 300 + LOOP + IF i = 300 THEN RAISE 'Waited too long'; END IF; + IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_1_test + FROM pg_stat_activity + WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval) THEN + EXIT; + END IF; + PERFORM pg_sleep_for('1 SECOND'::interval); + END LOOP; + END +$$; +\c - - - :worker_2_port +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + too_many_clients_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_after_recover_worker_2_test +--------------------------------------------------------------------- + t +(1 row) + +DO +$$ + BEGIN + FOR i IN 0 .. 
300
+        LOOP
+            IF i = 300 THEN RAISE 'Waited too long'; END IF;
+            IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_2_test
+                FROM pg_stat_activity
+                WHERE state = 'idle'
+                  AND now() - backend_start > '5 seconds'::interval) THEN
+                EXIT;
+            END IF;
+            PERFORM pg_sleep_for('1 SECOND'::interval);
+        END LOOP;
+    END
+$$;
+-- Cleanup
+\c - - - :master_port
+SELECT $definition$
+    DO
+    $do$
+        DECLARE
+            index int;
+            db_name text;
+            current_port int;
+        BEGIN
+            SELECT setting::int FROM pg_settings WHERE name = 'port'
+            INTO current_port;
+            FOR index IN 1..100
+                LOOP
+                    SELECT format('db%s', index)
+                    INTO db_name;
+
+                    PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                   'DROP EXTENSION citus;');
+                END LOOP;
+        END;
+    $do$;
+
+    -- Dropping the databases explicitly because ProcSignalBarrier prevents dropping them via dblink
+    DROP DATABASE db1 WITH (FORCE);
+    DROP DATABASE db2 WITH (FORCE);
+    DROP DATABASE db3 WITH (FORCE);
+    DROP DATABASE db4 WITH (FORCE);
+    DROP DATABASE db5 WITH (FORCE);
+    DROP DATABASE db6 WITH (FORCE);
+    DROP DATABASE db7 WITH (FORCE);
+    DROP DATABASE db8 WITH (FORCE);
+    DROP DATABASE db9 WITH (FORCE);
+    DROP DATABASE db10 WITH (FORCE);
+    DROP DATABASE db11 WITH (FORCE);
+    DROP DATABASE db12 WITH (FORCE);
+    DROP DATABASE db13 WITH (FORCE);
+    DROP DATABASE db14 WITH (FORCE);
+    DROP DATABASE db15 WITH (FORCE);
+    DROP DATABASE db16 WITH (FORCE);
+    DROP DATABASE db17 WITH (FORCE);
+    DROP DATABASE db18 WITH (FORCE);
+    DROP DATABASE db19 WITH (FORCE);
+    DROP DATABASE db20 WITH (FORCE);
+    DROP DATABASE db21 WITH (FORCE);
+    DROP DATABASE db22 WITH (FORCE);
+    DROP DATABASE db23 WITH (FORCE);
+    DROP DATABASE db24 WITH (FORCE);
+    DROP DATABASE db25 WITH (FORCE);
+    DROP DATABASE db26 WITH (FORCE);
+    DROP DATABASE db27 WITH (FORCE);
+    DROP DATABASE db28 WITH (FORCE);
+    DROP DATABASE db29 WITH (FORCE);
+    DROP DATABASE db30 WITH (FORCE);
+    DROP DATABASE db31 WITH (FORCE);
+    DROP DATABASE db32 WITH (FORCE);
+    DROP DATABASE db33 WITH (FORCE);
+    DROP DATABASE db34 WITH (FORCE);
+    DROP DATABASE db35 WITH (FORCE);
+    DROP DATABASE db36 WITH (FORCE);
+    DROP DATABASE db37 WITH (FORCE);
+    DROP DATABASE db38 WITH (FORCE);
+    DROP DATABASE db39 WITH (FORCE);
+    DROP DATABASE db40 WITH (FORCE);
+    DROP DATABASE db41 WITH (FORCE);
+    DROP DATABASE db42 WITH (FORCE);
+    DROP DATABASE db43 WITH (FORCE);
+    DROP DATABASE db44 WITH (FORCE);
+    DROP DATABASE db45 WITH (FORCE);
+    DROP DATABASE db46 WITH (FORCE);
+    DROP DATABASE db47 WITH (FORCE);
+    DROP DATABASE db48 WITH (FORCE);
+    DROP DATABASE db49 WITH (FORCE);
+    DROP DATABASE db50 WITH (FORCE);
+    DROP DATABASE db51 WITH (FORCE);
+    DROP DATABASE db52 WITH (FORCE);
+    DROP DATABASE db53 WITH (FORCE);
+    DROP DATABASE db54 WITH (FORCE);
+    DROP DATABASE db55 WITH (FORCE);
+    DROP DATABASE db56 WITH (FORCE);
+    DROP DATABASE db57 WITH (FORCE);
+    DROP DATABASE db58 WITH (FORCE);
+    DROP DATABASE db59 WITH (FORCE);
+    DROP DATABASE db60 WITH (FORCE);
+    DROP DATABASE db61 WITH (FORCE);
+    DROP DATABASE db62 WITH (FORCE);
+    DROP DATABASE db63 WITH (FORCE);
+    DROP DATABASE db64 WITH (FORCE);
+    DROP DATABASE db65 WITH (FORCE);
+    DROP DATABASE db66 WITH (FORCE);
+    DROP DATABASE db67 WITH (FORCE);
+    DROP DATABASE db68 WITH (FORCE);
+    DROP DATABASE db69 WITH (FORCE);
+    DROP DATABASE db70 WITH (FORCE);
+    DROP DATABASE db71 WITH (FORCE);
+    DROP DATABASE db72 WITH (FORCE);
+    DROP DATABASE db73 WITH (FORCE);
+    DROP DATABASE db74 WITH (FORCE);
+    DROP DATABASE db75 WITH (FORCE);
+    DROP DATABASE db76 WITH (FORCE);
+    DROP DATABASE db77 WITH
DROP DATABASE db78 WITH (FORCE); + DROP DATABASE db79 WITH (FORCE); + DROP DATABASE db80 WITH (FORCE); + DROP DATABASE db81 WITH (FORCE); + DROP DATABASE db82 WITH (FORCE); + DROP DATABASE db83 WITH (FORCE); + DROP DATABASE db84 WITH (FORCE); + DROP DATABASE db85 WITH (FORCE); + DROP DATABASE db86 WITH (FORCE); + DROP DATABASE db87 WITH (FORCE); + DROP DATABASE db88 WITH (FORCE); + DROP DATABASE db89 WITH (FORCE); + DROP DATABASE db90 WITH (FORCE); + DROP DATABASE db91 WITH (FORCE); + DROP DATABASE db92 WITH (FORCE); + DROP DATABASE db93 WITH (FORCE); + DROP DATABASE db94 WITH (FORCE); + DROP DATABASE db95 WITH (FORCE); + DROP DATABASE db96 WITH (FORCE); + DROP DATABASE db97 WITH (FORCE); + DROP DATABASE db98 WITH (FORCE); + DROP DATABASE db99 WITH (FORCE); + DROP DATABASE db100 WITH (FORCE); + + SELECT count(*) = 0 as all_databases_dropped + FROM pg_database + WHERE datname LIKE 'db%'; + + ALTER SYSTEM RESET citus.recover_2pc_interval; + ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; + ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size; + SELECT pg_reload_conf(); + + $definition$ AS cleanup +\gset +:cleanup + all_databases_dropped +--------------------------------------------------------------------- + t +(1 row) + + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +:cleanup + all_databases_dropped +--------------------------------------------------------------------- + t +(1 row) + + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +:cleanup + all_databases_dropped +--------------------------------------------------------------------- + t +(1 row) + + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :master_port +DROP EXTENSION IF EXISTS dblink; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 015f74973..50ddd3e7a 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -220,6 +220,8 @@ test: multi_generate_ddl_commands test: multi_create_shards test: multi_transaction_recovery test: multi_transaction_recovery_multiple_databases +test: multi_maintenance_multiple_databases +#test: maintenance_connection_timeout test: local_dist_join_modifications test: local_table_join diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 35671ad26..df7f54c46 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -464,7 +464,7 @@ push(@pgOptions, "wal_retrieve_retry_interval=250"); push(@pgOptions, "max_logical_replication_workers=50"); push(@pgOptions, "max_wal_senders=50"); -push(@pgOptions, "max_worker_processes=50"); +push(@pgOptions, "max_worker_processes=150"); if ($majorversion >= "14") { # disable compute_query_id so that we don't get Query Identifiers diff --git a/src/test/regress/sql/citus_local_tables.sql b/src/test/regress/sql/citus_local_tables.sql index e33fbc6cc..f61c97f9d 100644 --- a/src/test/regress/sql/citus_local_tables.sql +++ b/src/test/regress/sql/citus_local_tables.sql @@ -581,7 +581,12 @@ alter table parent_1 drop constraint fkey_test_drop; select count(*) from pg_constraint where conname = 'fkey_test_drop'; -- verify we still preserve the child-parent hierarchy after all conversions -- check the shard partition -select inhrelid::regclass from pg_inherits where (select 
inhparent::regclass::text) ~ '^parent_1_\d{7}$' order by 1;
+SELECT count(*) = 1 AS child_parent_hierarchy_test
+FROM pg_inherits
+WHERE (SELECT inhparent::regclass::text) ~ '^parent_1_\d{7}$'
+  -- the child shard id is not deterministic, but should be either 1904004 or 1904006
+  AND (inhrelid::regclass::text) IN ('parent_1_child_1_1904004', 'parent_1_child_1_1904006')
+ORDER BY 1;

 -- check the shell partition
 select inhrelid::regclass from pg_inherits where inhparent='parent_1'::regclass order by 1;
diff --git a/src/test/regress/sql/failure_single_select.sql b/src/test/regress/sql/failure_single_select.sql
index c8218c950..80e3f9ca8 100644
--- a/src/test/regress/sql/failure_single_select.sql
+++ b/src/test/regress/sql/failure_single_select.sql
@@ -77,6 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
 SELECT * FROM select_test WHERE key = 3;
 COMMIT;

+-- Maintenance connections are not cached
 SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
 SELECT recover_prepared_transactions();
 SELECT recover_prepared_transactions();

diff --git a/src/test/regress/sql/global_cancel.sql b/src/test/regress/sql/global_cancel.sql
index 12330baf2..2288cd8b5 100644
--- a/src/test/regress/sql/global_cancel.sql
+++ b/src/test/regress/sql/global_cancel.sql
@@ -39,11 +39,12 @@ FROM pg_stat_activity psa JOIN get_all_active_transactions() gaat ON psa.pid = g
 WHERE application_name = 'Citus Maintenance Daemon' \gset

 SET client_min_messages TO ERROR;
-CREATE USER global_cancel_user;
-SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user');
+CREATE USER global_cancel_user NOSUPERUSER;
+SELECT 1 FROM run_command_on_workers('CREATE USER global_cancel_user NOSUPERUSER;');
 RESET client_min_messages;

 \c - global_cancel_user - :master_port

+SELECT current_user;
 SELECT pg_typeof(:maintenance_daemon_gpid);

diff --git a/src/test/regress/sql/multi_maintenance_multiple_databases.sql b/src/test/regress/sql/multi_maintenance_multiple_databases.sql
new file mode 100644
index 000000000..7b27c755d
--- /dev/null
+++ b/src/test/regress/sql/multi_maintenance_multiple_databases.sql
@@ -0,0 +1,475 @@
+-- This test verifies the behavior of the maintenance daemon in a multi-database environment.
+-- It checks that distributed deadlock detection and 2PC transaction recovery respect
+-- citus.max_maintenance_shared_pool_size.
+-- To do that, it creates 100 databases and synthetically generates distributed transactions in various states there.
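A note on the psql mechanism used just below: `SELECT ... AS some_name \gset` stores each output column of the preceding query into a psql variable named after the column, which is how the dollar-quoted scripts defined below can be replayed on every node as `:turn_off_maintenance`, `:create_databases`, and so on. A tiny standalone illustration (editorial, not part of the test):

    SELECT $script$ SELECT pg_reload_conf(); $script$ AS my_script
    \gset
    -- :my_script now expands to the stored text, re-running the statement:
    :my_script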
+
+SELECT $definition$
+    ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+    ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
+    SELECT pg_reload_conf();
+    $definition$ AS turn_off_maintenance
+\gset
+
+SELECT $definition$
+ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
+ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+SELECT pg_reload_conf();
+$definition$ AS turn_on_maintenance
+\gset
+
+SELECT $definition$
+    DO
+    $do$
+        DECLARE
+            index int;
+            db_name text;
+            current_port int;
+            db_create_statement text;
+        BEGIN
+            SELECT setting::int FROM pg_settings WHERE name = 'port'
+            INTO current_port;
+            FOR index IN 1..100
+                LOOP
+                    SELECT format('db%s', index)
+                    INTO db_name;
+
+                    SELECT format('CREATE DATABASE %I', db_name)
+                    INTO db_create_statement;
+
+                    PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
+                                   db_create_statement);
+                    PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                   'CREATE EXTENSION citus;');
+                    IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
+                        PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                            format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
+                        FROM pg_dist_node
+                        WHERE groupid != 0 AND isactive AND noderole = 'primary';
+                    END IF;
+                END LOOP;
+        END;
+    $do$;
+    $definition$ AS create_databases
+\gset
+
+-- The code relies heavily on dblink for cross-database and cross-node queries
+CREATE EXTENSION IF NOT EXISTS dblink;
+
+-- Disable maintenance operations to prepare the environment
+:turn_off_maintenance
+
+\c - - - :worker_1_port
+
+:turn_off_maintenance
+
+\c - - - :worker_2_port
+
+:turn_off_maintenance
+
+-- Create databases
+
+\c - - - :worker_1_port
+
+:create_databases
+
+SELECT count(*)
+FROM pg_database
+WHERE datname LIKE 'db%';
+
+\c - - - :worker_2_port
+
+:create_databases
+
+SELECT count(*)
+FROM pg_database
+WHERE datname LIKE 'db%';
+
+\c - - - :master_port
+
+:create_databases
+
+SELECT count(*)
+FROM pg_database
+WHERE datname LIKE 'db%';
+
+-- Generate distributed transactions
+
+\c - - - :master_port
+
+DO
+$do$
+    DECLARE
+        index int;
+        db_name text;
+        transaction_to_abort_name text;
+        transaction_to_commit_name text;
+        transaction_to_be_forgotten text;
+        coordinator_port int;
+    BEGIN
+        FOR index IN 1..100
+            LOOP
+                SELECT format('db%s', index)
+                INTO db_name;
+
+                SELECT format('citus_0_1234_3_0_%s', oid)
+                FROM pg_database
+                WHERE datname = db_name
+                INTO transaction_to_abort_name;
+
+                SELECT format('citus_0_1234_4_0_%s', oid)
+                FROM pg_database
+                WHERE datname = db_name
+                INTO transaction_to_commit_name;
+
+                SELECT format('citus_0_should_be_forgotten_%s', oid)
+                FROM pg_database
+                WHERE datname = db_name
+                INTO transaction_to_be_forgotten;
+
+                SELECT setting::int
+                FROM pg_settings
+                WHERE name = 'port'
+                INTO coordinator_port;
+
+                -- Prepare transactions on workers
+                PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport),
+                                    format($worker_cmd$
+                                               BEGIN;
+                                               CREATE TABLE should_abort
+                                               (value int);
+                                               PREPARE TRANSACTION '%s';
+
+                                               BEGIN;
+                                               CREATE TABLE should_commit
+                                               (value int);
+                                               PREPARE TRANSACTION '%s';
+                                               $worker_cmd$, transaction_to_abort_name, transaction_to_commit_name))
+                FROM pg_dist_node
+                WHERE groupid != 0
+                  AND isactive
+                  AND noderole = 'primary';
+
+                -- Fill the pg_dist_transaction
+                PERFORM
dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port), + format($coordinator_cmd$ + INSERT INTO pg_dist_transaction + SELECT groupid, '%s' FROM pg_dist_node + UNION ALL + SELECT groupid, '%s' FROM pg_dist_node; + $coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten)); + END LOOP; + END; +$do$; + +-- Verify state before enabling maintenance +\c - - - :master_port + +SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test +FROM pg_database, + dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, + (SELECT setting::int FROM pg_settings WHERE name = 'port')), + $statement$ + SELECT groupid, gid + FROM pg_dist_transaction + WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%' + $statement$) AS t(groupid integer, gid text) +WHERE datname LIKE 'db%'; + +SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + +\c - - - :worker_1_port + +SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + +SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + +\c - - - :worker_2_port + +SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + +SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + +-- Turn on the maintenance + +\c - - - :master_port + +:turn_on_maintenance + +\c - - - :worker_1_port + +:turn_on_maintenance + +\c - - - :worker_2_port + +:turn_on_maintenance + +\c - - - :master_port + +-- Verify maintenance result + +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + +DO +$$ + DECLARE + pg_dist_transaction_after_recovery_coordinator_test boolean; + cached_connections_after_recovery_coordinator_test boolean; + BEGIN + FOR i IN 0 .. 
300 + LOOP + IF i = 300 THEN RAISE 'Waited too long'; END IF; + SELECT count(*) = 0 + FROM pg_database, + dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, + (SELECT setting::int FROM pg_settings WHERE name = 'port')), + $statement$ + SELECT groupid, gid + FROM pg_dist_transaction + WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%' + $statement$) AS t(groupid integer, gid text) + WHERE datname LIKE 'db%' + INTO pg_dist_transaction_after_recovery_coordinator_test; + + SELECT count(*) = 0 AS cached_connections_after_recovery_coordinator_test + FROM pg_stat_activity + WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval + INTO cached_connections_after_recovery_coordinator_test; + + IF (pg_dist_transaction_after_recovery_coordinator_test + AND cached_connections_after_recovery_coordinator_test) THEN + EXIT; + END IF; + + PERFORM pg_sleep_for('1 SECOND'::interval); + END LOOP; + END +$$; + +\c - - - :worker_1_port + +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + +SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + +DO +$$ + BEGIN + FOR i IN 0 .. 300 + LOOP + IF i = 300 THEN RAISE 'Waited too long'; END IF; + IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_1_test + FROM pg_stat_activity + WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval) THEN + EXIT; + END IF; + PERFORM pg_sleep_for('1 SECOND'::interval); + END LOOP; + END +$$; + +\c - - - :worker_2_port + +SELECT count(*) = 0 AS too_many_clients_test +FROM regexp_split_to_table(pg_read_file('../log/postmaster.log'), E'\n') AS t(log_line) +WHERE log_line LIKE '%sorry, too many clients already%'; + +SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + +DO +$$ + BEGIN + FOR i IN 0 .. 
300
+            LOOP
+                IF i = 300 THEN RAISE 'Waited too long'; END IF;
+                IF (SELECT count(*) = 0 AS cached_connections_after_recovery_worker_2_test
+                    FROM pg_stat_activity
+                    WHERE state = 'idle'
+                      AND now() - backend_start > '5 seconds'::interval) THEN
+                    EXIT;
+                END IF;
+                PERFORM pg_sleep_for('1 SECOND'::interval);
+            END LOOP;
+    END
+$$;
+
+-- Cleanup
+
+\c - - - :master_port
+
+SELECT $definition$
+    DO
+    $do$
+        DECLARE
+            index int;
+            db_name text;
+            current_port int;
+        BEGIN
+            SELECT setting::int FROM pg_settings WHERE name = 'port'
+            INTO current_port;
+            FOR index IN 1..100
+                LOOP
+                    SELECT format('db%s', index)
+                    INTO db_name;
+
+                    PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                   'DROP EXTENSION citus;');
+                END LOOP;
+        END;
+    $do$;
+
+    -- Dropping the databases explicitly because ProcSignalBarrier prevents dropping them via dblink
+    DROP DATABASE db1 WITH (FORCE);
+    DROP DATABASE db2 WITH (FORCE);
+    DROP DATABASE db3 WITH (FORCE);
+    DROP DATABASE db4 WITH (FORCE);
+    DROP DATABASE db5 WITH (FORCE);
+    DROP DATABASE db6 WITH (FORCE);
+    DROP DATABASE db7 WITH (FORCE);
+    DROP DATABASE db8 WITH (FORCE);
+    DROP DATABASE db9 WITH (FORCE);
+    DROP DATABASE db10 WITH (FORCE);
+    DROP DATABASE db11 WITH (FORCE);
+    DROP DATABASE db12 WITH (FORCE);
+    DROP DATABASE db13 WITH (FORCE);
+    DROP DATABASE db14 WITH (FORCE);
+    DROP DATABASE db15 WITH (FORCE);
+    DROP DATABASE db16 WITH (FORCE);
+    DROP DATABASE db17 WITH (FORCE);
+    DROP DATABASE db18 WITH (FORCE);
+    DROP DATABASE db19 WITH (FORCE);
+    DROP DATABASE db20 WITH (FORCE);
+    DROP DATABASE db21 WITH (FORCE);
+    DROP DATABASE db22 WITH (FORCE);
+    DROP DATABASE db23 WITH (FORCE);
+    DROP DATABASE db24 WITH (FORCE);
+    DROP DATABASE db25 WITH (FORCE);
+    DROP DATABASE db26 WITH (FORCE);
+    DROP DATABASE db27 WITH (FORCE);
+    DROP DATABASE db28 WITH (FORCE);
+    DROP DATABASE db29 WITH (FORCE);
+    DROP DATABASE db30 WITH (FORCE);
+    DROP DATABASE db31 WITH (FORCE);
+    DROP DATABASE db32 WITH (FORCE);
+    DROP DATABASE db33 WITH (FORCE);
+    DROP DATABASE db34 WITH (FORCE);
+    DROP DATABASE db35 WITH (FORCE);
+    DROP DATABASE db36 WITH (FORCE);
+    DROP DATABASE db37 WITH (FORCE);
+    DROP DATABASE db38 WITH (FORCE);
+    DROP DATABASE db39 WITH (FORCE);
+    DROP DATABASE db40 WITH (FORCE);
+    DROP DATABASE db41 WITH (FORCE);
+    DROP DATABASE db42 WITH (FORCE);
+    DROP DATABASE db43 WITH (FORCE);
+    DROP DATABASE db44 WITH (FORCE);
+    DROP DATABASE db45 WITH (FORCE);
+    DROP DATABASE db46 WITH (FORCE);
+    DROP DATABASE db47 WITH (FORCE);
+    DROP DATABASE db48 WITH (FORCE);
+    DROP DATABASE db49 WITH (FORCE);
+    DROP DATABASE db50 WITH (FORCE);
+    DROP DATABASE db51 WITH (FORCE);
+    DROP DATABASE db52 WITH (FORCE);
+    DROP DATABASE db53 WITH (FORCE);
+    DROP DATABASE db54 WITH (FORCE);
+    DROP DATABASE db55 WITH (FORCE);
+    DROP DATABASE db56 WITH (FORCE);
+    DROP DATABASE db57 WITH (FORCE);
+    DROP DATABASE db58 WITH (FORCE);
+    DROP DATABASE db59 WITH (FORCE);
+    DROP DATABASE db60 WITH (FORCE);
+    DROP DATABASE db61 WITH (FORCE);
+    DROP DATABASE db62 WITH (FORCE);
+    DROP DATABASE db63 WITH (FORCE);
+    DROP DATABASE db64 WITH (FORCE);
+    DROP DATABASE db65 WITH (FORCE);
+    DROP DATABASE db66 WITH (FORCE);
+    DROP DATABASE db67 WITH (FORCE);
+    DROP DATABASE db68 WITH (FORCE);
+    DROP DATABASE db69 WITH (FORCE);
+    DROP DATABASE db70 WITH (FORCE);
+    DROP DATABASE db71 WITH (FORCE);
+    DROP DATABASE db72 WITH (FORCE);
+    DROP DATABASE db73 WITH (FORCE);
+    DROP DATABASE db74 WITH (FORCE);
+    DROP DATABASE db75 WITH (FORCE);
+    DROP DATABASE db76 WITH (FORCE);
+    DROP DATABASE db77 WITH
(FORCE); + DROP DATABASE db78 WITH (FORCE); + DROP DATABASE db79 WITH (FORCE); + DROP DATABASE db80 WITH (FORCE); + DROP DATABASE db81 WITH (FORCE); + DROP DATABASE db82 WITH (FORCE); + DROP DATABASE db83 WITH (FORCE); + DROP DATABASE db84 WITH (FORCE); + DROP DATABASE db85 WITH (FORCE); + DROP DATABASE db86 WITH (FORCE); + DROP DATABASE db87 WITH (FORCE); + DROP DATABASE db88 WITH (FORCE); + DROP DATABASE db89 WITH (FORCE); + DROP DATABASE db90 WITH (FORCE); + DROP DATABASE db91 WITH (FORCE); + DROP DATABASE db92 WITH (FORCE); + DROP DATABASE db93 WITH (FORCE); + DROP DATABASE db94 WITH (FORCE); + DROP DATABASE db95 WITH (FORCE); + DROP DATABASE db96 WITH (FORCE); + DROP DATABASE db97 WITH (FORCE); + DROP DATABASE db98 WITH (FORCE); + DROP DATABASE db99 WITH (FORCE); + DROP DATABASE db100 WITH (FORCE); + + SELECT count(*) = 0 as all_databases_dropped + FROM pg_database + WHERE datname LIKE 'db%'; + + ALTER SYSTEM RESET citus.recover_2pc_interval; + ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; + ALTER SYSTEM RESET citus.max_maintenance_shared_pool_size; + SELECT pg_reload_conf(); + + $definition$ AS cleanup +\gset + +:cleanup + +\c - - - :worker_1_port + +:cleanup + +\c - - - :worker_2_port + +:cleanup + +\c - - - :master_port + +DROP EXTENSION IF EXISTS dblink;
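The PL/pgSQL DO blocks in the test above all follow the same poll-until-timeout shape (check a condition, sleep one second, give up after 300 iterations). For reference, the equivalent pattern in the Python harness could look like the sketch below; `wait_until` and the commented `node.sql_value` helper are hypothetical, not part of this patch:

    import time

    def wait_until(condition, timeout_seconds=300, poll_interval=1.0):
        """Poll `condition` until it returns True, or raise after `timeout_seconds`."""
        deadline = time.monotonic() + timeout_seconds
        while time.monotonic() < deadline:
            if condition():
                return
            time.sleep(poll_interval)
        raise TimeoutError("Waited too long")

    # e.g. wait until no idle cached connections remain on a node
    # (sql_value is a hypothetical helper returning a single scalar):
    # wait_until(lambda: node.sql_value(
    #     "SELECT count(*) = 0 FROM pg_stat_activity "
    #     "WHERE state = 'idle' AND now() - backend_start > '5 seconds'::interval"))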