From 4312b0656b6d8487e1cadc9d0ff0870c4bdd0af2 Mon Sep 17 00:00:00 2001
From: ivyazmitinov
Date: Thu, 27 Jun 2024 17:50:04 +0200
Subject: [PATCH] - Fix limits check for local nodes
 - WIP test_multiple_databases_distributed_deadlock_detection

---
 .../connection/shared_connection_stats.c      | 19 +++--
 src/test/regress/citus_tests/common.py        |  8 ++
 ...atabases_distributed_deadlock_detection.py | 84 +++++++++++++------
 3 files changed, 77 insertions(+), 34 deletions(-)

diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c
index 33c6943c1..4338a7860 100644
--- a/src/backend/distributed/connection/shared_connection_stats.c
+++ b/src/backend/distributed/connection/shared_connection_stats.c
@@ -461,8 +461,8 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 		currentConnectionsCount = workerNodeConnectionEntry->regularConnectionsCount;
 	}
 
-	bool remoteNodeLimitExceeded = currentConnectionsCount + 1 >
-								   currentConnectionsLimit;
+	bool currentConnectionsLimitExceeded = currentConnectionsCount + 1 >
+										   currentConnectionsLimit;
 
 	/*
 	 * For local nodes, solely relying on citus.max_shared_pool_size or
@@ -476,11 +476,11 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	 * a reasonable pace. The latter limit typically kicks in when the database
 	 * is issued lots of concurrent sessions at the same time, such as benchmarks.
 	 */
-	bool localNodeLimitExceeded =
+	bool localNodeConnectionsLimitExceeded =
 		connectionToLocalNode &&
 		(GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES ||
-		 GetExternalClientBackendCount() + 1 > currentConnectionsLimit);
-	if (remoteNodeLimitExceeded || localNodeLimitExceeded)
+		 GetExternalClientBackendCount() + 1 > GetLocalSharedPoolSize());
+	if (currentConnectionsLimitExceeded || localNodeConnectionsLimitExceeded)
 	{
 		connectionSlotAvailable = false;
 	}
@@ -502,9 +502,10 @@ IncrementSharedConnectionCounterInternal(uint32 externalFlags,
 	if (IsLoggableLevel(DEBUG4))
 	{
 		ereport(DEBUG4, errmsg(
-					"Incrementing connection counter. "
+					"Incrementing %s connection counter. "
 					"Current regular connections: %i, maintenance connections: %i. "
 					"Connection slot to %s:%i database %i is %s",
+					maintenanceConnection ? "maintenance" : "regular",
 					workerNodeConnectionEntry->regularConnectionsCount,
 					workerNodeConnectionEntry->maintenanceConnectionsCount,
 					hostname,
@@ -568,7 +569,8 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags,
 	Assert(workerNodeConnectionEntry->regularConnectionsCount > 0 ||
 		   workerNodeConnectionEntry->maintenanceConnectionsCount > 0);
 
-	if (externalFlags & MAINTENANCE_CONNECTION)
+	bool maintenanceConnection = externalFlags & MAINTENANCE_CONNECTION;
+	if (maintenanceConnection)
 	{
 		workerNodeConnectionEntry->maintenanceConnectionsCount -= 1;
 	}
@@ -580,9 +582,10 @@ DecrementSharedConnectionCounterInternal(uint32 externalFlags,
 	if (IsLoggableLevel(DEBUG4))
 	{
 		ereport(DEBUG4, errmsg(
-					"Decrementing connection counter. "
+					"Decrementing %s connection counter. "
 					"Current regular connections: %i, maintenance connections: %i. "
 					"Connection slot to %s:%i database %i is released",
+					maintenanceConnection ? "maintenance" : "regular",
 					workerNodeConnectionEntry->regularConnectionsCount,
 					workerNodeConnectionEntry->maintenanceConnectionsCount,
 					hostname,
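
For review context, a minimal Python model of the corrected slot check in IncrementSharedConnectionCounterInternal. The names and the sentinel value are illustrative assumptions, not the actual implementation; the authoritative logic is the C hunk above. The fix is that the local-node branch now compares the external client backend count against GetLocalSharedPoolSize() instead of against currentConnectionsLimit:

    # Simplified sketch of the fixed check; names and the sentinel value
    # below are assumptions for illustration only.
    DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES = -1  # assumed sentinel

    def connection_slot_available(current_count, current_limit,
                                  is_local_node, local_pool_size,
                                  external_backends):
        # Regular limit: one more connection must stay within the limit.
        current_limit_exceeded = current_count + 1 > current_limit
        # Local-node limit: now checked against the local shared pool size
        # (previously, incorrectly, against the current connections limit).
        local_limit_exceeded = is_local_node and (
            local_pool_size == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES
            or external_backends + 1 > local_pool_size
        )
        return not (current_limit_exceeded or local_limit_exceeded)
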
"maintenance" : "regular", workerNodeConnectionEntry->regularConnectionsCount, workerNodeConnectionEntry->maintenanceConnectionsCount, hostname, diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 322788462..12cc8d54d 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -977,6 +977,14 @@ class Postgres(QueryRunner): for config in configs: self.sql(f"alter system set {config}") + def reset_configuration(self, *configs): + """Reset specific Postgres settings using ALTER SYSTEM RESET + NOTE: after configuring a call to reload or restart is needed for the + settings to become effective. + """ + for config in configs: + self.sql(f"alter system reset {config}") + def log_handle(self): """Returns the opened logfile at the current end of the log diff --git a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py index 35b0e36fe..b71e0211f 100644 --- a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py +++ b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py @@ -10,29 +10,38 @@ DATABASES_NUMBER = 40 async def test_multiple_databases_distributed_deadlock_detection(cluster): # Disable maintenance on all nodes for node in cluster.nodes: - node.sql("ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';") - node.sql("ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';") - node.sql("ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;") - node.sql("SELECT pg_reload_conf();") + node.configure( + "citus.recover_2pc_interval = '-1'", + "citus.distributed_deadlock_detection_factor = '-1'", + "citus.max_maintenance_shared_pool_size = 5", + # "log_min_messages = 'debug4'", + # "citus.main_db='postgres'" + ) + node.restart() # Prepare database names for test - db_names = [f'db{db_index}' for db_index in range(1, DATABASES_NUMBER + 1)] + db_names = [f"db{db_index}" for db_index in range(1, DATABASES_NUMBER + 1)] # Create and configure databases for db_name in db_names: nodes = cluster.workers + [cluster.coordinator] for node in nodes: - node.sql(f'CREATE DATABASE {db_name}') + node.sql(f"CREATE DATABASE {db_name}") with node.cur(dbname=db_name) as node_cursor: node_cursor.execute("CREATE EXTENSION citus;") if node == cluster.coordinator: for worker in cluster.workers: - node_cursor.execute(f"SELECT citus_add_node('localhost', {worker.port});") - node_cursor.execute(""" + node_cursor.execute( + "SELECT pg_catalog.citus_add_node(%s, %s)", + (worker.host, worker.port), + ) + node_cursor.execute( + """ CREATE TABLE public.deadlock_detection_test (user_id int UNIQUE, some_val int); SELECT create_distributed_table('public.deadlock_detection_test', 'user_id'); INSERT INTO public.deadlock_detection_test SELECT i, i FROM generate_series(1,2) i; - """) + """ + ) print("Setup is done") @@ -40,26 +49,43 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster): """Function to prepare a deadlock query in a given database""" # Init connections and store for later commits if run_on_coordinator: - first_connection = await cluster.coordinator.aconn(dbname=db_name, autocommit=False) + first_connection = await cluster.coordinator.aconn( + dbname=db_name, autocommit=False + ) first_cursor = first_connection.cursor() - second_connection = await cluster.coordinator.aconn(dbname=db_name, 
diff --git a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py
index 35b0e36fe..b71e0211f 100644
--- a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py
+++ b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py
@@ -10,29 +10,38 @@ DATABASES_NUMBER = 40
 async def test_multiple_databases_distributed_deadlock_detection(cluster):
     # Disable maintenance on all nodes
     for node in cluster.nodes:
-        node.sql("ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';")
-        node.sql("ALTER SYSTEM SET citus.distributed_deadlock_detection_factor = '-1';")
-        node.sql("ALTER SYSTEM SET citus.max_maintenance_shared_pool_size = 10;")
-        node.sql("SELECT pg_reload_conf();")
+        node.configure(
+            "citus.recover_2pc_interval = '-1'",
+            "citus.distributed_deadlock_detection_factor = '-1'",
+            "citus.max_maintenance_shared_pool_size = 5",
+            # "log_min_messages = 'debug4'",
+            # "citus.main_db='postgres'"
+        )
+        node.restart()
 
     # Prepare database names for test
-    db_names = [f'db{db_index}' for db_index in range(1, DATABASES_NUMBER + 1)]
+    db_names = [f"db{db_index}" for db_index in range(1, DATABASES_NUMBER + 1)]
 
     # Create and configure databases
     for db_name in db_names:
         nodes = cluster.workers + [cluster.coordinator]
         for node in nodes:
-            node.sql(f'CREATE DATABASE {db_name}')
+            node.sql(f"CREATE DATABASE {db_name}")
             with node.cur(dbname=db_name) as node_cursor:
                 node_cursor.execute("CREATE EXTENSION citus;")
                 if node == cluster.coordinator:
                     for worker in cluster.workers:
-                        node_cursor.execute(f"SELECT citus_add_node('localhost', {worker.port});")
-                    node_cursor.execute("""
+                        node_cursor.execute(
+                            "SELECT pg_catalog.citus_add_node(%s, %s)",
+                            (worker.host, worker.port),
+                        )
+                    node_cursor.execute(
+                        """
                         CREATE TABLE public.deadlock_detection_test (user_id int UNIQUE, some_val int);
                         SELECT create_distributed_table('public.deadlock_detection_test', 'user_id');
                         INSERT INTO public.deadlock_detection_test SELECT i, i FROM generate_series(1,2) i;
-                    """)
+                        """
+                    )
 
     print("Setup is done")
 
@@ -40,26 +49,43 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
         """Function to prepare a deadlock query in a given database"""
         # Init connections and store for later commits
         if run_on_coordinator:
-            first_connection = await cluster.coordinator.aconn(dbname=db_name, autocommit=False)
+            first_connection = await cluster.coordinator.aconn(
+                dbname=db_name, autocommit=False
+            )
             first_cursor = first_connection.cursor()
-            second_connection = await cluster.coordinator.aconn(dbname=db_name, autocommit=False)
+            second_connection = await cluster.coordinator.aconn(
+                dbname=db_name, autocommit=False
+            )
             second_cursor = second_connection.cursor()
         else:
-            first_connection = await cluster.workers[0].aconn(dbname=db_name, autocommit=False)
+            first_connection = await cluster.workers[0].aconn(
+                dbname=db_name, autocommit=False
+            )
             first_cursor = first_connection.cursor()
-            second_connection = await cluster.workers[1].aconn(dbname=db_name, autocommit=False)
+            second_connection = await cluster.workers[1].aconn(
+                dbname=db_name, autocommit=False
+            )
             second_cursor = second_connection.cursor()
 
         # initiate deadlock
-        await first_cursor.execute("UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 1;")
-        await second_cursor.execute("UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 2;")
+        await first_cursor.execute(
+            "UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 1;"
+        )
+        await second_cursor.execute(
+            "UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 2;"
+        )
 
         # Test that deadlock is resolved by a maintenance daemon
         with pytest.raises(DeadlockDetected):
+
             async def run_deadlocked_queries():
                 await asyncio.gather(
-                    second_cursor.execute("UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 1;"),
-                    first_cursor.execute("UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 2;")
+                    second_cursor.execute(
+                        "UPDATE public.deadlock_detection_test SET some_val = 2 WHERE user_id = 1;"
+                    ),
+                    first_cursor.execute(
+                        "UPDATE public.deadlock_detection_test SET some_val = 1 WHERE user_id = 2;"
+                    ),
                 )
 
             await asyncio.wait_for(run_deadlocked_queries(), 300)
@@ -72,16 +98,18 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
         # Check that queries are deadlocked
        databases_with_deadlock = set()
        while len(databases_with_deadlock) < DATABASES_NUMBER:
-            for db_name in (db for db in db_names if
-                            db not in databases_with_deadlock):
+            for db_name in (db for db in db_names if db not in databases_with_deadlock):
                 for node in cluster.nodes:
                     async with node.acur(dbname=db_name) as cursor:
                         expected_lock_count = 4 if node == cluster.coordinator else 2
-                        await cursor.execute(f"""
-                            SELECT count(*) = {expected_lock_count} AS deadlock_created
+                        await cursor.execute(
+                            """
+                            SELECT count(*) = %s AS deadlock_created
                             FROM pg_locks
                             INNER JOIN pg_class pc ON relation = oid
-                            WHERE relname LIKE 'deadlock_detection_test%'""")
+                            WHERE relname LIKE 'deadlock_detection_test%%'""",
+                            (expected_lock_count,),
+                        )
                         queries_deadlocked = await cursor.fetchone()
                         if queries_deadlocked[0]:
                             print(f"Queries are deadlocked on {db_name}")
@@ -91,14 +119,18 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
 
         # Enable maintenance back
         for node in cluster.nodes:
-            node.sql("ALTER SYSTEM RESET citus.recover_2pc_interval;")
-            node.sql("ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;")
-            node.sql("SELECT pg_reload_conf();")
+            node.reset_configuration(
+                "citus.recover_2pc_interval",
+                "citus.distributed_deadlock_detection_factor",
+            )
+            node.reload()
 
     tasks = list()
     for idx, db_name in enumerate(db_names):
         run_on_coordinator = True if idx % 3 == 0 else False
-        tasks.append(test_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator))
+        tasks.append(
+            test_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
+        )
 
     tasks.append(enable_maintenance_when_deadlocks_ready())
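
The quoted diff ends with the task list still being assembled; the code that awaits it lies outside the hunk. A sketch of the usual pattern, offered as an assumption rather than as part of the patch:

    # Assumed continuation (not in the quoted hunk): run all per-database
    # deadlock scenarios and the maintenance re-enabler concurrently, so
    # maintenance is switched back on only once every deadlock is in place.
    await asyncio.gather(*tasks)
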