diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 5996a128f..0ed71d198 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -360,6 +360,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { + if ((flags & REQUIRE_MAINTENANCE_CONNECTION) && + IsMaintenanceDaemon && + !IsMaintenanceManagementDatabase(MyDatabaseId)) + { + /* The maintenance management database may have changed since this connection was cached, so force it closed at transaction end */ + connection->forceCloseAtTransactionEnd = true; + } return connection; } } @@ -432,7 +439,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, ResetShardPlacementAssociation(connection); - if (flags & REQUIRE_METADATA_CONNECTION) { connection->useForMetadataOperations = true; @@ -440,6 +446,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, else if (flags & REQUIRE_MAINTENANCE_CONNECTION) { connection->useForMaintenanceOperations = true; + if (IsMaintenanceDaemon && !IsMaintenanceManagementDatabase(MyDatabaseId)) + { + connection->forceCloseAtTransactionEnd = true; + } } /* fully initialized the connection, record it */ @@ -1510,9 +1520,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection * escalating the number of cached connections. We can recognize such backends * from their application name. 
*/ - return ((IsCitusMaintenanceDaemonBackend() && !IsMaintenanceManagementDatabase(MyDatabaseId)) || - IsCitusInternalBackend() || - IsRebalancerInternalBackend()) || + return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) || connection->initializationState != POOL_STATE_INITIALIZED || cachedConnectionCount >= MaxCachedConnectionsPerWorker || connection->forceCloseAtTransactionEnd || diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 249dfbd4e..b8a84bd0d 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -151,7 +151,7 @@ static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey int port, Oid database); static void -DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port); +DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port, Oid database); PG_FUNCTION_INFO_V1(citus_remote_connection_stats); @@ -478,8 +478,24 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags, workerNodeDatabaseEntry->count += 1; } + if (IsLoggableLevel(DEBUG4)) + { + ereport(DEBUG4, errmsg( + "Incrementing connection counter. " + "Current regular connections: %i, maintenance connections: %i. " + "Connection slot to %s:%i database %i is %s", + workerNodeConnectionEntry->regularConnectionsCount, + workerNodeConnectionEntry->maintenanceConnectionsCount, + hostname, + port, + database, + connectionSlotAvailable ? 
"available" : "not available" + )); + } + UnLockConnectionSharedMemory(); + return connectionSlotAvailable; } @@ -498,18 +514,21 @@ DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int LockConnectionSharedMemory(LW_EXCLUSIVE); - DecrementSharedConnectionCounterInternal(externalFlags, hostname, port); + DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId); UnLockConnectionSharedMemory(); WakeupWaiterBackendsForSharedConnection(); } static void -DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port) +DecrementSharedConnectionCounterInternal(uint32 externalFlags, + const char *hostname, + int port, + Oid database) { bool workerNodeEntryFound = false; SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); - SharedWorkerNodeConnStatsHashEntry *workerNodeEntry = + SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry = hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound); /* this worker node is removed or updated, no need to care */ @@ -521,18 +540,32 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostn } /* we should never go below 0 */ - Assert(workerNodeEntry->regularConnectionsCount > 0 || workerNodeEntry->maintenanceConnectionsCount > 0); + Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 || + workerNodeConnectionEntry->maintenanceConnectionsCount > 0); /* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */ if ((GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION))) { - workerNodeEntry->maintenanceConnectionsCount -= 1; + workerNodeConnectionEntry->maintenanceConnectionsCount -= 1; } else { - workerNodeEntry->regularConnectionsCount -= 1; + workerNodeConnectionEntry->regularConnectionsCount -= 1; } + if (IsLoggableLevel(DEBUG4)) + { + ereport(DEBUG4, errmsg( + 
"Decrementing connection counter. " + "Current regular connections: %i, maintenance connections: %i. " + "Connection slot to %s:%i database %i is released", + workerNodeConnectionEntry->regularConnectionsCount, + workerNodeConnectionEntry->maintenanceConnectionsCount, + hostname, + port, + database + )); + } /* * We don't have to remove at this point as the node might be still active @@ -541,7 +574,8 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostn * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, * we're unlikely to trigger this often. */ - if (workerNodeEntry->regularConnectionsCount == 0 && workerNodeEntry->maintenanceConnectionsCount == 0) + if (workerNodeConnectionEntry->regularConnectionsCount == 0 && + workerNodeConnectionEntry->maintenanceConnectionsCount == 0) { hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 0bd0c6592..186ac886d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1995,12 +1995,11 @@ RegisterCitusConfigVariables(void) DefineCustomRealVariable( "citus.shared_pool_size_maintenance_quota", - gettext_noop("Sets the maximum number of connections allowed per worker node " - "across all the backends from this node. Setting to -1 disables " - "connections throttling. Setting to 0 makes it auto-adjust, meaning " - "equal to max_connections on the coordinator."), - gettext_noop("As a rule of thumb, the value should be at most equal to the " - "max_connections on the remote nodes."), + gettext_noop("Sets the fraction of citus.max_shared_pool_size reserved " + "for maintenance operations only. " + "Setting it to 0 disables the quota. 
" + "This way the maintenance and regular connections will share the same pool"), + NULL, &SharedPoolSizeMaintenanceQuota, 0.1, 0, 1, PGC_SIGHUP, @@ -2030,7 +2029,7 @@ RegisterCitusConfigVariables(void) "citus.max_databases_per_worker_tracked", gettext_noop("Sets the amount of databases per worker tracked."), gettext_noop( - "This configuration value compliments the citus.max_worker_nodes_tracked." + "This configuration value complements the citus.max_worker_nodes_tracked." "It should be used when there are more then one database with Citus in cluster," "and, effectively, limits the size of the hash table with connections per worker + database." "Currently, it does not affect the connection management logic and serves only statistical purposes."), @@ -2709,7 +2708,8 @@ RegisterCitusConfigVariables(void) DefineCustomStringVariable( "citus.maintenance_management_database", gettext_noop("Database for cluster-wide maintenance operations across all databases"), - NULL, + gettext_noop("It should be enabled when there are more than " + "one database with Citus in a cluster."), &MaintenanceManagementDatabase, "", PGC_SIGHUP, diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 16d381acb..829f19850 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -86,7 +86,6 @@ typedef struct BackendManagementShmemData typedef enum CitusBackendType { CITUS_BACKEND_NOT_ASSIGNED, - CITUS_MAINTENANCE_DAEMON_BACKEND, CITUS_INTERNAL_BACKEND, CITUS_REBALANCER_BACKEND, CITUS_RUN_COMMAND_BACKEND, @@ -97,7 +96,6 @@ static const char *CitusBackendPrefixes[] = { CITUS_APPLICATION_NAME_PREFIX, CITUS_REBALANCER_APPLICATION_NAME_PREFIX, CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX, - CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX, }; static const CitusBackendType CitusBackendTypes[] = { @@ -1446,18 +1444,6 @@ IsCitusShardTransferBackend(void) prefixLength) == 0; 
} -bool -IsCitusMaintenanceDaemonBackend(void) -{ - if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED) - { - DetermineCitusBackendType(application_name); - } - - return CurrentBackendType == CITUS_MAINTENANCE_DAEMON_BACKEND; -} - - /* * DetermineCitusBackendType determines the type of backend based on the application_name. diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index b55a72843..a1c524ae1 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -153,7 +153,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) { const char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; - int connectionFlags = 0; + int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION; if (workerNode->groupId == localGroupId) { diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index a4656c0c8..0ab30e529 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -120,7 +120,7 @@ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; /* set to true when becoming a maintenance daemon */ -static bool IsMaintenanceDaemon = false; +bool IsMaintenanceDaemon = false; static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); @@ -508,7 +508,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid); /* make worker recognizable in pg_stat_activity */ - pgstat_report_appname(CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX); + pgstat_report_appname("Citus Maintenance Daemon"); /* * Terminate orphaned metadata sync daemons spawned from previously terminated @@ -1248,7 +1248,8 @@ char if (!maintenanceDatabaseOid) { ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 
- errmsg("Database %s doesn't exists, please check the citus.maintenance_management_database parameter.", + errmsg("Database \"%s\" doesn't exists, please check the citus.maintenance_management_database parameter. " + "Applying a default value instead.", MaintenanceManagementDatabase))); result = ""; } diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 716c4024b..8014fe5a6 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -78,7 +78,6 @@ extern bool IsRebalancerInternalBackend(void); extern bool IsCitusRunCommandBackend(void); extern bool IsExternalClientBackend(void); extern bool IsCitusShardTransferBackend(void); -extern bool IsCitusMaintenanceDaemonBackend(void); #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 #define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999 diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 089d477d5..91e1e9222 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -44,8 +44,6 @@ /* application name used for connections made by run_command_on_* */ #define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid=" -#define CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX "Citus Maintenance Daemon" - /* * application name prefix for move/split replication connections. * diff --git a/src/test/regress/expected/multi_maintenance_multiple_databases.out b/src/test/regress/expected/multi_maintenance_multiple_databases.out new file mode 100644 index 000000000..8b37d7e25 --- /dev/null +++ b/src/test/regress/expected/multi_maintenance_multiple_databases.out @@ -0,0 +1,502 @@ +-- This test verfies a behavioir of maintenance daemon in multi-database environment +-- It checks two things: +-- 1. Maintenance daemons should not cache connections, except the one for the citus.maintenance_management_database +-- 2. 
2PC transaction recovery should respect the citus.shared_pool_size_maintenance_quota +-- 2. Distributed deadlock detection should run only on citus.maintenance_management_database. +-- +-- To do that, it created 100 databases and syntactically generates distributed transactions in various states there. +SELECT $definition$ + ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1'; + SELECT pg_reload_conf(); + $definition$ AS turn_off_maintenance +\gset +SELECT $deinition$ +ALTER SYSTEM SET citus.recover_2pc_interval TO '5s'; +ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM SET citus.maintenance_management_database = 'regression'; +SELECT pg_reload_conf(); +$deinition$ AS turn_on_maintenance +\gset +SELECT $definition$ + DO + $do$ + DECLARE + index int; + db_name text; + current_port int; + db_create_statement text; + BEGIN + SELECT setting::int FROM pg_settings WHERE name = 'port' + INTO current_port; + FOR index IN 1..100 + LOOP + SELECT format('db%s', index) + INTO db_name; + + SELECT format('CREATE DATABASE %I', db_name) + INTO db_create_statement; + + PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port), + db_create_statement); + PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port), + 'CREATE EXTENSION citus;'); + IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN + PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port), + format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport)) + FROM pg_dist_node + WHERE groupid != 0 AND isactive AND noderole = 'primary'; + END IF; + END LOOP; + END; + $do$; + $definition$ AS create_databases +\gset +-- Code reiles heavily on dblink for cross-db and cross-node queries +CREATE EXTENSION IF NOT EXISTS dblink; +-- Disable maintenance operations to 
prepare the environment +:turn_off_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +:turn_off_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +:turn_off_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- Create databases +\c - - - :worker_1_port +:create_databases +SELECT count(*) +FROM pg_database +WHERE datname LIKE 'db%'; + count +--------------------------------------------------------------------- + 100 +(1 row) + +\c - - - :worker_2_port +:create_databases +SELECT count(*) +FROM pg_database +WHERE datname LIKE 'db%'; + count +--------------------------------------------------------------------- + 100 +(1 row) + +\c - - - :master_port +:create_databases +SELECT count(*) +FROM pg_database +WHERE datname LIKE 'db%'; + count +--------------------------------------------------------------------- + 100 +(1 row) + +-- Generate distributed transactions +\c - - - :master_port +DO +$do$ + DECLARE + index int; + db_name text; + transaction_to_abort_name text; + transaction_to_commit_name text; + transaction_to_be_forgotten text; + coordinator_port int; + BEGIN + FOR index IN 1..100 + LOOP + SELECT format('db%s', index) + INTO db_name; + + SELECT format('citus_0_1234_3_0_%s', oid) + FROM pg_database + WHERE datname = db_name + INTO transaction_to_abort_name; + + SELECT format('citus_0_1234_4_0_%s', oid) + FROM pg_database + WHERE datname = db_name + INTO transaction_to_commit_name; + + SELECT format('citus_0_should_be_forgotten_%s', oid) + FROM pg_database + WHERE datname = db_name + INTO transaction_to_be_forgotten; + + SELECT setting::int + FROM pg_settings + WHERE name = 'port' + INTO coordinator_port; + + -- Prepare transactions on workers + PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', 
db_name, nodeport), + format($worker_cmd$ + BEGIN; + CREATE TABLE should_abort + (value int); + PREPARE TRANSACTION '%s'; + + BEGIN; + CREATE TABLE should_commit + (value int); + PREPARE TRANSACTION '%s'; + $worker_cmd$, transaction_to_abort_name, transaction_to_commit_name)) + FROM pg_dist_node + WHERE groupid != 0 + AND isactive + AND noderole = 'primary'; + + -- Fill the pg_dist_transaction + PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port), + format($coordinator_cmd$ + INSERT INTO pg_dist_transaction + SELECT groupid, '%s' FROM pg_dist_node + UNION ALL + SELECT groupid, '%s' FROM pg_dist_node; + $coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten)); + END LOOP; + END; +$do$; +-- Verify state before enabling maintenance +\c - - - :master_port +SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test +FROM pg_database, + dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, + (SELECT setting::int FROM pg_settings WHERE name = 'port')), + $statement$ + SELECT * + FROM pg_dist_transaction + WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%' + $statement$) AS t(groupid integer, gid text) +WHERE datname LIKE 'db%'; + pg_dist_transaction_before_recovery_coordinator_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_before_recovery_coordinator_test +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_before_recover_worker_1_test 
+--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_before_recovery_worker_1_test +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_before_recover_worker_2_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_before_recovery_worker_2_test +--------------------------------------------------------------------- + t +(1 row) + +-- Turn on the maintenance +\c - - - :master_port +:turn_on_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +:turn_on_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +:turn_on_maintenance + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :master_port +-- Let maintenance do it's work... 
+SELECT pg_sleep_for('10 seconds'::interval); + pg_sleep_for +--------------------------------------------------------------------- + +(1 row) + +-- Verify maintenance result +SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test +FROM pg_database, + dblink(format('dbname=%s host=localhost port=%s user=postgres', datname, + (SELECT setting::int FROM pg_settings WHERE name = 'port')), + $statement$ + SELECT * + FROM pg_dist_transaction + WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%' + $statement$) AS t(groupid integer, gid text) +WHERE datname LIKE 'db%'; + pg_dist_transaction_after_recovery_coordinator_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_coordinator_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_after_recovery_coordinator_test +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_after_recover_worker_1_test +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_1_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_after_recovery_worker_1_test +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test +FROM pg_prepared_xacts +WHERE gid LIKE 'citus_0_1234_4_0_%' + OR gid LIKE 'citus_0_should_be_forgotten_%'; + pg_prepared_xacts_after_recover_worker_2_test 
+--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_2_test +FROM pg_stat_activity +WHERE state = 'idle' + AND now() - backend_start > '5 seconds'::interval; + cached_connections_after_recovery_worker_2_test +--------------------------------------------------------------------- + t +(1 row) + +-- Cleanup +\c - - - :master_port +SELECT $definition$ + ALTER SYSTEM RESET citus.recover_2pc_interval; + ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; + ALTER SYSTEM RESET citus.maintenance_management_database; + SELECT pg_reload_conf(); + + DO + $do$ + DECLARE + index int; + db_name text; + current_port int; + BEGIN + SELECT setting::int FROM pg_settings WHERE name = 'port' + INTO current_port; + FOR index IN 1..100 + LOOP + SELECT format('db%s', index) + INTO db_name; + + PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port), + 'DROP EXTENSION citus;'); + END LOOP; + END; + $do$; + + -- Dropping tables explicitly because ProcSignalBarrier prevents from using dblink + DROP DATABASE db1 WITH (FORCE); + DROP DATABASE db2 WITH (FORCE); + DROP DATABASE db3 WITH (FORCE); + DROP DATABASE db4 WITH (FORCE); + DROP DATABASE db5 WITH (FORCE); + DROP DATABASE db6 WITH (FORCE); + DROP DATABASE db7 WITH (FORCE); + DROP DATABASE db8 WITH (FORCE); + DROP DATABASE db9 WITH (FORCE); + DROP DATABASE db10 WITH (FORCE); + DROP DATABASE db11 WITH (FORCE); + DROP DATABASE db12 WITH (FORCE); + DROP DATABASE db13 WITH (FORCE); + DROP DATABASE db14 WITH (FORCE); + DROP DATABASE db15 WITH (FORCE); + DROP DATABASE db16 WITH (FORCE); + DROP DATABASE db17 WITH (FORCE); + DROP DATABASE db18 WITH (FORCE); + DROP DATABASE db19 WITH (FORCE); + DROP DATABASE db20 WITH (FORCE); + DROP DATABASE db21 WITH (FORCE); + DROP DATABASE db22 WITH (FORCE); + DROP DATABASE db23 WITH (FORCE); + DROP DATABASE db24 WITH (FORCE); + DROP DATABASE db25 WITH 
(FORCE); + DROP DATABASE db26 WITH (FORCE); + DROP DATABASE db27 WITH (FORCE); + DROP DATABASE db28 WITH (FORCE); + DROP DATABASE db29 WITH (FORCE); + DROP DATABASE db30 WITH (FORCE); + DROP DATABASE db31 WITH (FORCE); + DROP DATABASE db32 WITH (FORCE); + DROP DATABASE db33 WITH (FORCE); + DROP DATABASE db34 WITH (FORCE); + DROP DATABASE db35 WITH (FORCE); + DROP DATABASE db36 WITH (FORCE); + DROP DATABASE db37 WITH (FORCE); + DROP DATABASE db38 WITH (FORCE); + DROP DATABASE db39 WITH (FORCE); + DROP DATABASE db40 WITH (FORCE); + DROP DATABASE db41 WITH (FORCE); + DROP DATABASE db42 WITH (FORCE); + DROP DATABASE db43 WITH (FORCE); + DROP DATABASE db44 WITH (FORCE); + DROP DATABASE db45 WITH (FORCE); + DROP DATABASE db46 WITH (FORCE); + DROP DATABASE db47 WITH (FORCE); + DROP DATABASE db48 WITH (FORCE); + DROP DATABASE db49 WITH (FORCE); + DROP DATABASE db50 WITH (FORCE); + DROP DATABASE db51 WITH (FORCE); + DROP DATABASE db52 WITH (FORCE); + DROP DATABASE db53 WITH (FORCE); + DROP DATABASE db54 WITH (FORCE); + DROP DATABASE db55 WITH (FORCE); + DROP DATABASE db56 WITH (FORCE); + DROP DATABASE db57 WITH (FORCE); + DROP DATABASE db58 WITH (FORCE); + DROP DATABASE db59 WITH (FORCE); + DROP DATABASE db60 WITH (FORCE); + DROP DATABASE db61 WITH (FORCE); + DROP DATABASE db62 WITH (FORCE); + DROP DATABASE db63 WITH (FORCE); + DROP DATABASE db64 WITH (FORCE); + DROP DATABASE db65 WITH (FORCE); + DROP DATABASE db66 WITH (FORCE); + DROP DATABASE db67 WITH (FORCE); + DROP DATABASE db68 WITH (FORCE); + DROP DATABASE db69 WITH (FORCE); + DROP DATABASE db70 WITH (FORCE); + DROP DATABASE db71 WITH (FORCE); + DROP DATABASE db72 WITH (FORCE); + DROP DATABASE db73 WITH (FORCE); + DROP DATABASE db74 WITH (FORCE); + DROP DATABASE db75 WITH (FORCE); + DROP DATABASE db76 WITH (FORCE); + DROP DATABASE db77 WITH (FORCE); + DROP DATABASE db78 WITH (FORCE); + DROP DATABASE db79 WITH (FORCE); + DROP DATABASE db80 WITH (FORCE); + DROP DATABASE db81 WITH (FORCE); + DROP DATABASE db82 WITH 
(FORCE); + DROP DATABASE db83 WITH (FORCE); + DROP DATABASE db84 WITH (FORCE); + DROP DATABASE db85 WITH (FORCE); + DROP DATABASE db86 WITH (FORCE); + DROP DATABASE db87 WITH (FORCE); + DROP DATABASE db88 WITH (FORCE); + DROP DATABASE db89 WITH (FORCE); + DROP DATABASE db90 WITH (FORCE); + DROP DATABASE db91 WITH (FORCE); + DROP DATABASE db92 WITH (FORCE); + DROP DATABASE db93 WITH (FORCE); + DROP DATABASE db94 WITH (FORCE); + DROP DATABASE db95 WITH (FORCE); + DROP DATABASE db96 WITH (FORCE); + DROP DATABASE db97 WITH (FORCE); + DROP DATABASE db98 WITH (FORCE); + DROP DATABASE db99 WITH (FORCE); + DROP DATABASE db100 WITH (FORCE); + SELECT count(*) + FROM pg_database + WHERE datname LIKE 'db%'; + $definition$ AS cleanup +\gset +:cleanup + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_1_port +:cleanup + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_2_port +:cleanup + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + + count +--------------------------------------------------------------------- + 0 +(1 row) + diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 015f74973..cbd8ecf2f 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -220,6 +220,7 @@ test: multi_generate_ddl_commands test: multi_create_shards test: multi_transaction_recovery test: multi_transaction_recovery_multiple_databases +test: multi_maintenance_multiple_databases test: local_dist_join_modifications test: local_table_join diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index c9a85d523..4d01bc698 100755 
--- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -462,7 +462,7 @@ push(@pgOptions, "wal_retrieve_retry_interval=250"); push(@pgOptions, "max_logical_replication_workers=50"); push(@pgOptions, "max_wal_senders=50"); -push(@pgOptions, "max_worker_processes=50"); +push(@pgOptions, "max_worker_processes=150"); if ($majorversion >= "14") { # disable compute_query_id so that we don't get Query Identifiers diff --git a/src/test/regress/sql/multi_maintenance_multiple_databases.sql b/src/test/regress/sql/multi_maintenance_multiple_databases.sql new file mode 100644 index 000000000..fce49acdc --- /dev/null +++ b/src/test/regress/sql/multi_maintenance_multiple_databases.sql @@ -0,0 +1,423 @@ +-- This test verfies a behavioir of maintenance daemon in multi-database environment +-- It checks two things: +-- 1. Maintenance daemons should not cache connections, except the one for the citus.maintenance_management_database +-- 2. 2PC transaction recovery should respect the citus.shared_pool_size_maintenance_quota +-- 2. Distributed deadlock detection should run only on citus.maintenance_management_database. +-- +-- To do that, it created 100 databases and syntactically generates distributed transactions in various states there. 
+
+-- Reusable psql snippets: each SELECT below stores a piece of SQL text in a
+-- psql variable via \gset, so it can be replayed on every node with :name
+-- after reconnecting with \c.
+
+-- turn_off_maintenance: sets the 2PC recovery interval and the distributed
+-- deadlock detection factor to -1 and reloads the configuration.
+SELECT $definition$
+    ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
+    ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
+    SELECT pg_reload_conf();
+    $definition$ AS turn_off_maintenance
+\gset
+
+-- turn_on_maintenance: runs 2PC recovery every 5 seconds, restores the default
+-- deadlock detection factor and makes 'regression' the maintenance management
+-- database.
+SELECT $definition$
+ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
+ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM SET citus.maintenance_management_database = 'regression';
+SELECT pg_reload_conf();
+$definition$ AS turn_on_maintenance
+\gset
+
+-- create_databases: creates db1..db100 on the node it runs on, installs citus
+-- in each of them and, when the local node has groupid 0 in pg_dist_node,
+-- registers every active primary worker in each new database.
+SELECT $definition$
+    DO
+    $do$
+    DECLARE
+        index int;
+        db_name text;
+        current_port int;
+        db_create_statement text;
+    BEGIN
+        SELECT setting::int FROM pg_settings WHERE name = 'port'
+        INTO current_port;
+        FOR index IN 1..100
+        LOOP
+            SELECT format('db%s', index)
+            INTO db_name;
+
+            SELECT format('CREATE DATABASE %I', db_name)
+            INTO db_create_statement;
+
+            PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
+                           db_create_statement);
+            PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                           'CREATE EXTENSION citus;');
+            IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
+                PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                                    format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
+                FROM pg_dist_node
+                WHERE groupid != 0 AND isactive AND noderole = 'primary';
+            END IF;
+        END LOOP;
+    END;
+    $do$;
+    $definition$ AS create_databases
+\gset
+
+-- Code relies heavily on dblink for cross-db and cross-node queries
+CREATE EXTENSION IF NOT EXISTS dblink;
+
+-- Disable maintenance operations to prepare the environment
+:turn_off_maintenance
+
+\c - - - :worker_1_port
+
+:turn_off_maintenance
+
+\c - - - :worker_2_port
+
+:turn_off_maintenance
+
+-- Create databases
+
+\c - - - :worker_1_port
+
+:create_databases
+
+SELECT count(*)
+FROM pg_database
+WHERE datname LIKE 'db%';
+
+\c - - - :worker_2_port
+
+:create_databases
+
+SELECT count(*)
+FROM pg_database
+WHERE datname LIKE 'db%';
+
+\c - - - :master_port
+
+:create_databases
+
+SELECT count(*)
+FROM pg_database
+WHERE datname LIKE 'db%';
+
+-- Generate distributed transactions
+
+\c - - - :master_port
+
+-- For every database db1..db100:
+--   * prepare two transactions on each active primary worker, one meant to be
+--     aborted and one meant to be committed by 2PC recovery;
+--   * record the to-commit gid and a gid that was never prepared anywhere
+--     ("should be forgotten") in that database's pg_dist_transaction.
+DO
+$do$
+    DECLARE
+        index int;
+        db_name text;
+        transaction_to_abort_name text;
+        transaction_to_commit_name text;
+        transaction_to_be_forgotten text;
+        coordinator_port int;
+    BEGIN
+        -- The local port does not change during the block, so look it up once
+        SELECT setting::int
+        FROM pg_settings
+        WHERE name = 'port'
+        INTO coordinator_port;
+
+        FOR index IN 1..100
+        LOOP
+            SELECT format('db%s', index)
+            INTO db_name;
+
+            -- All gids are suffixed with the oid the database has on this node
+            SELECT format('citus_0_1234_3_0_%s', oid)
+            FROM pg_database
+            WHERE datname = db_name
+            INTO transaction_to_abort_name;
+
+            SELECT format('citus_0_1234_4_0_%s', oid)
+            FROM pg_database
+            WHERE datname = db_name
+            INTO transaction_to_commit_name;
+
+            SELECT format('citus_0_should_be_forgotten_%s', oid)
+            FROM pg_database
+            WHERE datname = db_name
+            INTO transaction_to_be_forgotten;
+
+            -- Prepare transactions on workers
+            PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport),
+                                format($worker_cmd$
+                                       BEGIN;
+                                       CREATE TABLE should_abort
+                                       (value int);
+                                       PREPARE TRANSACTION '%s';
+
+                                       BEGIN;
+                                       CREATE TABLE should_commit
+                                       (value int);
+                                       PREPARE TRANSACTION '%s';
+                                       $worker_cmd$, transaction_to_abort_name, transaction_to_commit_name))
+            FROM pg_dist_node
+            WHERE groupid != 0
+              AND isactive
+              AND noderole = 'primary';
+
+            -- Fill the pg_dist_transaction
+            PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port),
+                                format($coordinator_cmd$
+                                       INSERT INTO pg_dist_transaction
+                                       SELECT groupid, '%s' FROM pg_dist_node
+                                       UNION ALL
+                                       SELECT groupid, '%s' FROM pg_dist_node;
+                                       $coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten));
+        END LOOP;
+    END;
+$do$;
+
+-- Verify state before enabling maintenance
+\c - - - :master_port
+
+-- Each database received 2 gids per pg_dist_node row, so 400 rows in total
+-- corresponds to 2 pg_dist_node rows per database (presumably the two workers)
+SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test
+FROM pg_database,
+     dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
+                   (SELECT setting::int FROM pg_settings WHERE name = 'port')),
+            $statement$
+            SELECT *
+            FROM pg_dist_transaction
+            WHERE gid LIKE 'citus_0_1234_4_0_%'
+                OR gid LIKE 'citus_0_should_be_forgotten_%'
+            $statement$) AS t(groupid integer, gid text)
+WHERE datname LIKE 'db%';
+
+-- No backend older than 5 seconds may be sitting idle yet
+SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test
+FROM pg_stat_activity
+WHERE state = 'idle'
+  AND now() - backend_start > '5 seconds'::interval;
+
+\c - - - :worker_1_port
+
+-- 100 = one to-commit prepared transaction per database; the
+-- should_be_forgotten gids were never prepared on the workers.
+-- NOTE(review): the to-abort transactions (citus_0_1234_3_0_%) are not counted
+-- by this check.
+SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test
+FROM pg_prepared_xacts
+WHERE gid LIKE 'citus_0_1234_4_0_%'
+   OR gid LIKE 'citus_0_should_be_forgotten_%';
+
+SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test
+FROM pg_stat_activity
+WHERE state = 'idle'
+  AND now() - backend_start > '5 seconds'::interval;
+
+\c - - - :worker_2_port
+
+SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test
+FROM pg_prepared_xacts
+WHERE gid LIKE 'citus_0_1234_4_0_%'
+   OR gid LIKE 'citus_0_should_be_forgotten_%';
+
+SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test
+FROM pg_stat_activity
+WHERE state = 'idle'
+  AND now() - backend_start > '5 seconds'::interval;
+
+-- Turn on the maintenance
+
+\c - - - :master_port
+
+:turn_on_maintenance
+
+\c - - - :worker_1_port
+
+:turn_on_maintenance
+
+\c - - - :worker_2_port
+
+:turn_on_maintenance
+
+
+
+\c - - - :master_port
+
+-- Let maintenance do its work...
+
+-- Sleep for two of the 5-second recover_2pc_interval periods configured above
+SELECT pg_sleep_for('10 seconds'::interval);
+
+-- Verify maintenance result
+
+-- Neither the to-commit nor the should-be-forgotten gids may remain recorded
+-- in any database's pg_dist_transaction
+SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test
+FROM pg_database,
+     dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
+                   (SELECT setting::int FROM pg_settings WHERE name = 'port')),
+            $statement$
+            SELECT *
+            FROM pg_dist_transaction
+            WHERE gid LIKE 'citus_0_1234_4_0_%'
+                OR gid LIKE 'citus_0_should_be_forgotten_%'
+            $statement$) AS t(groupid integer, gid text)
+WHERE datname LIKE 'db%';
+
+-- Only a handful (1 to 3) of idle backends older than 5 seconds may exist,
+-- even though 100 databases were recovered
+SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_coordinator_test
+FROM pg_stat_activity
+WHERE state = 'idle'
+  AND now() - backend_start > '5 seconds'::interval;
+
+\c - - - :worker_1_port
+
+-- The to-commit prepared transactions must be gone from the worker
+SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
+FROM pg_prepared_xacts
+WHERE gid LIKE 'citus_0_1234_4_0_%'
+   OR gid LIKE 'citus_0_should_be_forgotten_%';
+
+SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_1_test
+FROM pg_stat_activity
+WHERE state = 'idle'
+  AND now() - backend_start > '5 seconds'::interval;
+
+\c - - - :worker_2_port
+
+SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
+FROM pg_prepared_xacts
+WHERE gid LIKE 'citus_0_1234_4_0_%'
+   OR gid LIKE 'citus_0_should_be_forgotten_%';
+
+SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_2_test
+FROM pg_stat_activity
+WHERE state = 'idle'
+  AND now() - backend_start > '5 seconds'::interval;
+
+
+-- Cleanup
+
+\c - - - :master_port
+
+-- cleanup: resets the GUCs touched by this test, drops the citus extension
+-- from db1..db100 and then drops the databases themselves. The 100
+-- DROP DATABASE statements are generated while the variable is built, so the
+-- expanded :cleanup text is still a plain sequence of top-level statements.
+SELECT $definition$
+    ALTER SYSTEM RESET citus.recover_2pc_interval;
+    ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+    ALTER SYSTEM RESET citus.maintenance_management_database;
+    SELECT pg_reload_conf();
+
+    DO
+    $do$
+    DECLARE
+        index int;
+        db_name text;
+        current_port int;
+    BEGIN
+        SELECT setting::int FROM pg_settings WHERE name = 'port'
+        INTO current_port;
+        FOR index IN 1..100
+        LOOP
+            SELECT format('db%s', index)
+            INTO db_name;
+
+            PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
+                           'DROP EXTENSION citus;');
+        END LOOP;
+    END;
+    $do$;
+
+    -- Dropping databases with top-level statements because ProcSignalBarrier prevents doing it through dblink
+    $definition$
+    || (SELECT string_agg(format('DROP DATABASE %I WITH (FORCE);', format('db%s', i)), E'\n' ORDER BY i)
+        FROM generate_series(1, 100) AS i)
+    || $definition$
+
+    SELECT count(*)
+    FROM pg_database
+    WHERE datname LIKE 'db%';
+    $definition$ AS cleanup
+\gset
+
+:cleanup
+
+\c - - - :worker_1_port
+
+:cleanup
+
+\c - - - :worker_2_port
+
+:cleanup