From 1c219fe9454c778e0f4e444b4652b733483797f0 Mon Sep 17 00:00:00 2001 From: ivyazmitinov Date: Fri, 28 Jun 2024 16:49:46 +0200 Subject: [PATCH] Done test_multiple_databases_distributed_deadlock_detection --- src/test/regress/citus_tests/common.py | 11 ++++++ .../test/test_maintenancedeamon.py | 2 +- ...atabases_distributed_deadlock_detection.py | 35 +++++++++++++------ 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index e3dbcaf8b..a1990203f 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -1026,6 +1026,15 @@ class Postgres(QueryRunner): self.databases.add(name) self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name))) + def drop_database(self, name): + self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=name) + self.sql( + sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format( + sql.Identifier(name) + ) + ) + self.databases.remove(name) + def create_schema(self, name): self.schemas.add(name) self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name))) @@ -1055,11 +1064,13 @@ class Postgres(QueryRunner): def cleanup_databases(self): for database in self.databases: + self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=database) self.sql( sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format( sql.Identifier(database) ) ) + self.databases.clear() def cleanup_schemas(self): for schema in self.schemas: diff --git a/src/test/regress/citus_tests/test/test_maintenancedeamon.py b/src/test/regress/citus_tests/test/test_maintenancedeamon.py index 3f6cb501e..1eb4e28c9 100644 --- a/src/test/regress/citus_tests/test/test_maintenancedeamon.py +++ b/src/test/regress/citus_tests/test/test_maintenancedeamon.py @@ -62,7 +62,7 @@ def test_set_maindb(cluster_factory): wait_until_maintenance_deamons_start(2, cluster) - cluster.coordinator.sql("DROP DATABASE mymaindb;") + cluster.coordinator.drop_database("mymaindb") wait_until_maintenance_deamons_start(1, cluster) diff --git a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py index 6db6db69f..b9cd9596b 100644 --- a/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py +++ b/src/test/regress/citus_tests/test/test_multiple_databases_distributed_deadlock_detection.py @@ -3,18 +3,20 @@ import asyncio import pytest from psycopg.errors import DeadlockDetected +# For every database there is expected to be 2 queries, +# so ~80 connections will be held by deadlocks. Another 5 is expected to be used by maintenance daemon, +# leaving ~15 available DATABASES_NUMBER = 40 async def test_multiple_databases_distributed_deadlock_detection(cluster): + # Disable maintenance on all nodes for node in cluster.nodes: node.configure( "citus.recover_2pc_interval = '-1'", "citus.distributed_deadlock_detection_factor = '-1'", "citus.max_maintenance_shared_pool_size = 5", - # "log_min_messages = 'debug4'", - # "citus.main_db='postgres'" ) node.restart() @@ -42,9 +44,7 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster): """ ) - print("Setup is done") - - async def test_deadlock(db_name, run_on_coordinator): + async def create_deadlock(db_name, run_on_coordinator): """Function to prepare a deadlock query in a given database""" # Init connections and store for later commits if run_on_coordinator: @@ -89,11 +89,13 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster): await asyncio.wait_for(run_deadlocked_queries(), 300) + await first_connection.commit() + await second_connection.commit() + async def enable_maintenance_when_deadlocks_ready(): """Function to enable maintenance daemons, when all the expected deadlock queries are ready""" # Let deadlocks commence await asyncio.sleep(2) - # cluster.debug() # Check that queries are deadlocked databases_with_deadlock = set() while len(databases_with_deadlock) < DATABASES_NUMBER: @@ -111,11 +113,7 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster): ) queries_deadlocked = await cursor.fetchone() if queries_deadlocked[0]: - print(f"Queries are deadlocked on {db_name}") databases_with_deadlock.add(db_name) - - print("Queries on all databases are deadlocked, enabling maintenance") - # Enable maintenance back for node in cluster.nodes: node.reset_configuration( @@ -124,13 +122,28 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster): ) node.reload() + # Distribute deadlocked queries among all nodes in the cluster tasks = list() for idx, db_name in enumerate(db_names): run_on_coordinator = True if idx % 3 == 0 else False tasks.append( - test_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator) + create_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator) ) tasks.append(enable_maintenance_when_deadlocks_ready()) + # await for the results await asyncio.gather(*tasks) + + # Check for "too many clients" on all nodes + for node in cluster.nodes: + with node.cur() as cursor: + cursor.execute( + """ + SELECT count(*) AS too_many_clients_errors_count + FROM regexp_split_to_table(pg_read_file(%s), E'\n') AS t(log_line) + WHERE log_line LIKE '%%sorry, too many clients already%%';""", + (node.log_path.as_posix(),), + ) + too_many_clients_errors_count = cursor.fetchone()[0] + assert too_many_clients_errors_count == 0