mirror of https://github.com/citusdata/citus.git
Done test_multiple_databases_distributed_deadlock_detection
parent
f25b4b294c
commit
1c219fe945
|
@ -1026,6 +1026,15 @@ class Postgres(QueryRunner):
|
||||||
self.databases.add(name)
|
self.databases.add(name)
|
||||||
self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))
|
self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))
|
||||||
|
|
||||||
|
def drop_database(self, name):
|
||||||
|
self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=name)
|
||||||
|
self.sql(
|
||||||
|
sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
|
||||||
|
sql.Identifier(name)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.databases.remove(name)
|
||||||
|
|
||||||
def create_schema(self, name):
|
def create_schema(self, name):
|
||||||
self.schemas.add(name)
|
self.schemas.add(name)
|
||||||
self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
|
self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
|
||||||
|
@ -1055,11 +1064,13 @@ class Postgres(QueryRunner):
|
||||||
|
|
||||||
def cleanup_databases(self):
|
def cleanup_databases(self):
|
||||||
for database in self.databases:
|
for database in self.databases:
|
||||||
|
self.sql("DROP EXTENSION IF EXISTS citus CASCADE", dbname=database)
|
||||||
self.sql(
|
self.sql(
|
||||||
sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
|
sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
|
||||||
sql.Identifier(database)
|
sql.Identifier(database)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
self.databases.clear()
|
||||||
|
|
||||||
def cleanup_schemas(self):
|
def cleanup_schemas(self):
|
||||||
for schema in self.schemas:
|
for schema in self.schemas:
|
||||||
|
|
|
@ -62,7 +62,7 @@ def test_set_maindb(cluster_factory):
|
||||||
|
|
||||||
wait_until_maintenance_deamons_start(2, cluster)
|
wait_until_maintenance_deamons_start(2, cluster)
|
||||||
|
|
||||||
cluster.coordinator.sql("DROP DATABASE mymaindb;")
|
cluster.coordinator.drop_database("mymaindb")
|
||||||
|
|
||||||
wait_until_maintenance_deamons_start(1, cluster)
|
wait_until_maintenance_deamons_start(1, cluster)
|
||||||
|
|
||||||
|
|
|
@ -3,18 +3,20 @@ import asyncio
|
||||||
import pytest
|
import pytest
|
||||||
from psycopg.errors import DeadlockDetected
|
from psycopg.errors import DeadlockDetected
|
||||||
|
|
||||||
|
# For every database there is expected to be 2 queries,
|
||||||
|
# so ~80 connections will be held by deadlocks. Another 5 is expected to be used by maintenance daemon,
|
||||||
|
# leaving ~15 available
|
||||||
DATABASES_NUMBER = 40
|
DATABASES_NUMBER = 40
|
||||||
|
|
||||||
|
|
||||||
async def test_multiple_databases_distributed_deadlock_detection(cluster):
|
async def test_multiple_databases_distributed_deadlock_detection(cluster):
|
||||||
|
|
||||||
# Disable maintenance on all nodes
|
# Disable maintenance on all nodes
|
||||||
for node in cluster.nodes:
|
for node in cluster.nodes:
|
||||||
node.configure(
|
node.configure(
|
||||||
"citus.recover_2pc_interval = '-1'",
|
"citus.recover_2pc_interval = '-1'",
|
||||||
"citus.distributed_deadlock_detection_factor = '-1'",
|
"citus.distributed_deadlock_detection_factor = '-1'",
|
||||||
"citus.max_maintenance_shared_pool_size = 5",
|
"citus.max_maintenance_shared_pool_size = 5",
|
||||||
# "log_min_messages = 'debug4'",
|
|
||||||
# "citus.main_db='postgres'"
|
|
||||||
)
|
)
|
||||||
node.restart()
|
node.restart()
|
||||||
|
|
||||||
|
@ -42,9 +44,7 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
print("Setup is done")
|
async def create_deadlock(db_name, run_on_coordinator):
|
||||||
|
|
||||||
async def test_deadlock(db_name, run_on_coordinator):
|
|
||||||
"""Function to prepare a deadlock query in a given database"""
|
"""Function to prepare a deadlock query in a given database"""
|
||||||
# Init connections and store for later commits
|
# Init connections and store for later commits
|
||||||
if run_on_coordinator:
|
if run_on_coordinator:
|
||||||
|
@ -89,11 +89,13 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
|
||||||
|
|
||||||
await asyncio.wait_for(run_deadlocked_queries(), 300)
|
await asyncio.wait_for(run_deadlocked_queries(), 300)
|
||||||
|
|
||||||
|
await first_connection.commit()
|
||||||
|
await second_connection.commit()
|
||||||
|
|
||||||
async def enable_maintenance_when_deadlocks_ready():
|
async def enable_maintenance_when_deadlocks_ready():
|
||||||
"""Function to enable maintenance daemons, when all the expected deadlock queries are ready"""
|
"""Function to enable maintenance daemons, when all the expected deadlock queries are ready"""
|
||||||
# Let deadlocks commence
|
# Let deadlocks commence
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
# cluster.debug()
|
|
||||||
# Check that queries are deadlocked
|
# Check that queries are deadlocked
|
||||||
databases_with_deadlock = set()
|
databases_with_deadlock = set()
|
||||||
while len(databases_with_deadlock) < DATABASES_NUMBER:
|
while len(databases_with_deadlock) < DATABASES_NUMBER:
|
||||||
|
@ -111,11 +113,7 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
|
||||||
)
|
)
|
||||||
queries_deadlocked = await cursor.fetchone()
|
queries_deadlocked = await cursor.fetchone()
|
||||||
if queries_deadlocked[0]:
|
if queries_deadlocked[0]:
|
||||||
print(f"Queries are deadlocked on {db_name}")
|
|
||||||
databases_with_deadlock.add(db_name)
|
databases_with_deadlock.add(db_name)
|
||||||
|
|
||||||
print("Queries on all databases are deadlocked, enabling maintenance")
|
|
||||||
|
|
||||||
# Enable maintenance back
|
# Enable maintenance back
|
||||||
for node in cluster.nodes:
|
for node in cluster.nodes:
|
||||||
node.reset_configuration(
|
node.reset_configuration(
|
||||||
|
@ -124,13 +122,28 @@ async def test_multiple_databases_distributed_deadlock_detection(cluster):
|
||||||
)
|
)
|
||||||
node.reload()
|
node.reload()
|
||||||
|
|
||||||
|
# Distribute deadlocked queries among all nodes in the cluster
|
||||||
tasks = list()
|
tasks = list()
|
||||||
for idx, db_name in enumerate(db_names):
|
for idx, db_name in enumerate(db_names):
|
||||||
run_on_coordinator = True if idx % 3 == 0 else False
|
run_on_coordinator = True if idx % 3 == 0 else False
|
||||||
tasks.append(
|
tasks.append(
|
||||||
test_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
|
create_deadlock(db_name=db_name, run_on_coordinator=run_on_coordinator)
|
||||||
)
|
)
|
||||||
|
|
||||||
tasks.append(enable_maintenance_when_deadlocks_ready())
|
tasks.append(enable_maintenance_when_deadlocks_ready())
|
||||||
|
|
||||||
|
# await for the results
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
# Check for "too many clients" on all nodes
|
||||||
|
for node in cluster.nodes:
|
||||||
|
with node.cur() as cursor:
|
||||||
|
cursor.execute(
|
||||||
|
"""
|
||||||
|
SELECT count(*) AS too_many_clients_errors_count
|
||||||
|
FROM regexp_split_to_table(pg_read_file(%s), E'\n') AS t(log_line)
|
||||||
|
WHERE log_line LIKE '%%sorry, too many clients already%%';""",
|
||||||
|
(node.log_path.as_posix(),),
|
||||||
|
)
|
||||||
|
too_many_clients_errors_count = cursor.fetchone()[0]
|
||||||
|
assert too_many_clients_errors_count == 0
|
||||||
|
|
Loading…
Reference in New Issue