- Changed the way to disable connection caching for maintenance daemons
- Implemented a test for basic use cases
- Brushed up before MR
pull/7286/head
ivyazmitinov 2023-10-27 21:59:09 +07:00
parent c8ec1b603a
commit 481aa99205
12 changed files with 994 additions and 42 deletions

View File

@ -360,6 +360,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
MultiConnection *connection = FindAvailableConnection(entry->connections, flags); MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection) if (connection)
{ {
if ((flags & REQUIRE_MAINTENANCE_CONNECTION) &&
IsMaintenanceDaemon &&
!IsMaintenanceManagementDatabase(MyDatabaseId))
{
// Maintenance database may have changed, so cached connection should be closed
connection->forceCloseAtTransactionEnd = true;
}
return connection; return connection;
} }
} }
@ -432,7 +439,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
if (flags & REQUIRE_METADATA_CONNECTION) if (flags & REQUIRE_METADATA_CONNECTION)
{ {
connection->useForMetadataOperations = true; connection->useForMetadataOperations = true;
@ -440,6 +446,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
else if (flags & REQUIRE_MAINTENANCE_CONNECTION) else if (flags & REQUIRE_MAINTENANCE_CONNECTION)
{ {
connection->useForMaintenanceOperations = true; connection->useForMaintenanceOperations = true;
if (IsMaintenanceDaemon && !IsMaintenanceManagementDatabase(MyDatabaseId))
{
connection->forceCloseAtTransactionEnd = true;
}
} }
/* fully initialized the connection, record it */ /* fully initialized the connection, record it */
@ -1510,9 +1520,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
* escalating the number of cached connections. We can recognize such backends * escalating the number of cached connections. We can recognize such backends
* from their application name. * from their application name.
*/ */
return ((IsCitusMaintenanceDaemonBackend() && !IsMaintenanceManagementDatabase(MyDatabaseId)) || return (IsCitusInternalBackend() || IsRebalancerInternalBackend()) ||
IsCitusInternalBackend() ||
IsRebalancerInternalBackend()) ||
connection->initializationState != POOL_STATE_INITIALIZED || connection->initializationState != POOL_STATE_INITIALIZED ||
cachedConnectionCount >= MaxCachedConnectionsPerWorker || cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd || connection->forceCloseAtTransactionEnd ||

View File

@ -151,7 +151,7 @@ static SharedWorkerNodeDatabaseConnStatsHashKey PrepareWorkerNodeDatabaseHashKey
int port, int port,
Oid database); Oid database);
static void static void
DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port); DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port, Oid database);
PG_FUNCTION_INFO_V1(citus_remote_connection_stats); PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
@ -478,8 +478,24 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
workerNodeDatabaseEntry->count += 1; workerNodeDatabaseEntry->count += 1;
} }
if (IsLoggableLevel(DEBUG4))
{
ereport(DEBUG4, errmsg(
"Incrementing connection counter. "
"Current regular connections: %i, maintenance connections: %i. "
"Connection slot to %s:%i database %i is %s",
workerNodeConnectionEntry->regularConnectionsCount,
workerNodeConnectionEntry->maintenanceConnectionsCount,
hostname,
port,
database,
connectionSlotAvailable ? "available" : "not available"
));
}
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
return connectionSlotAvailable; return connectionSlotAvailable;
} }
@ -498,18 +514,21 @@ DecrementSharedConnectionCounter(uint32 externalFlags, const char *hostname, int
LockConnectionSharedMemory(LW_EXCLUSIVE); LockConnectionSharedMemory(LW_EXCLUSIVE);
DecrementSharedConnectionCounterInternal(externalFlags, hostname, port); DecrementSharedConnectionCounterInternal(externalFlags, hostname, port, MyDatabaseId);
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection(); WakeupWaiterBackendsForSharedConnection();
} }
static void static void
DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostname, int port) DecrementSharedConnectionCounterInternal(uint32 externalFlags,
const char *hostname,
int port,
Oid database)
{ {
bool workerNodeEntryFound = false; bool workerNodeEntryFound = false;
SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port); SharedWorkerNodeConnStatsHashKey workerNodeKey = PrepareWorkerNodeHashKey(hostname, port);
SharedWorkerNodeConnStatsHashEntry *workerNodeEntry = SharedWorkerNodeConnStatsHashEntry *workerNodeConnectionEntry =
hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound); hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_FIND, &workerNodeEntryFound);
/* this worker node is removed or updated, no need to care */ /* this worker node is removed or updated, no need to care */
@ -521,18 +540,32 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostn
} }
/* we should never go below 0 */ /* we should never go below 0 */
Assert(workerNodeEntry->regularConnectionsCount > 0 || workerNodeEntry->maintenanceConnectionsCount > 0); Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 ||
workerNodeConnectionEntry->maintenanceConnectionsCount > 0);
/* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */ /* When GetSharedPoolSizeMaintenanceQuota() == 0, treat maintenance connections as regular */
if ((GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION))) if ((GetSharedPoolSizeMaintenanceQuota() > 0 && (externalFlags & MAINTENANCE_CONNECTION)))
{ {
workerNodeEntry->maintenanceConnectionsCount -= 1; workerNodeConnectionEntry->maintenanceConnectionsCount -= 1;
} }
else else
{ {
workerNodeEntry->regularConnectionsCount -= 1; workerNodeConnectionEntry->regularConnectionsCount -= 1;
} }
if (IsLoggableLevel(DEBUG4))
{
ereport(DEBUG4, errmsg(
"Decrementing connection counter. "
"Current regular connections: %i, maintenance connections: %i. "
"Connection slot to %s:%i database %i is released",
workerNodeConnectionEntry->regularConnectionsCount,
workerNodeConnectionEntry->maintenanceConnectionsCount,
hostname,
port,
database
));
}
/* /*
* We don't have to remove at this point as the node might be still active * We don't have to remove at this point as the node might be still active
@ -541,7 +574,8 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags, const char *hostn
* not busy, and given the default value of MaxCachedConnectionsPerWorker = 1, * not busy, and given the default value of MaxCachedConnectionsPerWorker = 1,
* we're unlikely to trigger this often. * we're unlikely to trigger this often.
*/ */
if (workerNodeEntry->regularConnectionsCount == 0 && workerNodeEntry->maintenanceConnectionsCount == 0) if (workerNodeConnectionEntry->regularConnectionsCount == 0 &&
workerNodeConnectionEntry->maintenanceConnectionsCount == 0)
{ {
hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL); hash_search(SharedWorkerNodeConnStatsHash, &workerNodeKey, HASH_REMOVE, NULL);
} }

View File

@ -1995,12 +1995,11 @@ RegisterCitusConfigVariables(void)
DefineCustomRealVariable( DefineCustomRealVariable(
"citus.shared_pool_size_maintenance_quota", "citus.shared_pool_size_maintenance_quota",
gettext_noop("Sets the maximum number of connections allowed per worker node " gettext_noop("Sets the fraction of citus.max_shared_pool_size reserved "
"across all the backends from this node. Setting to -1 disables " "for maintenance operations only. "
"connections throttling. Setting to 0 makes it auto-adjust, meaning " "Setting it to 0 disables the quota. "
"equal to max_connections on the coordinator."), "This way the maintenance and regular connections will share the same pool"),
gettext_noop("As a rule of thumb, the value should be at most equal to the " NULL,
"max_connections on the remote nodes."),
&SharedPoolSizeMaintenanceQuota, &SharedPoolSizeMaintenanceQuota,
0.1, 0, 1, 0.1, 0, 1,
PGC_SIGHUP, PGC_SIGHUP,
@ -2030,7 +2029,7 @@ RegisterCitusConfigVariables(void)
"citus.max_databases_per_worker_tracked", "citus.max_databases_per_worker_tracked",
gettext_noop("Sets the amount of databases per worker tracked."), gettext_noop("Sets the amount of databases per worker tracked."),
gettext_noop( gettext_noop(
"This configuration value compliments the citus.max_worker_nodes_tracked." "This configuration value complements the citus.max_worker_nodes_tracked."
"It should be used when there are more then one database with Citus in cluster," "It should be used when there are more then one database with Citus in cluster,"
"and, effectively, limits the size of the hash table with connections per worker + database." "and, effectively, limits the size of the hash table with connections per worker + database."
"Currently, it does not affect the connection management logic and serves only statistical purposes."), "Currently, it does not affect the connection management logic and serves only statistical purposes."),
@ -2709,7 +2708,8 @@ RegisterCitusConfigVariables(void)
DefineCustomStringVariable( DefineCustomStringVariable(
"citus.maintenance_management_database", "citus.maintenance_management_database",
gettext_noop("Database for cluster-wide maintenance operations across all databases"), gettext_noop("Database for cluster-wide maintenance operations across all databases"),
NULL, gettext_noop("It should be enabled when there are more than "
"one database with Citus in a cluster."),
&MaintenanceManagementDatabase, &MaintenanceManagementDatabase,
"", "",
PGC_SIGHUP, PGC_SIGHUP,

View File

@ -86,7 +86,6 @@ typedef struct BackendManagementShmemData
typedef enum CitusBackendType typedef enum CitusBackendType
{ {
CITUS_BACKEND_NOT_ASSIGNED, CITUS_BACKEND_NOT_ASSIGNED,
CITUS_MAINTENANCE_DAEMON_BACKEND,
CITUS_INTERNAL_BACKEND, CITUS_INTERNAL_BACKEND,
CITUS_REBALANCER_BACKEND, CITUS_REBALANCER_BACKEND,
CITUS_RUN_COMMAND_BACKEND, CITUS_RUN_COMMAND_BACKEND,
@ -97,7 +96,6 @@ static const char *CitusBackendPrefixes[] = {
CITUS_APPLICATION_NAME_PREFIX, CITUS_APPLICATION_NAME_PREFIX,
CITUS_REBALANCER_APPLICATION_NAME_PREFIX, CITUS_REBALANCER_APPLICATION_NAME_PREFIX,
CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX, CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX,
CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX,
}; };
static const CitusBackendType CitusBackendTypes[] = { static const CitusBackendType CitusBackendTypes[] = {
@ -1446,18 +1444,6 @@ IsCitusShardTransferBackend(void)
prefixLength) == 0; prefixLength) == 0;
} }
bool
IsCitusMaintenanceDaemonBackend(void)
{
if (CurrentBackendType == CITUS_BACKEND_NOT_ASSIGNED)
{
DetermineCitusBackendType(application_name);
}
return CurrentBackendType == CITUS_MAINTENANCE_DAEMON_BACKEND;
}
/* /*
* DetermineCitusBackendType determines the type of backend based on the application_name. * DetermineCitusBackendType determines the type of backend based on the application_name.

View File

@ -153,7 +153,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
{ {
const char *nodeName = workerNode->workerName; const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
int connectionFlags = 0; int connectionFlags = WAIT_FOR_CONNECTION | REQUIRE_MAINTENANCE_CONNECTION;
if (workerNode->groupId == localGroupId) if (workerNode->groupId == localGroupId)
{ {

View File

@ -120,7 +120,7 @@ static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false; static volatile sig_atomic_t got_SIGTERM = false;
/* set to true when becoming a maintenance daemon */ /* set to true when becoming a maintenance daemon */
static bool IsMaintenanceDaemon = false; bool IsMaintenanceDaemon = false;
static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS);
static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS); static void MaintenanceDaemonSigHupHandler(SIGNAL_ARGS);
@ -508,7 +508,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid); MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);
/* make worker recognizable in pg_stat_activity */ /* make worker recognizable in pg_stat_activity */
pgstat_report_appname(CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX); pgstat_report_appname("Citus Maintenance Daemon");
/* /*
* Terminate orphaned metadata sync daemons spawned from previously terminated * Terminate orphaned metadata sync daemons spawned from previously terminated
@ -1248,7 +1248,8 @@ char
if (!maintenanceDatabaseOid) if (!maintenanceDatabaseOid)
{ {
ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Database %s doesn't exists, please check the citus.maintenance_management_database parameter.", errmsg("Database \"%s\" doesn't exists, please check the citus.maintenance_management_database parameter. "
"Applying a default value instead.",
MaintenanceManagementDatabase))); MaintenanceManagementDatabase)));
result = ""; result = "";
} }

View File

@ -78,7 +78,6 @@ extern bool IsRebalancerInternalBackend(void);
extern bool IsCitusRunCommandBackend(void); extern bool IsCitusRunCommandBackend(void);
extern bool IsExternalClientBackend(void); extern bool IsExternalClientBackend(void);
extern bool IsCitusShardTransferBackend(void); extern bool IsCitusShardTransferBackend(void);
extern bool IsCitusMaintenanceDaemonBackend(void);
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 #define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999 #define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999

View File

@ -44,8 +44,6 @@
/* application name used for connections made by run_command_on_* */ /* application name used for connections made by run_command_on_* */
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid=" #define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
#define CITUS_MAINTENANCE_DAEMON_APPLICATION_NAME_PREFIX "Citus Maintenance Daemon"
/* /*
* application name prefix for move/split replication connections. * application name prefix for move/split replication connections.
* *

View File

@ -0,0 +1,502 @@
-- This test verfies a behavioir of maintenance daemon in multi-database environment
-- It checks two things:
-- 1. Maintenance daemons should not cache connections, except the one for the citus.maintenance_management_database
-- 2. 2PC transaction recovery should respect the citus.shared_pool_size_maintenance_quota
-- 2. Distributed deadlock detection should run only on citus.maintenance_management_database.
--
-- To do that, it created 100 databases and syntactically generates distributed transactions in various states there.
SELECT $definition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
SELECT pg_reload_conf();
$definition$ AS turn_off_maintenance
\gset
SELECT $deinition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM SET citus.maintenance_management_database = 'regression';
SELECT pg_reload_conf();
$deinition$ AS turn_on_maintenance
\gset
SELECT $definition$
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
db_create_statement text;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('CREATE DATABASE %I', db_name)
INTO db_create_statement;
PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
db_create_statement);
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'CREATE EXTENSION citus;');
IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
FROM pg_dist_node
WHERE groupid != 0 AND isactive AND noderole = 'primary';
END IF;
END LOOP;
END;
$do$;
$definition$ AS create_databases
\gset
-- Code reiles heavily on dblink for cross-db and cross-node queries
CREATE EXTENSION IF NOT EXISTS dblink;
-- Disable maintenance operations to prepare the environment
:turn_off_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
:turn_off_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
:turn_off_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Create databases
\c - - - :worker_1_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
count
---------------------------------------------------------------------
100
(1 row)
\c - - - :worker_2_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
count
---------------------------------------------------------------------
100
(1 row)
\c - - - :master_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
count
---------------------------------------------------------------------
100
(1 row)
-- Generate distributed transactions
\c - - - :master_port
DO
$do$
DECLARE
index int;
db_name text;
transaction_to_abort_name text;
transaction_to_commit_name text;
transaction_to_be_forgotten text;
coordinator_port int;
BEGIN
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('citus_0_1234_3_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_abort_name;
SELECT format('citus_0_1234_4_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_commit_name;
SELECT format('citus_0_should_be_forgotten_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_be_forgotten;
SELECT setting::int
FROM pg_settings
WHERE name = 'port'
INTO coordinator_port;
-- Prepare transactions on workers
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport),
format($worker_cmd$
BEGIN;
CREATE TABLE should_abort
(value int);
PREPARE TRANSACTION '%s';
BEGIN;
CREATE TABLE should_commit
(value int);
PREPARE TRANSACTION '%s';
$worker_cmd$, transaction_to_abort_name, transaction_to_commit_name))
FROM pg_dist_node
WHERE groupid != 0
AND isactive
AND noderole = 'primary';
-- Fill the pg_dist_transaction
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port),
format($coordinator_cmd$
INSERT INTO pg_dist_transaction
SELECT groupid, '%s' FROM pg_dist_node
UNION ALL
SELECT groupid, '%s' FROM pg_dist_node;
$coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten));
END LOOP;
END;
$do$;
-- Verify state before enabling maintenance
\c - - - :master_port
SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT *
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%';
pg_dist_transaction_before_recovery_coordinator_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_before_recovery_coordinator_test
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_before_recover_worker_1_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_before_recovery_worker_1_test
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_before_recover_worker_2_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_before_recovery_worker_2_test
---------------------------------------------------------------------
t
(1 row)
-- Turn on the maintenance
\c - - - :master_port
:turn_on_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
:turn_on_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
:turn_on_maintenance
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
-- Let maintenance do it's work...
SELECT pg_sleep_for('10 seconds'::interval);
pg_sleep_for
---------------------------------------------------------------------
(1 row)
-- Verify maintenance result
SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT *
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%';
pg_dist_transaction_after_recovery_coordinator_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_after_recovery_coordinator_test
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_after_recover_worker_1_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_after_recovery_worker_1_test
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
pg_prepared_xacts_after_recover_worker_2_test
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
cached_connections_after_recovery_worker_2_test
---------------------------------------------------------------------
t
(1 row)
-- Cleanup
\c - - - :master_port
SELECT $definition$
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.maintenance_management_database;
SELECT pg_reload_conf();
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'DROP EXTENSION citus;');
END LOOP;
END;
$do$;
-- Dropping tables explicitly because ProcSignalBarrier prevents from using dblink
DROP DATABASE db1 WITH (FORCE);
DROP DATABASE db2 WITH (FORCE);
DROP DATABASE db3 WITH (FORCE);
DROP DATABASE db4 WITH (FORCE);
DROP DATABASE db5 WITH (FORCE);
DROP DATABASE db6 WITH (FORCE);
DROP DATABASE db7 WITH (FORCE);
DROP DATABASE db8 WITH (FORCE);
DROP DATABASE db9 WITH (FORCE);
DROP DATABASE db10 WITH (FORCE);
DROP DATABASE db11 WITH (FORCE);
DROP DATABASE db12 WITH (FORCE);
DROP DATABASE db13 WITH (FORCE);
DROP DATABASE db14 WITH (FORCE);
DROP DATABASE db15 WITH (FORCE);
DROP DATABASE db16 WITH (FORCE);
DROP DATABASE db17 WITH (FORCE);
DROP DATABASE db18 WITH (FORCE);
DROP DATABASE db19 WITH (FORCE);
DROP DATABASE db20 WITH (FORCE);
DROP DATABASE db21 WITH (FORCE);
DROP DATABASE db22 WITH (FORCE);
DROP DATABASE db23 WITH (FORCE);
DROP DATABASE db24 WITH (FORCE);
DROP DATABASE db25 WITH (FORCE);
DROP DATABASE db26 WITH (FORCE);
DROP DATABASE db27 WITH (FORCE);
DROP DATABASE db28 WITH (FORCE);
DROP DATABASE db29 WITH (FORCE);
DROP DATABASE db30 WITH (FORCE);
DROP DATABASE db31 WITH (FORCE);
DROP DATABASE db32 WITH (FORCE);
DROP DATABASE db33 WITH (FORCE);
DROP DATABASE db34 WITH (FORCE);
DROP DATABASE db35 WITH (FORCE);
DROP DATABASE db36 WITH (FORCE);
DROP DATABASE db37 WITH (FORCE);
DROP DATABASE db38 WITH (FORCE);
DROP DATABASE db39 WITH (FORCE);
DROP DATABASE db40 WITH (FORCE);
DROP DATABASE db41 WITH (FORCE);
DROP DATABASE db42 WITH (FORCE);
DROP DATABASE db43 WITH (FORCE);
DROP DATABASE db44 WITH (FORCE);
DROP DATABASE db45 WITH (FORCE);
DROP DATABASE db46 WITH (FORCE);
DROP DATABASE db47 WITH (FORCE);
DROP DATABASE db48 WITH (FORCE);
DROP DATABASE db49 WITH (FORCE);
DROP DATABASE db50 WITH (FORCE);
DROP DATABASE db51 WITH (FORCE);
DROP DATABASE db52 WITH (FORCE);
DROP DATABASE db53 WITH (FORCE);
DROP DATABASE db54 WITH (FORCE);
DROP DATABASE db55 WITH (FORCE);
DROP DATABASE db56 WITH (FORCE);
DROP DATABASE db57 WITH (FORCE);
DROP DATABASE db58 WITH (FORCE);
DROP DATABASE db59 WITH (FORCE);
DROP DATABASE db60 WITH (FORCE);
DROP DATABASE db61 WITH (FORCE);
DROP DATABASE db62 WITH (FORCE);
DROP DATABASE db63 WITH (FORCE);
DROP DATABASE db64 WITH (FORCE);
DROP DATABASE db65 WITH (FORCE);
DROP DATABASE db66 WITH (FORCE);
DROP DATABASE db67 WITH (FORCE);
DROP DATABASE db68 WITH (FORCE);
DROP DATABASE db69 WITH (FORCE);
DROP DATABASE db70 WITH (FORCE);
DROP DATABASE db71 WITH (FORCE);
DROP DATABASE db72 WITH (FORCE);
DROP DATABASE db73 WITH (FORCE);
DROP DATABASE db74 WITH (FORCE);
DROP DATABASE db75 WITH (FORCE);
DROP DATABASE db76 WITH (FORCE);
DROP DATABASE db77 WITH (FORCE);
DROP DATABASE db78 WITH (FORCE);
DROP DATABASE db79 WITH (FORCE);
DROP DATABASE db80 WITH (FORCE);
DROP DATABASE db81 WITH (FORCE);
DROP DATABASE db82 WITH (FORCE);
DROP DATABASE db83 WITH (FORCE);
DROP DATABASE db84 WITH (FORCE);
DROP DATABASE db85 WITH (FORCE);
DROP DATABASE db86 WITH (FORCE);
DROP DATABASE db87 WITH (FORCE);
DROP DATABASE db88 WITH (FORCE);
DROP DATABASE db89 WITH (FORCE);
DROP DATABASE db90 WITH (FORCE);
DROP DATABASE db91 WITH (FORCE);
DROP DATABASE db92 WITH (FORCE);
DROP DATABASE db93 WITH (FORCE);
DROP DATABASE db94 WITH (FORCE);
DROP DATABASE db95 WITH (FORCE);
DROP DATABASE db96 WITH (FORCE);
DROP DATABASE db97 WITH (FORCE);
DROP DATABASE db98 WITH (FORCE);
DROP DATABASE db99 WITH (FORCE);
DROP DATABASE db100 WITH (FORCE);
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
$definition$ AS cleanup
\gset
:cleanup
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :worker_1_port
:cleanup
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :worker_2_port
:cleanup
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
count
---------------------------------------------------------------------
0
(1 row)

View File

@ -220,6 +220,7 @@ test: multi_generate_ddl_commands
test: multi_create_shards test: multi_create_shards
test: multi_transaction_recovery test: multi_transaction_recovery
test: multi_transaction_recovery_multiple_databases test: multi_transaction_recovery_multiple_databases
test: multi_maintenance_multiple_databases
test: local_dist_join_modifications test: local_dist_join_modifications
test: local_table_join test: local_table_join

View File

@ -462,7 +462,7 @@ push(@pgOptions, "wal_retrieve_retry_interval=250");
push(@pgOptions, "max_logical_replication_workers=50"); push(@pgOptions, "max_logical_replication_workers=50");
push(@pgOptions, "max_wal_senders=50"); push(@pgOptions, "max_wal_senders=50");
push(@pgOptions, "max_worker_processes=50"); push(@pgOptions, "max_worker_processes=150");
if ($majorversion >= "14") { if ($majorversion >= "14") {
# disable compute_query_id so that we don't get Query Identifiers # disable compute_query_id so that we don't get Query Identifiers

View File

@ -0,0 +1,423 @@
-- This test verfies a behavioir of maintenance daemon in multi-database environment
-- It checks two things:
-- 1. Maintenance daemons should not cache connections, except the one for the citus.maintenance_management_database
-- 2. 2PC transaction recovery should respect the citus.shared_pool_size_maintenance_quota
-- 2. Distributed deadlock detection should run only on citus.maintenance_management_database.
--
-- To do that, it created 100 databases and syntactically generates distributed transactions in various states there.
SELECT $definition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';
SELECT pg_reload_conf();
$definition$ AS turn_off_maintenance
\gset
SELECT $deinition$
ALTER SYSTEM SET citus.recover_2pc_interval TO '5s';
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM SET citus.maintenance_management_database = 'regression';
SELECT pg_reload_conf();
$deinition$ AS turn_on_maintenance
\gset
SELECT $definition$
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
db_create_statement text;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('CREATE DATABASE %I', db_name)
INTO db_create_statement;
PERFORM dblink(format('dbname=regression host=localhost port=%s user=postgres', current_port),
db_create_statement);
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'CREATE EXTENSION citus;');
IF (SELECT groupid = 0 FROM pg_dist_node WHERE nodeport = current_port) THEN
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
format($add_workers$SELECT citus_add_node('localhost', %s);COMMIT;$add_workers$, nodeport))
FROM pg_dist_node
WHERE groupid != 0 AND isactive AND noderole = 'primary';
END IF;
END LOOP;
END;
$do$;
$definition$ AS create_databases
\gset
-- Code reiles heavily on dblink for cross-db and cross-node queries
CREATE EXTENSION IF NOT EXISTS dblink;
-- Disable maintenance operations to prepare the environment
:turn_off_maintenance
\c - - - :worker_1_port
:turn_off_maintenance
\c - - - :worker_2_port
:turn_off_maintenance
-- Create databases
\c - - - :worker_1_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
\c - - - :worker_2_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
\c - - - :master_port
:create_databases
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
-- Generate distributed transactions
\c - - - :master_port
DO
$do$
DECLARE
index int;
db_name text;
transaction_to_abort_name text;
transaction_to_commit_name text;
transaction_to_be_forgotten text;
coordinator_port int;
BEGIN
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
SELECT format('citus_0_1234_3_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_abort_name;
SELECT format('citus_0_1234_4_0_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_commit_name;
SELECT format('citus_0_should_be_forgotten_%s', oid)
FROM pg_database
WHERE datname = db_name
INTO transaction_to_be_forgotten;
SELECT setting::int
FROM pg_settings
WHERE name = 'port'
INTO coordinator_port;
-- Prepare transactions on workers
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, nodeport),
format($worker_cmd$
BEGIN;
CREATE TABLE should_abort
(value int);
PREPARE TRANSACTION '%s';
BEGIN;
CREATE TABLE should_commit
(value int);
PREPARE TRANSACTION '%s';
$worker_cmd$, transaction_to_abort_name, transaction_to_commit_name))
FROM pg_dist_node
WHERE groupid != 0
AND isactive
AND noderole = 'primary';
-- Fill the pg_dist_transaction
PERFORM dblink_exec(format('dbname=%s host=localhost port=%s user=postgres', db_name, coordinator_port),
format($coordinator_cmd$
INSERT INTO pg_dist_transaction
SELECT groupid, '%s' FROM pg_dist_node
UNION ALL
SELECT groupid, '%s' FROM pg_dist_node;
$coordinator_cmd$, transaction_to_commit_name, transaction_to_be_forgotten));
END LOOP;
END;
$do$;
-- Verify state before enabling maintenance
\c - - - :master_port
SELECT count(*) = 400 AS pg_dist_transaction_before_recovery_coordinator_test
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT *
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%';
SELECT count(*) = 0 AS cached_connections_before_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
\c - - - :worker_1_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
\c - - - :worker_2_port
SELECT count(*) = 100 AS pg_prepared_xacts_before_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
SELECT count(*) = 0 AS cached_connections_before_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
-- Turn on the maintenance
\c - - - :master_port
:turn_on_maintenance
\c - - - :worker_1_port
:turn_on_maintenance
\c - - - :worker_2_port
:turn_on_maintenance
\c - - - :master_port
-- Let maintenance do it's work...
SELECT pg_sleep_for('10 seconds'::interval);
-- Verify maintenance result
SELECT count(*) = 0 AS pg_dist_transaction_after_recovery_coordinator_test
FROM pg_database,
dblink(format('dbname=%s host=localhost port=%s user=postgres', datname,
(SELECT setting::int FROM pg_settings WHERE name = 'port')),
$statement$
SELECT *
FROM pg_dist_transaction
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%'
$statement$) AS t(groupid integer, gid text)
WHERE datname LIKE 'db%';
SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_coordinator_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
\c - - - :worker_1_port
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_1_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_1_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
\c - - - :worker_2_port
SELECT count(*) = 0 AS pg_prepared_xacts_after_recover_worker_2_test
FROM pg_prepared_xacts
WHERE gid LIKE 'citus_0_1234_4_0_%'
OR gid LIKE 'citus_0_should_be_forgotten_%';
SELECT count(*) BETWEEN 1 AND 3 AS cached_connections_after_recovery_worker_2_test
FROM pg_stat_activity
WHERE state = 'idle'
AND now() - backend_start > '5 seconds'::interval;
-- Cleanup
\c - - - :master_port
SELECT $definition$
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.maintenance_management_database;
SELECT pg_reload_conf();
DO
$do$
DECLARE
index int;
db_name text;
current_port int;
BEGIN
SELECT setting::int FROM pg_settings WHERE name = 'port'
INTO current_port;
FOR index IN 1..100
LOOP
SELECT format('db%s', index)
INTO db_name;
PERFORM dblink(format('dbname=%s host=localhost port=%s user=postgres', db_name, current_port),
'DROP EXTENSION citus;');
END LOOP;
END;
$do$;
-- Dropping tables explicitly because ProcSignalBarrier prevents from using dblink
DROP DATABASE db1 WITH (FORCE);
DROP DATABASE db2 WITH (FORCE);
DROP DATABASE db3 WITH (FORCE);
DROP DATABASE db4 WITH (FORCE);
DROP DATABASE db5 WITH (FORCE);
DROP DATABASE db6 WITH (FORCE);
DROP DATABASE db7 WITH (FORCE);
DROP DATABASE db8 WITH (FORCE);
DROP DATABASE db9 WITH (FORCE);
DROP DATABASE db10 WITH (FORCE);
DROP DATABASE db11 WITH (FORCE);
DROP DATABASE db12 WITH (FORCE);
DROP DATABASE db13 WITH (FORCE);
DROP DATABASE db14 WITH (FORCE);
DROP DATABASE db15 WITH (FORCE);
DROP DATABASE db16 WITH (FORCE);
DROP DATABASE db17 WITH (FORCE);
DROP DATABASE db18 WITH (FORCE);
DROP DATABASE db19 WITH (FORCE);
DROP DATABASE db20 WITH (FORCE);
DROP DATABASE db21 WITH (FORCE);
DROP DATABASE db22 WITH (FORCE);
DROP DATABASE db23 WITH (FORCE);
DROP DATABASE db24 WITH (FORCE);
DROP DATABASE db25 WITH (FORCE);
DROP DATABASE db26 WITH (FORCE);
DROP DATABASE db27 WITH (FORCE);
DROP DATABASE db28 WITH (FORCE);
DROP DATABASE db29 WITH (FORCE);
DROP DATABASE db30 WITH (FORCE);
DROP DATABASE db31 WITH (FORCE);
DROP DATABASE db32 WITH (FORCE);
DROP DATABASE db33 WITH (FORCE);
DROP DATABASE db34 WITH (FORCE);
DROP DATABASE db35 WITH (FORCE);
DROP DATABASE db36 WITH (FORCE);
DROP DATABASE db37 WITH (FORCE);
DROP DATABASE db38 WITH (FORCE);
DROP DATABASE db39 WITH (FORCE);
DROP DATABASE db40 WITH (FORCE);
DROP DATABASE db41 WITH (FORCE);
DROP DATABASE db42 WITH (FORCE);
DROP DATABASE db43 WITH (FORCE);
DROP DATABASE db44 WITH (FORCE);
DROP DATABASE db45 WITH (FORCE);
DROP DATABASE db46 WITH (FORCE);
DROP DATABASE db47 WITH (FORCE);
DROP DATABASE db48 WITH (FORCE);
DROP DATABASE db49 WITH (FORCE);
DROP DATABASE db50 WITH (FORCE);
DROP DATABASE db51 WITH (FORCE);
DROP DATABASE db52 WITH (FORCE);
DROP DATABASE db53 WITH (FORCE);
DROP DATABASE db54 WITH (FORCE);
DROP DATABASE db55 WITH (FORCE);
DROP DATABASE db56 WITH (FORCE);
DROP DATABASE db57 WITH (FORCE);
DROP DATABASE db58 WITH (FORCE);
DROP DATABASE db59 WITH (FORCE);
DROP DATABASE db60 WITH (FORCE);
DROP DATABASE db61 WITH (FORCE);
DROP DATABASE db62 WITH (FORCE);
DROP DATABASE db63 WITH (FORCE);
DROP DATABASE db64 WITH (FORCE);
DROP DATABASE db65 WITH (FORCE);
DROP DATABASE db66 WITH (FORCE);
DROP DATABASE db67 WITH (FORCE);
DROP DATABASE db68 WITH (FORCE);
DROP DATABASE db69 WITH (FORCE);
DROP DATABASE db70 WITH (FORCE);
DROP DATABASE db71 WITH (FORCE);
DROP DATABASE db72 WITH (FORCE);
DROP DATABASE db73 WITH (FORCE);
DROP DATABASE db74 WITH (FORCE);
DROP DATABASE db75 WITH (FORCE);
DROP DATABASE db76 WITH (FORCE);
DROP DATABASE db77 WITH (FORCE);
DROP DATABASE db78 WITH (FORCE);
DROP DATABASE db79 WITH (FORCE);
DROP DATABASE db80 WITH (FORCE);
DROP DATABASE db81 WITH (FORCE);
DROP DATABASE db82 WITH (FORCE);
DROP DATABASE db83 WITH (FORCE);
DROP DATABASE db84 WITH (FORCE);
DROP DATABASE db85 WITH (FORCE);
DROP DATABASE db86 WITH (FORCE);
DROP DATABASE db87 WITH (FORCE);
DROP DATABASE db88 WITH (FORCE);
DROP DATABASE db89 WITH (FORCE);
DROP DATABASE db90 WITH (FORCE);
DROP DATABASE db91 WITH (FORCE);
DROP DATABASE db92 WITH (FORCE);
DROP DATABASE db93 WITH (FORCE);
DROP DATABASE db94 WITH (FORCE);
DROP DATABASE db95 WITH (FORCE);
DROP DATABASE db96 WITH (FORCE);
DROP DATABASE db97 WITH (FORCE);
DROP DATABASE db98 WITH (FORCE);
DROP DATABASE db99 WITH (FORCE);
DROP DATABASE db100 WITH (FORCE);
SELECT count(*)
FROM pg_database
WHERE datname LIKE 'db%';
$definition$ AS cleanup
\gset
:cleanup
\c - - - :worker_1_port
:cleanup
\c - - - :worker_2_port
:cleanup