mirror of https://github.com/citusdata/citus.git
Add some small improvements to python testing framework (#7159)
1. Adds an `sql_row` function, for when a query returns a single row with multiple columns. 2. Include a `notice_handler` for easier debugging 3. Retry dropping replication slots when they are "in use", this is often an ephemeral state and can cause flaky testspull/6622/merge
parent
e94bf93152
commit
bdf085eabb
|
@ -429,6 +429,10 @@ PORT_UPPER_BOUND = 32768
|
||||||
next_port = PORT_LOWER_BOUND
|
next_port = PORT_LOWER_BOUND
|
||||||
|
|
||||||
|
|
||||||
|
def notice_handler(diag: psycopg.errors.Diagnostic):
|
||||||
|
print(f"{diag.severity}: {diag.message_primary}")
|
||||||
|
|
||||||
|
|
||||||
def cleanup_test_leftovers(nodes):
|
def cleanup_test_leftovers(nodes):
|
||||||
"""
|
"""
|
||||||
Cleaning up test leftovers needs to be done in a specific order, because
|
Cleaning up test leftovers needs to be done in a specific order, because
|
||||||
|
@ -444,7 +448,7 @@ def cleanup_test_leftovers(nodes):
|
||||||
node.cleanup_publications()
|
node.cleanup_publications()
|
||||||
|
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
node.cleanup_logical_replication_slots()
|
node.cleanup_replication_slots()
|
||||||
|
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
node.cleanup_schemas()
|
node.cleanup_schemas()
|
||||||
|
@ -526,10 +530,12 @@ class QueryRunner(ABC):
|
||||||
def conn(self, *, autocommit=True, **kwargs):
|
def conn(self, *, autocommit=True, **kwargs):
|
||||||
"""Open a psycopg connection to this server"""
|
"""Open a psycopg connection to this server"""
|
||||||
self.set_default_connection_options(kwargs)
|
self.set_default_connection_options(kwargs)
|
||||||
return psycopg.connect(
|
conn = psycopg.connect(
|
||||||
autocommit=autocommit,
|
autocommit=autocommit,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
conn.add_notice_handler(notice_handler)
|
||||||
|
return conn
|
||||||
|
|
||||||
def aconn(self, *, autocommit=True, **kwargs):
|
def aconn(self, *, autocommit=True, **kwargs):
|
||||||
"""Open an asynchronous psycopg connection to this server"""
|
"""Open an asynchronous psycopg connection to this server"""
|
||||||
|
@ -572,6 +578,21 @@ class QueryRunner(ABC):
|
||||||
with self.cur(**kwargs) as cur:
|
with self.cur(**kwargs) as cur:
|
||||||
cur.execute(query, params=params)
|
cur.execute(query, params=params)
|
||||||
|
|
||||||
|
def sql_row(self, query, params=None, allow_empty_result=False, **kwargs):
|
||||||
|
"""Run an SQL query that returns a single row and returns this row
|
||||||
|
|
||||||
|
This opens a new connection and closes it once the query is done
|
||||||
|
"""
|
||||||
|
with self.cur(**kwargs) as cur:
|
||||||
|
cur.execute(query, params=params)
|
||||||
|
result = cur.fetchall()
|
||||||
|
|
||||||
|
if allow_empty_result and len(result) == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
assert len(result) == 1, "sql_row returns more than one row"
|
||||||
|
return result[0]
|
||||||
|
|
||||||
def sql_value(self, query, params=None, allow_empty_result=False, **kwargs):
|
def sql_value(self, query, params=None, allow_empty_result=False, **kwargs):
|
||||||
"""Run an SQL query that returns a single cell and return this value
|
"""Run an SQL query that returns a single cell and return this value
|
||||||
|
|
||||||
|
@ -731,7 +752,7 @@ class Postgres(QueryRunner):
|
||||||
# Used to track objects that we want to clean up at the end of a test
|
# Used to track objects that we want to clean up at the end of a test
|
||||||
self.subscriptions = set()
|
self.subscriptions = set()
|
||||||
self.publications = set()
|
self.publications = set()
|
||||||
self.logical_replication_slots = set()
|
self.replication_slots = set()
|
||||||
self.schemas = set()
|
self.schemas = set()
|
||||||
self.users = set()
|
self.users = set()
|
||||||
|
|
||||||
|
@ -983,7 +1004,7 @@ class Postgres(QueryRunner):
|
||||||
def create_logical_replication_slot(
|
def create_logical_replication_slot(
|
||||||
self, name, plugin, temporary=False, twophase=False
|
self, name, plugin, temporary=False, twophase=False
|
||||||
):
|
):
|
||||||
self.logical_replication_slots.add(name)
|
self.replication_slots.add(name)
|
||||||
self.sql(
|
self.sql(
|
||||||
"SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)",
|
"SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)",
|
||||||
(name, plugin, temporary, twophase),
|
(name, plugin, temporary, twophase),
|
||||||
|
@ -1015,12 +1036,21 @@ class Postgres(QueryRunner):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
def cleanup_logical_replication_slots(self):
|
def cleanup_replication_slots(self):
|
||||||
for slot in self.logical_replication_slots:
|
for slot in self.replication_slots:
|
||||||
|
start = time.time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
self.sql(
|
self.sql(
|
||||||
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s",
|
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s",
|
||||||
(slot,),
|
(slot,),
|
||||||
)
|
)
|
||||||
|
except psycopg.errors.ObjectInUse:
|
||||||
|
if time.time() < start + 10:
|
||||||
|
time.sleep(0.5)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
break
|
||||||
|
|
||||||
def cleanup_subscriptions(self):
|
def cleanup_subscriptions(self):
|
||||||
for subscription in self.subscriptions:
|
for subscription in self.subscriptions:
|
||||||
|
|
Loading…
Reference in New Issue