diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 66ff044d2..907102482 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -429,6 +429,10 @@ PORT_UPPER_BOUND = 32768 next_port = PORT_LOWER_BOUND +def notice_handler(diag: psycopg.errors.Diagnostic): + print(f"{diag.severity}: {diag.message_primary}") + + def cleanup_test_leftovers(nodes): """ Cleaning up test leftovers needs to be done in a specific order, because @@ -444,7 +448,7 @@ def cleanup_test_leftovers(nodes): node.cleanup_publications() for node in nodes: - node.cleanup_logical_replication_slots() + node.cleanup_replication_slots() for node in nodes: node.cleanup_schemas() @@ -526,10 +530,12 @@ class QueryRunner(ABC): def conn(self, *, autocommit=True, **kwargs): """Open a psycopg connection to this server""" self.set_default_connection_options(kwargs) - return psycopg.connect( + conn = psycopg.connect( autocommit=autocommit, **kwargs, ) + conn.add_notice_handler(notice_handler) + return conn def aconn(self, *, autocommit=True, **kwargs): """Open an asynchronous psycopg connection to this server""" @@ -572,6 +578,21 @@ class QueryRunner(ABC): with self.cur(**kwargs) as cur: cur.execute(query, params=params) + def sql_row(self, query, params=None, allow_empty_result=False, **kwargs): + """Run an SQL query that returns a single row and returns this row + + This opens a new connection and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params) + result = cur.fetchall() + + if allow_empty_result and len(result) == 0: + return None + + assert len(result) == 1, "sql_row returns more than one row" + return result[0] + def sql_value(self, query, params=None, allow_empty_result=False, **kwargs): """Run an SQL query that returns a single cell and return this value @@ -731,7 +752,7 @@ class Postgres(QueryRunner): # Used to track objects that we want to clean up at the end of a test self.subscriptions = set() self.publications = set() - self.logical_replication_slots = set() + self.replication_slots = set() self.schemas = set() self.users = set() @@ -983,7 +1004,7 @@ class Postgres(QueryRunner): def create_logical_replication_slot( self, name, plugin, temporary=False, twophase=False ): - self.logical_replication_slots.add(name) + self.replication_slots.add(name) self.sql( "SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)", (name, plugin, temporary, twophase), @@ -1015,12 +1036,21 @@ class Postgres(QueryRunner): ) ) - def cleanup_logical_replication_slots(self): - for slot in self.logical_replication_slots: - self.sql( - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", - (slot,), - ) + def cleanup_replication_slots(self): + for slot in self.replication_slots: + start = time.time() + while True: + try: + self.sql( + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", + (slot,), + ) + except psycopg.errors.ObjectInUse: + if time.time() < start + 10: + time.sleep(0.5) + continue + raise + break def cleanup_subscriptions(self): for subscription in self.subscriptions: