From bdf085eabb217c0f7c202a5d1474d9d27bbdf81c Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 5 Sep 2023 09:34:56 +0200 Subject: [PATCH 1/9] Add some small improvements to python testing framework (#7159) 1. Adds an `sql_row` function, for when a query returns a single row with multiple columns. 2. Include a `notice_handler` for easier debugging 3. Retry dropping replication slots when they are "in use", this is often an ephemeral state and can cause flaky tests --- src/test/regress/citus_tests/common.py | 50 ++++++++++++++++++++------ 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 66ff044d2..907102482 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -429,6 +429,10 @@ PORT_UPPER_BOUND = 32768 next_port = PORT_LOWER_BOUND +def notice_handler(diag: psycopg.errors.Diagnostic): + print(f"{diag.severity}: {diag.message_primary}") + + def cleanup_test_leftovers(nodes): """ Cleaning up test leftovers needs to be done in a specific order, because @@ -444,7 +448,7 @@ def cleanup_test_leftovers(nodes): node.cleanup_publications() for node in nodes: - node.cleanup_logical_replication_slots() + node.cleanup_replication_slots() for node in nodes: node.cleanup_schemas() @@ -526,10 +530,12 @@ class QueryRunner(ABC): def conn(self, *, autocommit=True, **kwargs): """Open a psycopg connection to this server""" self.set_default_connection_options(kwargs) - return psycopg.connect( + conn = psycopg.connect( autocommit=autocommit, **kwargs, ) + conn.add_notice_handler(notice_handler) + return conn def aconn(self, *, autocommit=True, **kwargs): """Open an asynchronous psycopg connection to this server""" @@ -572,6 +578,21 @@ class QueryRunner(ABC): with self.cur(**kwargs) as cur: cur.execute(query, params=params) + def sql_row(self, query, params=None, allow_empty_result=False, **kwargs): + """Run an SQL query that returns a single row and returns this row + + This opens a new connection and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params) + result = cur.fetchall() + + if allow_empty_result and len(result) == 0: + return None + + assert len(result) == 1, "sql_row returns more than one row" + return result[0] + def sql_value(self, query, params=None, allow_empty_result=False, **kwargs): """Run an SQL query that returns a single cell and return this value @@ -731,7 +752,7 @@ class Postgres(QueryRunner): # Used to track objects that we want to clean up at the end of a test self.subscriptions = set() self.publications = set() - self.logical_replication_slots = set() + self.replication_slots = set() self.schemas = set() self.users = set() @@ -983,7 +1004,7 @@ class Postgres(QueryRunner): def create_logical_replication_slot( self, name, plugin, temporary=False, twophase=False ): - self.logical_replication_slots.add(name) + self.replication_slots.add(name) self.sql( "SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)", (name, plugin, temporary, twophase), @@ -1015,12 +1036,21 @@ class Postgres(QueryRunner): ) ) - def cleanup_logical_replication_slots(self): - for slot in self.logical_replication_slots: - self.sql( - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", - (slot,), - ) + def cleanup_replication_slots(self): + for slot in self.replication_slots: + start = time.time() + while True: + try: + self.sql( + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", + (slot,), + ) + except psycopg.errors.ObjectInUse: + if time.time() < start + 10: + time.sleep(0.5) + continue + raise + break def cleanup_subscriptions(self): for subscription in self.subscriptions: From c22547d221af72e2764731f8629d37409aa4d905 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Wed, 17 May 2023 16:19:56 +0300 Subject: [PATCH 2/9] Create a new colocation properly after braking one When braking a colocation, we need to create a new colocation group record in pg_dist_colocation for the relation. It is not sufficient to have a new colocationid value in pg_dist_partition only. This patch also fixes a bug when deleting a colocation group if no tables are left in it. Previously we passed a relation id as a parameter to DeleteColocationGroupIfNoTablesBelong function, where we should have passed a colocation id. --- src/backend/distributed/utils/colocation_utils.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 7042ebe7e..e7007874b 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -175,12 +175,11 @@ BreakColocation(Oid sourceRelationId) */ Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock); - uint32 newColocationId = GetNextColocationId(); - bool localOnly = false; - UpdateRelationColocationGroup(sourceRelationId, newColocationId, localOnly); + uint32 oldColocationId = TableColocationId(sourceRelationId); + CreateColocationGroupForRelation(sourceRelationId); - /* if there is not any remaining table in the colocation group, delete it */ - DeleteColocationGroupIfNoTablesBelong(sourceRelationId); + /* if there is not any remaining table in the old colocation group, delete it */ + DeleteColocationGroupIfNoTablesBelong(oldColocationId); table_close(pgDistColocation, NoLock); } From a849570f3fe95f9ad2af1324e4aa64d7a0afd20c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emel=20=C5=9Eim=C5=9Fek?= Date: Tue, 5 Sep 2023 13:29:35 +0300 Subject: [PATCH 3/9] Improve the performance of CitusHasBeenLoaded function for a database that does not do CREATE EXTENSION citus but load citus.so. (#7123) For a database that does not create the citus extension by running ` CREATE EXTENSION citus;` `CitusHasBeenLoaded ` function ends up querying the `pg_extension` table every time it is invoked. This is not an ideal situation for a such a database. The idea in this PR is as follows: ### A new field in MetadataCache. Add a new variable `extensionCreatedState `of the following type: ``` typedef enum ExtensionCreatedState { UNKNOWN = 0, CREATED = 1, NOTCREATED = 2, } ExtensionCreatedState; ``` When the MetadataCache is invalidated, `ExtensionCreatedState` will be set to UNKNOWN. ### Invalidate MetadataCache when CREATE/DROP/ALTER EXTENSION citus commands are run. - Register a callback function, named `InvalidateDistRelationCacheCallback`, for relcache invalidation during the shared library initialization for `citus.so`. This callback function is invoked in all the backends whenever the relcache is invalidated in one of the backends. (This could be caused many DDLs operations). - In the cache invalidation callback,` InvalidateDistRelationCacheCallback`, invalidate `MetadataCache` zeroing it out. - In `CitusHasBeenLoaded`, perform the costly citus is loaded check only if the `MetadataCache` is not valid. ### Downsides Any relcache invalidation (caused by various DDL operations) will case Citus MetadataCache to get invalidated. Most of the time it will be unnecessary. But we rely on that DDL operations on relations will not be too frequent. --- .../distributed/commands/utility_hook.c | 23 ++-- .../distributed/metadata/metadata_cache.c | 120 ++++++++++-------- src/backend/distributed/shared_library_init.c | 5 + src/include/distributed/metadata_cache.h | 2 + .../citus_tests/test/test_extension.py | 44 +++++++ 5 files changed, 134 insertions(+), 60 deletions(-) create mode 100644 src/test/regress/citus_tests/test/test_extension.py diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 888b3dfed..1945218b6 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -77,6 +77,7 @@ #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -193,6 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt( parsetree); + if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt) { ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); @@ -207,6 +209,18 @@ multi_ProcessUtility(PlannedStmt *pstmt, PreprocessCreateExtensionStmtForCitusColumnar(parsetree); } + if (isCreateAlterExtensionUpdateCitusStmt || IsDropCitusExtensionStmt(parsetree)) + { + /* + * Citus maintains a higher level cache. We use the cache invalidation mechanism + * of Postgres to achieve cache coherency between backends. Any change to citus + * extension should be made known to other backends. We do this by invalidating the + * relcache and therefore invoking the citus registered callback that invalidates + * the citus cache in other backends. + */ + CacheInvalidateRelcacheAll(); + } + /* * Make sure that on DROP DATABASE we terminate the background daemon * associated with it. @@ -926,15 +940,6 @@ ProcessUtilityInternal(PlannedStmt *pstmt, } } } - - if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt)) - { - /* - * Ensure value is valid, we can't do some checks during CREATE - * EXTENSION. This is important to register some invalidation callbacks. - */ - CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */ - } } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 5ccd4a512..55d0f11c5 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -133,6 +133,19 @@ typedef struct ShardIdCacheEntry int shardIndex; } ShardIdCacheEntry; +/* + * ExtensionCreatedState is used to track if citus extension has been created + * using CREATE EXTENSION command. + * UNKNOWN : MetadataCache is invalid. State is UNKNOWN. + * CREATED : Citus is created. + * NOTCREATED : Citus is not created. + */ +typedef enum ExtensionCreatedState +{ + UNKNOWN = 0, + CREATED = 1, + NOTCREATED = 2, +} ExtensionCreatedState; /* * State which should be cleared upon DROP EXTENSION. When the configuration @@ -140,7 +153,7 @@ typedef struct ShardIdCacheEntry */ typedef struct MetadataCacheData { - bool extensionLoaded; + ExtensionCreatedState extensionCreatedState; Oid distShardRelationId; Oid distPlacementRelationId; Oid distBackgroundJobRelationId; @@ -288,7 +301,6 @@ static void CreateDistTableCache(void); static void CreateShardIdCache(void); static void CreateDistObjectCache(void); static void InvalidateForeignRelationGraphCacheCallback(Datum argument, Oid relationId); -static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateLocalGroupIdRelationCacheCallback(Datum argument, Oid relationId); static void InvalidateConnParamsCacheCallback(Datum argument, Oid relationId); @@ -2187,16 +2199,30 @@ HasOverlappingShardInterval(ShardInterval **shardIntervalArray, bool CitusHasBeenLoaded(void) { - if (!MetadataCache.extensionLoaded || creating_extension) + /* + * We do not use Citus hooks during CREATE/ALTER EXTENSION citus + * since the objects used by the C code might be not be there yet. + */ + if (creating_extension) { - /* - * Refresh if we have not determined whether the extension has been - * loaded yet, or in case of ALTER EXTENSION since we want to treat - * Citus as "not loaded" during ALTER EXTENSION citus. - */ - bool extensionLoaded = CitusHasBeenLoadedInternal(); + Oid citusExtensionOid = get_extension_oid("citus", true); - if (extensionLoaded && !MetadataCache.extensionLoaded) + if (CurrentExtensionObject == citusExtensionOid) + { + return false; + } + } + + /* + * If extensionCreatedState is UNKNOWN, query pg_extension for Citus + * and cache the result. Otherwise return the value extensionCreatedState + * indicates. + */ + if (MetadataCache.extensionCreatedState == UNKNOWN) + { + bool extensionCreated = CitusHasBeenLoadedInternal(); + + if (extensionCreated) { /* * Loaded Citus for the first time in this session, or first time after @@ -2208,31 +2234,22 @@ CitusHasBeenLoaded(void) */ StartupCitusBackend(); - /* - * InvalidateDistRelationCacheCallback resets state such as extensionLoaded - * when it notices changes to pg_dist_partition (which usually indicate - * `DROP EXTENSION citus;` has been run) - * - * Ensure InvalidateDistRelationCacheCallback will notice those changes - * by caching pg_dist_partition's oid. - * - * We skip these checks during upgrade since pg_dist_partition is not - * present during early stages of upgrade operation. - */ - DistPartitionRelationId(); - /* * This needs to be initialized so we can receive foreign relation graph * invalidation messages in InvalidateForeignRelationGraphCacheCallback(). * See the comments of InvalidateForeignKeyGraph for more context. */ DistColocationRelationId(); - } - MetadataCache.extensionLoaded = extensionLoaded; + MetadataCache.extensionCreatedState = CREATED; + } + else + { + MetadataCache.extensionCreatedState = NOTCREATED; + } } - return MetadataCache.extensionLoaded; + return (MetadataCache.extensionCreatedState == CREATED) ? true : false; } @@ -2257,15 +2274,6 @@ CitusHasBeenLoadedInternal(void) return false; } - if (creating_extension && CurrentExtensionObject == citusExtensionOid) - { - /* - * We do not use Citus hooks during CREATE/ALTER EXTENSION citus - * since the objects used by the C code might be not be there yet. - */ - return false; - } - /* citus extension exists and has been created */ return true; } @@ -4201,10 +4209,6 @@ InitializeDistCache(void) CreateShardIdCache(); InitializeDistObjectCache(); - - /* Watch for invalidation events. */ - CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, - (Datum) 0); } @@ -4754,7 +4758,7 @@ InvalidateForeignKeyGraph(void) * InvalidateDistRelationCacheCallback flushes cache entries when a relation * is updated (or flushes the entire cache). */ -static void +void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { /* invalidate either entire cache or a specific entry */ @@ -4762,12 +4766,18 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) { InvalidateDistTableCache(); InvalidateDistObjectCache(); + InvalidateMetadataSystemCache(); } else { void *hashKey = (void *) &relationId; bool foundInCache = false; + if (DistTableCacheHash == NULL) + { + return; + } + CitusTableCacheEntrySlot *cacheSlot = hash_search(DistTableCacheHash, hashKey, HASH_FIND, &foundInCache); if (foundInCache) @@ -4776,21 +4786,19 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) } /* - * If pg_dist_partition is being invalidated drop all state - * This happens pretty rarely, but most importantly happens during - * DROP EXTENSION citus; This isn't the only time when this happens - * though, it can happen for multiple other reasons, such as an - * autovacuum running ANALYZE on pg_dist_partition. Such an ANALYZE - * wouldn't really need a full Metadata cache invalidation, but we - * don't know how to differentiate between DROP EXTENSION and ANALYZE. - * So for now we simply drop it in both cases and take the slight - * temporary performance hit. + * if pg_dist_partition relcache is invalidated for some reason, + * invalidate the MetadataCache. It is likely an overkill to invalidate + * the entire cache here. But until a better fix, we keep it this way + * for postgres regression tests that includes + * REINDEX SCHEMA CONCURRENTLY pg_catalog + * command. */ if (relationId == MetadataCache.distPartitionRelationId) { InvalidateMetadataSystemCache(); } + if (relationId == MetadataCache.distObjectRelationId) { InvalidateDistObjectCache(); @@ -4830,6 +4838,11 @@ InvalidateDistTableCache(void) CitusTableCacheEntrySlot *cacheSlot = NULL; HASH_SEQ_STATUS status; + if (DistTableCacheHash == NULL) + { + return; + } + hash_seq_init(&status, DistTableCacheHash); while ((cacheSlot = (CitusTableCacheEntrySlot *) hash_seq_search(&status)) != NULL) @@ -4848,6 +4861,11 @@ InvalidateDistObjectCache(void) DistObjectCacheEntry *cacheEntry = NULL; HASH_SEQ_STATUS status; + if (DistObjectCacheHash == NULL) + { + return; + } + hash_seq_init(&status, DistObjectCacheHash); while ((cacheEntry = (DistObjectCacheEntry *) hash_seq_search(&status)) != NULL) @@ -4930,8 +4948,8 @@ CreateDistObjectCache(void) /* - * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag, - * and invalidates the worker node, ConnParams, and local group ID caches. + * InvalidateMetadataSystemCache resets all the cached OIDs and the extensionCreatedState + * flag and invalidates the worker node, ConnParams, and local group ID caches. */ void InvalidateMetadataSystemCache(void) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1f4cee037..e5d593295 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -109,6 +109,8 @@ #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -554,6 +556,9 @@ _PG_init(void) "ColumnarSupportsIndexAM", true, &handle); + CacheRegisterRelcacheCallback(InvalidateDistRelationCacheCallback, + (Datum) 0); + INIT_COLUMNAR_SYMBOL(CompressionTypeStr_type, CompressionTypeStr); INIT_COLUMNAR_SYMBOL(IsColumnarTableAmTable_type, IsColumnarTableAmTable); INIT_COLUMNAR_SYMBOL(ReadColumnarOptions_type, ReadColumnarOptions); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 4e918ecf7..34b95b859 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -137,6 +137,8 @@ typedef enum ANY_CITUS_TABLE_TYPE } CitusTableType; +void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId); + extern List * AllCitusTableIds(void); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern CitusTableType GetCitusTableType(CitusTableCacheEntry *tableEntry); diff --git a/src/test/regress/citus_tests/test/test_extension.py b/src/test/regress/citus_tests/test/test_extension.py new file mode 100644 index 000000000..e9b90f115 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_extension.py @@ -0,0 +1,44 @@ +import psycopg +import pytest + + +def test_create_drop_citus(coord): + with coord.cur() as cur1: + with coord.cur() as cur2: + # Conn1 drops the extension + # and Conn2 cannot use it. + cur1.execute("DROP EXTENSION citus") + + with pytest.raises(psycopg.errors.UndefinedFunction): + # Conn1 dropped the extension. citus_version udf + # cannot be found.sycopg.errors.UndefinedFunction + # is expected here. + cur2.execute("SELECT citus_version();") + + # Conn2 creates the extension, + # Conn1 is able to use it immediadtely. + cur2.execute("CREATE EXTENSION citus") + cur1.execute("SELECT citus_version();") + cur1.execute("DROP EXTENSION citus;") + + with coord.cur() as cur1: + with coord.cur() as cur2: + # A connection is able to create and use the extension + # within a transaction block. + cur1.execute("BEGIN;") + cur1.execute("CREATE TABLE t1(id int);") + cur1.execute("CREATE EXTENSION citus;") + cur1.execute("SELECT create_reference_table('t1')") + cur1.execute("ABORT;") + + # Conn1 aborted so Conn2 is be able to create and + # use the extension within a transaction block. + cur2.execute("BEGIN;") + cur2.execute("CREATE TABLE t1(id int);") + cur2.execute("CREATE EXTENSION citus;") + cur2.execute("SELECT create_reference_table('t1')") + cur2.execute("COMMIT;") + + # Conn2 commited so Conn1 is be able to use the + # extension immediately. + cur1.execute("SELECT citus_version();") From 9f067731c0d298cc394a3928b4add5b1eef1e54e Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Tue, 5 Sep 2023 14:32:41 +0300 Subject: [PATCH 4/9] Adds PostgreSQL 16 RC1 support (#7173) --- .circleci/config.yml | 6 +- .../isolation_shard_rebalancer_progress.out | 124 +++++++++--------- .../isolation_shard_rebalancer_progress.spec | 4 +- 3 files changed, 67 insertions(+), 67 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index d269739f2..a1c0f1553 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ orbs: parameters: image_suffix: type: string - default: '-v0c8d80c' + default: '-v641cdcd' pg14_version: type: string default: '14.9' @@ -15,10 +15,10 @@ parameters: default: '15.4' pg16_version: type: string - default: '16beta3' + default: '16rc1' upgrade_pg_versions: type: string - default: '14.9-15.4-16beta3' + default: '14.9-15.4-16rc1' style_checker_tools_version: type: string default: '0.8.18' diff --git a/src/test/regress/expected/isolation_shard_rebalancer_progress.out b/src/test/regress/expected/isolation_shard_rebalancer_progress.out index 8553a1d4d..90c78ca62 100644 --- a/src/test/regress/expected/isolation_shard_rebalancer_progress.out +++ b/src/test/regress/expected/isolation_shard_rebalancer_progress.out @@ -19,7 +19,7 @@ step s1-rebalance-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -40,8 +40,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -63,7 +63,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -102,7 +102,7 @@ step s1-rebalance-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -123,8 +123,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 2|move |t |t |f |Completed -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 2|move |t |t |f |Completed +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 2|move |t |t |f |Completed +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 2|move |t |t |f |Completed colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 1|move |t |t |f |Setting Up colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 1|move |t |t |f |Setting Up (4 rows) @@ -141,7 +141,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -184,7 +184,7 @@ step s1-rebalance-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -205,8 +205,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f |Copying Data colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -228,7 +228,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -271,7 +271,7 @@ step s1-rebalance-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -292,8 +292,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -315,7 +315,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -358,7 +358,7 @@ step s1-rebalance-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -379,8 +379,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |t |Final Catchup colocated1|1500002| 200000|localhost | 57637| 200000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet colocated2|1500006| 8000|localhost | 57637| 8000|localhost | 57638| 0| 0|move |t |t |f |Not Started Yet (4 rows) @@ -402,7 +402,7 @@ rebalance_table_shards step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -445,7 +445,7 @@ step s1-shard-move-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -466,8 +466,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Copying Data (2 rows) step s5-release-advisory-lock: @@ -487,7 +487,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -530,7 +530,7 @@ step s1-shard-move-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -551,8 +551,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f |Copying Data (2 rows) step s6-release-advisory-lock: @@ -572,7 +572,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -616,7 +616,7 @@ step s1-shard-copy-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -637,8 +637,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|copy |t |t |f |Copying Data (2 rows) step s5-release-advisory-lock: @@ -658,7 +658,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -702,7 +702,7 @@ step s1-shard-copy-c1-block-writes: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -723,8 +723,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |f |Copying Data -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |f |Copying Data +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|copy |t |t |f |Copying Data +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|copy |t |t |f |Copying Data (2 rows) step s6-release-advisory-lock: @@ -744,7 +744,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -787,7 +787,7 @@ step s1-shard-move-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -808,8 +808,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f |Setting Up (2 rows) step s5-release-advisory-lock: @@ -829,7 +829,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -872,7 +872,7 @@ step s1-shard-move-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -893,8 +893,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |t |Final Catchup -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |t |Final Catchup +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |t |Final Catchup +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |t |Final Catchup (2 rows) step s6-release-advisory-lock: @@ -914,7 +914,7 @@ citus_move_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -958,7 +958,7 @@ step s1-shard-copy-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -979,8 +979,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|copy |t |t |f |Setting Up (2 rows) step s5-release-advisory-lock: @@ -1000,7 +1000,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1044,7 +1044,7 @@ step s1-shard-copy-c1-online: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1065,8 +1065,8 @@ step s7-get-progress: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available|status --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|copy |t |t |t |Final Catchup -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|copy |t |t |t |Final Catchup +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|copy |t |t |t |Final Catchup +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|copy |t |t |t |Final Catchup (2 rows) step s6-release-advisory-lock: @@ -1086,7 +1086,7 @@ citus_copy_shard_placement step s1-wait: step s7-get-progress: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1132,7 +1132,7 @@ step s4-shard-move-sep-block-writes: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1153,9 +1153,9 @@ step s7-get-progress-ordered: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 8000| 1|move |t |t |f -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 8000| 1|move |t |t |f +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 8000| 1|move |t |t |f +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 8000| 1|move |t |t |f +separate |1500009| 200000|localhost | 57637| 200000|localhost | 57638| 8000| 1|move |t |t |f (3 rows) step s5-release-advisory-lock: @@ -1182,7 +1182,7 @@ step s1-wait: step s4-wait: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1228,7 +1228,7 @@ step s4-shard-move-sep-block-writes: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -1249,9 +1249,9 @@ step s7-get-progress-ordered: table_name|shardid|shard_size|sourcename|sourceport|source_shard_size|targetname|targetport|target_shard_size|progress|operation_type|lsn_sanity_check|source_lsn_available|target_lsn_available --------------------------------------------------------------------- -colocated1|1500001| 50000|localhost | 57637| 50000|localhost | 57638| 50000| 1|move |t |t |f -colocated2|1500005| 400000|localhost | 57637| 400000|localhost | 57638| 400000| 1|move |t |t |f -separate |1500009| 50000|localhost | 57637| 50000|localhost | 57638| 200000| 1|move |t |t |f +colocated1|1500001| 40000|localhost | 57637| 40000|localhost | 57638| 40000| 1|move |t |t |f +colocated2|1500005| 480000|localhost | 57637| 480000|localhost | 57638| 480000| 1|move |t |t |f +separate |1500009| 200000|localhost | 57637| 200000|localhost | 57638| 200000| 1|move |t |t |f (3 rows) step s6-release-advisory-lock: @@ -1278,7 +1278,7 @@ step s1-wait: step s4-wait: step s7-get-progress-ordered: set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, diff --git a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec index e329e9483..234703c21 100644 --- a/src/test/regress/spec/isolation_shard_rebalancer_progress.spec +++ b/src/test/regress/spec/isolation_shard_rebalancer_progress.spec @@ -131,7 +131,7 @@ session "s7" step "s7-get-progress" { set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, @@ -157,7 +157,7 @@ step "s7-get-progress" step "s7-get-progress-ordered" { set LOCAL client_min_messages=NOTICE; - WITH possible_sizes(size) as (VALUES (0), (8000), (50000), (200000), (400000)) + WITH possible_sizes(size) as (VALUES (0), (8000), (40000), (200000), (480000)) SELECT table_name, shardid, From 8eb3360017eb19b7aa33626cc92a70398f726bf4 Mon Sep 17 00:00:00 2001 From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com> Date: Tue, 5 Sep 2023 18:04:16 +0300 Subject: [PATCH 5/9] Fixes visibility problems with dependency propagation (#7028) **Problem:** Previously we always used an outside superuser connection to overcome permission issues for the current user while propagating dependencies. That has mainly 2 problems: 1. Visibility issues during dependency propagation, (metadata connection propagates some objects like a schema, and outside transaction does not see it and tries to create it again) 2. Security issues (it is preferrable to use current user's connection instead of extension superuser) **Solution (high level):** Now, we try to make a smarter decision on whether should we use an outside superuser connection or current user's metadata connection. We prefer using current user's connection if any of the objects, which is already propagated in the current transaction, is a dependency for a target object. We do that since we assume if current user has permissions to create the dependency, then it can most probably propagate the target as well. Our assumption is expected to hold most of the times but it can still be wrong. In those cases, transaction would fail and user should set the GUC `citus.create_object_propagation` to `deferred` to work around it. **Solution:** 1. We track all objects propagated in the current transaction (we can handle subtransactions), 2. We propagate dependencies via the current user's metadata connection if any dependency is created in the current transaction to address issues listed above. Otherwise, we still use an outside superuser connection. DESCRIPTION: Fixes some object propagation errors seen with transaction blocks. Fixes https://github.com/citusdata/citus/issues/6614 --------- Co-authored-by: Nils Dijk --- .../commands/create_distributed_table.c | 1 + .../distributed/commands/dependencies.c | 36 ++- .../distributed/commands/utility_hook.c | 1 + .../transaction/transaction_management.c | 273 ++++++++++++++++- .../transaction/worker_transaction.c | 15 + .../distributed/transaction_management.h | 8 + src/include/distributed/worker_transaction.h | 1 + src/test/regress/citus_tests/run_test.py | 1 + .../regress/expected/distributed_domain.out | 1 + .../regress/expected/multi_schema_support.out | 278 ++++++++++++++++++ src/test/regress/sql/distributed_domain.sql | 1 + src/test/regress/sql/multi_schema_support.sql | 213 ++++++++++++++ 12 files changed, 814 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3b993250f..ff02593f5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1659,6 +1659,7 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId) ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*tableAddress, RelationRelationId, relationId); EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress)); + TrackPropagatedTableAndSequences(relationId); } diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index ceec83324..977efb145 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -112,15 +112,35 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) dependency->objectSubId, ExclusiveLock); } - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - const char *nodeName = workerNode->workerName; - uint32 nodePort = workerNode->workerPort; - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), - ddlCommands); + /* + * We need to propagate dependencies via the current user's metadata connection if + * any dependency for the target is created in the current transaction. Our assumption + * is that if we rely on a dependency created in the current transaction, then the + * current user, most probably, has permissions to create the target object as well. + * Note that, user still may not be able to create the target due to no permissions + * for any of its dependencies. But this is ok since it should be rare. + * + * If we opted to use a separate superuser connection for the target, then we would + * have visibility issues since propagated dependencies would be invisible to + * the separate connection until we locally commit. + */ + if (HasAnyDependencyInPropagatedObjects(target)) + { + SendCommandListToWorkersWithMetadata(ddlCommands); + } + else + { + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + const char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), + ddlCommands); + } } /* diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 1945218b6..10e424623 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -937,6 +937,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt, foreach_ptr(address, addresses) { MarkObjectDistributed(address); + TrackPropagatedObject(address); } } } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 5add48009..9a7bd9089 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -19,6 +19,8 @@ #include "access/twophase.h" #include "access/xact.h" +#include "catalog/dependency.h" +#include "common/hashfn.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" @@ -30,6 +32,7 @@ #include "distributed/local_executor.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/maintenanced.h" +#include "distributed/metadata/dependency.h" #include "distributed/multi_executor.h" #include "distributed/multi_logical_replication.h" #include "distributed/multi_explain.h" @@ -89,14 +92,25 @@ StringInfo activeSetStmts; * Though a list, we treat this as a stack, pushing on subxact contexts whenever * e.g. a SAVEPOINT is executed (though this is actually performed by providing * PostgreSQL with a sub-xact callback). At present, the context of a subxact - * includes a subxact identifier as well as any SET LOCAL statements propagated - * to workers during the sub-transaction. + * includes + * - a subxact identifier, + * - any SET LOCAL statements propagated to workers during the sub-transaction, + * - all objects propagated to workers during the sub-transaction. * * To be clear, last item of activeSubXactContexts list corresponds to top of * stack. */ static List *activeSubXactContexts = NIL; +/* + * PropagatedObjectsInTx is a set of objects propagated in the root transaction. + * We also keep track of objects propagated in sub-transactions in activeSubXactContexts. + * Any committed sub-transaction would cause the objects, which are propagated during + * the sub-transaction, to be moved to upper transaction's set. Objects are discarded + * when the sub-transaction is aborted. + */ +static HTAB *PropagatedObjectsInTx = NULL; + /* some pre-allocated memory so we don't need to call malloc() during callbacks */ MemoryContext CitusXactCallbackContext = NULL; @@ -142,11 +156,17 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); -static void PopSubXact(SubTransactionId subId); +static void PopSubXact(SubTransactionId subId, bool commit); static void ResetGlobalVariables(void); static bool SwallowErrors(void (*func)(void)); static void ForceAllInProgressConnectionsToClose(void); static void EnsurePrepareTransactionIsAllowed(void); +static HTAB * CurrentTransactionPropagatedObjects(bool readonly); +static HTAB * ParentTransactionPropagatedObjects(bool readonly); +static void MovePropagatedObjectsToParentTransaction(void); +static bool DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, + const ObjectAddress *dependency); +static HTAB * CreateTxPropagatedObjectsHash(void); /* @@ -321,6 +341,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + ResetPropagatedObjects(); /* * Make sure that we give the shared connections back to the shared @@ -391,6 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + ResetPropagatedObjects(); /* Reset any local replication origin session since transaction has been aborted.*/ ResetReplicationOriginLocalSession(); @@ -638,7 +660,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, switch (event) { /* - * Our subtransaction stack should be consistent with postgres' internal + * Our sub-transaction stack should be consistent with postgres' internal * transaction stack. In case of subxact begin, postgres calls our * callback after it has pushed the transaction into stack, so we have to * do the same even if worker commands fail, so we PushSubXact() first. @@ -672,7 +694,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRelease(subId); } - PopSubXact(subId); + PopSubXact(subId, true); /* Set CachedDuringCitusCreation to one level lower to represent citus creation is done */ @@ -706,7 +728,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRollback(subId); } - PopSubXact(subId); + PopSubXact(subId, false); /* * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus @@ -775,6 +797,9 @@ PushSubXact(SubTransactionId subId) state->subId = subId; state->setLocalCmds = activeSetStmts; + /* we lazily create hashset when any object is propagated during sub-transaction */ + state->propagatedObjects = NULL; + /* append to list and reset active set stmts for upcoming sub-xact */ activeSubXactContexts = lappend(activeSubXactContexts, state); activeSetStmts = makeStringInfo(); @@ -783,7 +808,7 @@ PushSubXact(SubTransactionId subId) /* PopSubXact pops subId from the stack of active sub-transactions. */ static void -PopSubXact(SubTransactionId subId) +PopSubXact(SubTransactionId subId, bool commit) { SubXactContext *state = llast(activeSubXactContexts); @@ -806,6 +831,16 @@ PopSubXact(SubTransactionId subId) */ activeSetStmts = state->setLocalCmds; + /* + * Keep subtransaction's propagated objects at parent transaction + * if subtransaction committed. Otherwise, discard them. + */ + if (commit) + { + MovePropagatedObjectsToParentTransaction(); + } + hash_destroy(state->propagatedObjects); + /* * Free state to avoid memory leaks when we create subxacts for each row, * e.g. in exception handling of UDFs. @@ -913,3 +948,227 @@ EnsurePrepareTransactionIsAllowed(void) errmsg("cannot use 2PC in transactions involving " "multiple servers"))); } + + +/* + * CurrentTransactionPropagatedObjects returns the objects propagated in current + * sub-transaction or the root transaction if no sub-transaction exists. + * + * If the propagated objects are readonly it will not create the hashmap if it does not + * already exist in the current sub-transaction. + */ +static HTAB * +CurrentTransactionPropagatedObjects(bool readonly) +{ + if (activeSubXactContexts == NIL) + { + /* hashset in the root transaction if there is no sub-transaction */ + if (PropagatedObjectsInTx == NULL && !readonly) + { + /* lazily create hashset for root transaction, for mutating uses */ + PropagatedObjectsInTx = CreateTxPropagatedObjectsHash(); + } + return PropagatedObjectsInTx; + } + + /* hashset in top level sub-transaction */ + SubXactContext *state = llast(activeSubXactContexts); + if (state->propagatedObjects == NULL && !readonly) + { + /* lazily create hashset for sub-transaction, for mutating uses */ + state->propagatedObjects = CreateTxPropagatedObjectsHash(); + } + return state->propagatedObjects; +} + + +/* + * ParentTransactionPropagatedObjects returns the objects propagated in parent + * transaction of active sub-transaction. It returns the root transaction if + * no sub-transaction exists. + * + * If the propagated objects are readonly it will not create the hashmap if it does not + * already exist in the target sub-transaction. + */ +static HTAB * +ParentTransactionPropagatedObjects(bool readonly) +{ + int nestingLevel = list_length(activeSubXactContexts); + if (nestingLevel <= 1) + { + /* + * The parent is the root transaction, when there is single level sub-transaction + * or no sub-transaction. + */ + if (PropagatedObjectsInTx == NULL && !readonly) + { + /* lazily create hashset for root transaction, for mutating uses */ + PropagatedObjectsInTx = CreateTxPropagatedObjectsHash(); + } + return PropagatedObjectsInTx; + } + + /* parent is upper sub-transaction */ + Assert(nestingLevel >= 2); + SubXactContext *state = list_nth(activeSubXactContexts, nestingLevel - 2); + if (state->propagatedObjects == NULL && !readonly) + { + /* lazily create hashset for parent sub-transaction */ + state->propagatedObjects = CreateTxPropagatedObjectsHash(); + } + return state->propagatedObjects; +} + + +/* + * MovePropagatedObjectsToParentTransaction moves all objects propagated in the current + * sub-transaction to the parent transaction. This should only be called when there is + * active sub-transaction. + */ +static void +MovePropagatedObjectsToParentTransaction(void) +{ + Assert(llast(activeSubXactContexts) != NULL); + HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(true); + if (currentPropagatedObjects == NULL) + { + /* nothing to move */ + return; + } + + /* + * Only after we know we have objects to move into the parent do we get a handle on + * a guaranteed existing parent hash table. This makes sure that the parents only + * get populated once there are objects to be tracked. + */ + HTAB *parentPropagatedObjects = ParentTransactionPropagatedObjects(false); + + HASH_SEQ_STATUS propagatedObjectsSeq; + hash_seq_init(&propagatedObjectsSeq, currentPropagatedObjects); + ObjectAddress *objectAddress = NULL; + while ((objectAddress = hash_seq_search(&propagatedObjectsSeq)) != NULL) + { + hash_search(parentPropagatedObjects, objectAddress, HASH_ENTER, NULL); + } +} + + +/* + * DependencyInPropagatedObjectsHash checks if dependency is in given hashset + * of propagated objects. + */ +static bool +DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, const + ObjectAddress *dependency) +{ + if (propagatedObjects == NULL) + { + return false; + } + + bool found = false; + hash_search(propagatedObjects, dependency, HASH_FIND, &found); + return found; +} + + +/* + * CreateTxPropagatedObjectsHash creates a hashset to keep track of the objects + * propagated in the current root transaction or sub-transaction. + */ +static HTAB * +CreateTxPropagatedObjectsHash(void) +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ObjectAddress); + info.entrysize = sizeof(ObjectAddress); + info.hash = tag_hash; + info.hcxt = CitusXactCallbackContext; + + int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); + return hash_create("Tx Propagated Objects", 16, &info, hashFlags); +} + + +/* + * TrackPropagatedObject adds given object into the objects propagated in the current + * sub-transaction. + */ +void +TrackPropagatedObject(const ObjectAddress *objectAddress) +{ + HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(false); + hash_search(currentPropagatedObjects, objectAddress, HASH_ENTER, NULL); +} + + +/* + * TrackPropagatedTableAndSequences adds given table and its sequences to the objects + * propagated in the current sub-transaction. + */ +void +TrackPropagatedTableAndSequences(Oid relationId) +{ + /* track table */ + ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*tableAddress, RelationRelationId, relationId); + TrackPropagatedObject(tableAddress); + + /* track its sequences */ + List *ownedSeqIdList = getOwnedSequences(relationId); + Oid ownedSeqId = InvalidOid; + foreach_oid(ownedSeqId, ownedSeqIdList) + { + ObjectAddress *seqAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*seqAddress, RelationRelationId, ownedSeqId); + TrackPropagatedObject(seqAddress); + } +} + + +/* + * ResetPropagatedObjects destroys hashset of propagated objects in the root transaction. + */ +void +ResetPropagatedObjects(void) +{ + hash_destroy(PropagatedObjectsInTx); + PropagatedObjectsInTx = NULL; +} + + +/* + * HasAnyDependencyInPropagatedObjects decides if any dependency of given object is + * propagated in the current transaction. + */ +bool +HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) +{ + List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress); + ObjectAddress *dependency = NULL; + foreach_ptr(dependency, dependencyList) + { + /* first search in root transaction */ + if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency)) + { + return true; + } + + /* search in all nested sub-transactions */ + if (activeSubXactContexts == NIL) + { + continue; + } + SubXactContext *state = NULL; + foreach_ptr(state, activeSubXactContexts) + { + if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency)) + { + return true; + } + } + } + + return false; +} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a9a855fb1..03ecbea72 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -135,6 +135,21 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command) } +/* + * SendCommandListToWorkersWithMetadata sends all commands to all metadata workers + * with the current user. See `SendCommandToWorkersWithMetadata`for details. + */ +void +SendCommandListToWorkersWithMetadata(List *commands) +{ + char *command = NULL; + foreach_ptr(command, commands) + { + SendCommandToWorkersWithMetadata(command); + } +} + + /* * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSet. diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index e2d35048a..ca4e632a9 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -10,11 +10,13 @@ #define TRANSACTION_MANAGMENT_H #include "access/xact.h" +#include "catalog/objectaddress.h" #include "lib/ilist.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "lib/stringinfo.h" #include "nodes/primnodes.h" +#include "utils/hsearch.h" /* forward declare, to avoid recursive includes */ struct DistObjectCacheEntry; @@ -58,6 +60,7 @@ typedef struct SubXactContext { SubTransactionId subId; StringInfo setLocalCmds; + HTAB *propagatedObjects; } SubXactContext; /* @@ -157,6 +160,11 @@ extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); extern bool MaybeExecutingUDF(void); +/* functions for tracking the objects propagated in current transaction */ +extern void TrackPropagatedObject(const ObjectAddress *objectAddress); +extern void TrackPropagatedTableAndSequences(Oid relationId); +extern void ResetPropagatedObjects(void); +extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress); /* initialization function(s) */ extern void InitializeTransactionManagement(void); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index be8fe5ed6..631940edf 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -73,6 +73,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons commandList); extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); +extern void SendCommandListToWorkersWithMetadata(List *commands); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 731b1a908..2b71f5e1b 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -168,6 +168,7 @@ DEPS = { ], ), "grant_on_schema_propagation": TestDeps("minimal_schedule"), + "propagate_extension_commands": TestDeps("minimal_schedule"), } diff --git a/src/test/regress/expected/distributed_domain.out b/src/test/regress/expected/distributed_domain.out index 5043d4f05..30e388803 100644 --- a/src/test/regress/expected/distributed_domain.out +++ b/src/test/regress/expected/distributed_domain.out @@ -947,3 +947,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist; NOTICE: type "domain_does_not_exist" does not exist, skipping SET client_min_messages TO warning; DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE; +DROP ROLE domain_owner; diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index dcb87486d..2de95266b 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1393,6 +1393,284 @@ BEGIN; ALTER SCHEMA bar RENAME TO foo; ROLLBACK; +-- below tests are to verify dependency propagation with nested sub-transactions +-- TEST1 +BEGIN; + CREATE SCHEMA sc1; + CREATE SEQUENCE sc1.seq; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to sequence sc1.seq +drop cascades to table sc1.s1 +-- TEST2 +CREATE SCHEMA sc1; +BEGIN; + CREATE SEQUENCE sc1.seq1; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to sequence sc1.seq1 +drop cascades to table sc1.s1 +-- TEST3 +SET citus.enable_metadata_sync TO off; +CREATE SCHEMA sc1; +SET citus.enable_metadata_sync TO on; +BEGIN; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST4 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST5 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST6 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST7 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST8 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + ROLLBACK TO SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST9 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + ROLLBACK TO SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST10 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +-- TEST11 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; +-- TEST12 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SEQUENCE seq1; + CREATE SCHEMA sc1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; +-- issue-6614 +CREATE FUNCTION create_schema_test() RETURNS void AS $$ +BEGIN + SET citus.create_object_propagation = 'deferred'; + CREATE SCHEMA test_1; + CREATE TABLE test_1.test ( + id bigserial constraint test_pk primary key, + creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null + ); + PERFORM create_reference_table('test_1.test'); + RETURN; +END; +$$ LANGUAGE plpgsql; +SELECT create_schema_test(); + create_schema_test +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +DROP FUNCTION create_schema_test; +DROP SCHEMA test_1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test_1.test +drop cascades to table test_1.test_1197064 -- Clean up the created schema SET client_min_messages TO WARNING; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object diff --git a/src/test/regress/sql/distributed_domain.sql b/src/test/regress/sql/distributed_domain.sql index b03a2040f..5bf3bd6a8 100644 --- a/src/test/regress/sql/distributed_domain.sql +++ b/src/test/regress/sql/distributed_domain.sql @@ -487,3 +487,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist; SET client_min_messages TO warning; DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE; +DROP ROLE domain_owner; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index d870b624f..146cf78d4 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -995,6 +995,219 @@ BEGIN; ALTER SCHEMA bar RENAME TO foo; ROLLBACK; +-- below tests are to verify dependency propagation with nested sub-transactions +-- TEST1 +BEGIN; + CREATE SCHEMA sc1; + CREATE SEQUENCE sc1.seq; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST2 +CREATE SCHEMA sc1; +BEGIN; + CREATE SEQUENCE sc1.seq1; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST3 +SET citus.enable_metadata_sync TO off; +CREATE SCHEMA sc1; +SET citus.enable_metadata_sync TO on; +BEGIN; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST4 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST5 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST6 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST7 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST8 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + ROLLBACK TO SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST9 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + ROLLBACK TO SAVEPOINT sp2; + + SAVEPOINT sp3; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST10 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; + +-- TEST11 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; + +-- TEST12 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SEQUENCE seq1; + CREATE SCHEMA sc1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; + +-- issue-6614 +CREATE FUNCTION create_schema_test() RETURNS void AS $$ +BEGIN + SET citus.create_object_propagation = 'deferred'; + CREATE SCHEMA test_1; + CREATE TABLE test_1.test ( + id bigserial constraint test_pk primary key, + creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null + ); + PERFORM create_reference_table('test_1.test'); + RETURN; +END; +$$ LANGUAGE plpgsql; +SELECT create_schema_test(); +SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$); +DROP FUNCTION create_schema_test; +DROP SCHEMA test_1 CASCADE; + -- Clean up the created schema SET client_min_messages TO WARNING; From 205b15960696ecdf15021dfa1858079841a7e503 Mon Sep 17 00:00:00 2001 From: zhjwpku Date: Tue, 5 Sep 2023 23:40:22 +0800 Subject: [PATCH 6/9] get rid of {Push/Pop}OverrideSearchPath (#7145) --- .../distributed/commands/alter_table.c | 8 ++--- src/backend/distributed/commands/extension.c | 8 ++--- .../distributed/commands/foreign_constraint.c | 4 +-- src/backend/distributed/commands/function.c | 5 ++- src/backend/distributed/commands/statistics.c | 4 +-- src/backend/distributed/commands/trigger.c | 4 +-- src/backend/distributed/commands/view.c | 7 ++-- .../distributed/deparser/citus_ruleutils.c | 4 +-- .../deparser/deparse_domain_stmts.c | 12 +++---- .../deparser/deparse_publication_stmts.c | 4 +-- .../deparser/deparse_table_stmts.c | 4 +-- .../distributed/deparser/ruleutils_14.c | 22 ++++--------- .../distributed/deparser/ruleutils_15.c | 22 ++++--------- .../distributed/deparser/ruleutils_16.c | 22 ++++--------- .../distributed/operations/node_protocol.c | 8 ++--- .../replicate_none_dist_table_shard.c | 4 +-- .../distributed/utils/namespace_utils.c | 33 ++++++++++++------- src/include/distributed/namespace_utils.h | 3 +- 18 files changed, 80 insertions(+), 98 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 788a3b8b0..fbe7cfe07 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -53,6 +53,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_partitioning_utils.h" +#include "distributed/namespace_utils.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/replication_origin_session_utils.h" @@ -1764,10 +1765,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. */ - OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); /* * Push the transaction snapshot to be able to get vief definition with pg_get_viewdef @@ -1779,7 +1777,7 @@ CreateMaterializedViewDDLCommand(Oid matViewOid) char *viewDefinition = TextDatumGetCString(viewDefinitionDatum); PopActiveSnapshot(); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(query, "AS %s", viewDefinition); diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index ac4bf135e..5bddf1ede 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -50,7 +50,7 @@ static List * GetAllViews(void); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static Node * RecreateExtensionStmt(Oid extensionOid); -static List * GenerateGrantCommandsOnExtesionDependentFDWs(Oid extensionId); +static List * GenerateGrantCommandsOnExtensionDependentFDWs(Oid extensionId); /* @@ -985,7 +985,7 @@ CreateExtensionDDLCommand(const ObjectAddress *extensionAddress) /* any privilege granted on FDWs that belong to the extension should be included */ List *FDWGrants = - GenerateGrantCommandsOnExtesionDependentFDWs(extensionAddress->objectId); + GenerateGrantCommandsOnExtensionDependentFDWs(extensionAddress->objectId); ddlCommands = list_concat(ddlCommands, FDWGrants); @@ -1048,11 +1048,11 @@ RecreateExtensionStmt(Oid extensionOid) /* - * GenerateGrantCommandsOnExtesionDependentFDWs returns a list of commands that GRANTs + * GenerateGrantCommandsOnExtensionDependentFDWs returns a list of commands that GRANTs * the privileges on FDWs that are depending on the given extension. */ static List * -GenerateGrantCommandsOnExtesionDependentFDWs(Oid extensionId) +GenerateGrantCommandsOnExtensionDependentFDWs(Oid extensionId) { List *commands = NIL; List *FDWOids = GetDependentFDWsToExtension(extensionId); diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index 0d5156353..7c2d50f44 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -895,7 +895,7 @@ GetForeignConstraintCommandsInternal(Oid relationId, int flags) List *foreignKeyCommands = NIL; - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); Oid foreignKeyOid = InvalidOid; foreach_oid(foreignKeyOid, foreignKeyOids) @@ -906,7 +906,7 @@ GetForeignConstraintCommandsInternal(Oid relationId, int flags) } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return foreignKeyCommands; } diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 76112ad9a..01911677d 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -909,15 +909,14 @@ GetFunctionDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace) else { Datum sqlTextDatum = (Datum) 0; - - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); sqlTextDatum = DirectFunctionCall1(pg_get_functiondef, ObjectIdGetDatum(funcOid)); createFunctionSQL = TextDatumGetCString(sqlTextDatum); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } return createFunctionSQL; diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index e5d5ac8ce..dae72ada9 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -530,7 +530,7 @@ GetExplicitStatisticsCommandList(Oid relationId) RelationClose(relation); /* generate fully-qualified names */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); Oid statisticsId = InvalidOid; foreach_oid(statisticsId, statisticsIdList) @@ -579,7 +579,7 @@ GetExplicitStatisticsCommandList(Oid relationId) } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return explicitStatisticsCommandList; } diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index c7776bdb6..7577dfd31 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -74,7 +74,7 @@ GetExplicitTriggerCommandList(Oid relationId) { List *createTriggerCommandList = NIL; - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); List *triggerIdList = GetExplicitTriggerIdList(relationId); @@ -116,7 +116,7 @@ GetExplicitTriggerCommandList(Oid relationId) } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return createTriggerCommandList; } diff --git a/src/backend/distributed/commands/view.c b/src/backend/distributed/commands/view.c index 02d6815d9..7c4816144 100644 --- a/src/backend/distributed/commands/view.c +++ b/src/backend/distributed/commands/view.c @@ -479,10 +479,7 @@ AppendViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid) * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. */ - OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); /* * Push the transaction snapshot to be able to get vief definition with pg_get_viewdef @@ -494,7 +491,7 @@ AppendViewDefinitionToCreateViewCommand(StringInfo buf, Oid viewOid) char *viewDefinition = TextDatumGetCString(viewDefinitionDatum); PopActiveSnapshot(); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, "AS %s ", viewDefinition); } diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 6b865e061..220ea3ec7 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -818,7 +818,7 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, * Switch to empty search_path to deparse_index_columns to produce fully- * qualified names in expressions. */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); /* index column or expression list begins here */ appendStringInfoChar(buffer, '('); @@ -855,7 +855,7 @@ deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } diff --git a/src/backend/distributed/deparser/deparse_domain_stmts.c b/src/backend/distributed/deparser/deparse_domain_stmts.c index 9891e0532..e517074ec 100644 --- a/src/backend/distributed/deparser/deparse_domain_stmts.c +++ b/src/backend/distributed/deparser/deparse_domain_stmts.c @@ -345,9 +345,9 @@ AppendAlterDomainStmtSetDefault(StringInfo buf, AlterDomainStmt *stmt) expr = TransformDefaultExpr(expr, stmt->typeName, baseTypeName); /* deparse while the searchpath is cleared to force qualification of identifiers */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *exprSql = deparse_expression(expr, NIL, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, "SET DEFAULT %s", exprSql); } @@ -443,9 +443,9 @@ AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName, elog(ERROR, "missing expression for domain constraint"); } - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *exprSql = deparse_expression(expr, NIL, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, " CHECK (%s)", exprSql); return; @@ -469,9 +469,9 @@ AppendConstraint(StringInfo buf, Constraint *constraint, List *domainName, elog(ERROR, "missing expression for domain default"); } - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *exprSql = deparse_expression(expr, NIL, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfo(buf, " DEFAULT %s", exprSql); return; diff --git a/src/backend/distributed/deparser/deparse_publication_stmts.c b/src/backend/distributed/deparser/deparse_publication_stmts.c index deb8e7285..e22333146 100644 --- a/src/backend/distributed/deparser/deparse_publication_stmts.c +++ b/src/backend/distributed/deparser/deparse_publication_stmts.c @@ -307,11 +307,11 @@ AppendWhereClauseExpression(StringInfo buf, RangeVar *tableName, List *relationContext = deparse_context_for(tableName->relname, relation->rd_id); - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *whereClauseString = deparse_expression(whereClause, relationContext, true, true); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); appendStringInfoString(buf, whereClauseString); diff --git a/src/backend/distributed/deparser/deparse_table_stmts.c b/src/backend/distributed/deparser/deparse_table_stmts.c index a90d38655..ff96d7fc3 100644 --- a/src/backend/distributed/deparser/deparse_table_stmts.c +++ b/src/backend/distributed/deparser/deparse_table_stmts.c @@ -562,9 +562,9 @@ DeparseRawExprForColumnDefault(Oid relationId, Oid columnTypeId, int32 columnTyp List *deparseContext = deparse_context_for(get_rel_name(relationId), relationId); - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); char *defaultExprStr = deparse_expression(defaultExpr, deparseContext, false, false); - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); RelationClose(relation); diff --git a/src/backend/distributed/deparser/ruleutils_14.c b/src/backend/distributed/deparser/ruleutils_14.c index b364221d8..6ab124537 100644 --- a/src/backend/distributed/deparser/ruleutils_14.c +++ b/src/backend/distributed/deparser/ruleutils_14.c @@ -53,6 +53,7 @@ #include "common/keywords.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" +#include "distributed/namespace_utils.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -610,18 +611,14 @@ pg_get_rule_expr(Node *expression) { bool showImplicitCasts = true; deparse_context context; - OverrideSearchPath *overridePath = NULL; StringInfo buffer = makeStringInfo(); /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buffer; context.namespaces = NIL; @@ -638,7 +635,7 @@ pg_get_rule_expr(Node *expression) get_rule_expr(expression, &context, showImplicitCasts); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return buffer->data; } @@ -1955,8 +1952,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, deparse_context context; deparse_namespace dpns; - OverrideSearchPath *overridePath = NULL; - /* Guard against excessively long or deeply-nested queries */ CHECK_FOR_INTERRUPTS(); check_stack_depth(); @@ -1975,12 +1970,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buf; context.namespaces = lcons(&dpns, list_copy(parentnamespace)); @@ -2031,7 +2023,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } /* ---------- diff --git a/src/backend/distributed/deparser/ruleutils_15.c b/src/backend/distributed/deparser/ruleutils_15.c index 2dded9b01..755e0f4cd 100644 --- a/src/backend/distributed/deparser/ruleutils_15.c +++ b/src/backend/distributed/deparser/ruleutils_15.c @@ -54,6 +54,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" #include "distributed/multi_router_planner.h" +#include "distributed/namespace_utils.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -624,18 +625,14 @@ pg_get_rule_expr(Node *expression) { bool showImplicitCasts = true; deparse_context context; - OverrideSearchPath *overridePath = NULL; StringInfo buffer = makeStringInfo(); /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(), since we set addCatalog to true; */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buffer; context.namespaces = NIL; @@ -652,7 +649,7 @@ pg_get_rule_expr(Node *expression) get_rule_expr(expression, &context, showImplicitCasts); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return buffer->data; } @@ -2038,8 +2035,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, deparse_context context; deparse_namespace dpns; - OverrideSearchPath *overridePath = NULL; - /* Guard against excessively long or deeply-nested queries */ CHECK_FOR_INTERRUPTS(); check_stack_depth(); @@ -2058,12 +2053,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buf; context.namespaces = lcons(&dpns, list_copy(parentnamespace)); @@ -2118,7 +2110,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } /* ---------- diff --git a/src/backend/distributed/deparser/ruleutils_16.c b/src/backend/distributed/deparser/ruleutils_16.c index 37ba6e34b..31e8823b1 100644 --- a/src/backend/distributed/deparser/ruleutils_16.c +++ b/src/backend/distributed/deparser/ruleutils_16.c @@ -54,6 +54,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_ruleutils.h" #include "distributed/multi_router_planner.h" +#include "distributed/namespace_utils.h" #include "executor/spi.h" #include "foreign/foreign.h" #include "funcapi.h" @@ -641,18 +642,14 @@ pg_get_rule_expr(Node *expression) { bool showImplicitCasts = true; deparse_context context; - OverrideSearchPath *overridePath = NULL; StringInfo buffer = makeStringInfo(); /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buffer; context.namespaces = NIL; @@ -669,7 +666,7 @@ pg_get_rule_expr(Node *expression) get_rule_expr(expression, &context, showImplicitCasts); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return buffer->data; } @@ -2052,8 +2049,6 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, deparse_context context; deparse_namespace dpns; - OverrideSearchPath *overridePath = NULL; - /* Guard against excessively long or deeply-nested queries */ CHECK_FOR_INTERRUPTS(); check_stack_depth(); @@ -2072,12 +2067,9 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; + * PushEmptySearchPath(). */ - overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); + int saveNestLevel = PushEmptySearchPath(); context.buf = buf; context.namespaces = lcons(&dpns, list_copy(parentnamespace)); @@ -2132,7 +2124,7 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace, } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } /* ---------- diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 14287992e..a3f7092d1 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -612,7 +612,7 @@ GetPreLoadTableCreationCommands(Oid relationId, { List *tableDDLEventList = NIL; - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); /* fetch table schema and column option definitions */ char *tableSchemaDef = pg_get_tableschemadef_string(relationId, @@ -665,7 +665,7 @@ GetPreLoadTableCreationCommands(Oid relationId, tableDDLEventList = list_concat(tableDDLEventList, policyCommands); /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); return tableDDLEventList; } @@ -754,7 +754,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE int indexFlags) { /* generate fully-qualified names */ - PushOverrideEmptySearchPath(CurrentMemoryContext); + int saveNestLevel = PushEmptySearchPath(); Oid indexId = indexForm->indexrelid; bool indexImpliedByConstraint = IndexImpliedByAConstraint(indexForm); @@ -805,7 +805,7 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } /* revert back to original search_path */ - PopOverrideSearchPath(); + PopEmptySearchPath(saveNestLevel); } diff --git a/src/backend/distributed/operations/replicate_none_dist_table_shard.c b/src/backend/distributed/operations/replicate_none_dist_table_shard.c index 945214aef..c28490367 100644 --- a/src/backend/distributed/operations/replicate_none_dist_table_shard.c +++ b/src/backend/distributed/operations/replicate_none_dist_table_shard.c @@ -158,7 +158,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) * local session because changes made to shards are allowed for Citus internal * backends anyway. */ - int save_nestlevel = NewGUCNestLevel(); + int saveNestLevel = NewGUCNestLevel(); SetLocalEnableLocalReferenceForeignKeys(false); SetLocalEnableManualChangesToShard(true); @@ -184,7 +184,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) bool localExecutionSupported = true; ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); - AtEOXact_GUC(true, save_nestlevel); + AtEOXact_GUC(true, saveNestLevel); } diff --git a/src/backend/distributed/utils/namespace_utils.c b/src/backend/distributed/utils/namespace_utils.c index a97adb573..4f822b7d2 100644 --- a/src/backend/distributed/utils/namespace_utils.c +++ b/src/backend/distributed/utils/namespace_utils.c @@ -11,22 +11,33 @@ #include "postgres.h" -#include "catalog/namespace.h" -#include "distributed/citus_ruleutils.h" #include "distributed/namespace_utils.h" +#include "utils/guc.h" #include "utils/regproc.h" /* - * PushOverrideEmptySearchPath pushes search_path to be NIL and sets addCatalog to - * true so that all objects outside of pg_catalog will be schema-prefixed. - * Afterwards, PopOverrideSearchPath can be used to revert the search_path back. + * We use the equivalent of a function SET option to allow the setting to + * persist for the exact duration of the transaction, guc.c takes care of + * undoing the setting on error. + * + * We set search_path to "pg_catalog" instead of "" to expose useful utilities. + */ +int +PushEmptySearchPath() +{ + int saveNestLevel = NewGUCNestLevel(); + (void) set_config_option("search_path", "pg_catalog", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + return saveNestLevel; +} + + +/* + * Restore the GUC variable search_path we set in PushEmptySearchPath */ void -PushOverrideEmptySearchPath(MemoryContext memoryContext) +PopEmptySearchPath(int saveNestLevel) { - OverrideSearchPath *overridePath = GetOverrideSearchPath(memoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - - PushOverrideSearchPath(overridePath); + AtEOXact_GUC(true, saveNestLevel); } diff --git a/src/include/distributed/namespace_utils.h b/src/include/distributed/namespace_utils.h index 7d64ead12..6be101d2a 100644 --- a/src/include/distributed/namespace_utils.h +++ b/src/include/distributed/namespace_utils.h @@ -10,6 +10,7 @@ #ifndef NAMESPACE_UTILS_H #define NAMESPACE_UTILS_H -extern void PushOverrideEmptySearchPath(MemoryContext memoryContext); +extern int PushEmptySearchPath(void); +extern void PopEmptySearchPath(int saveNestLevel); #endif /* NAMESPACE_UTILS_H */ From 5c658b4eb7da882e811304d479ac8a5ee5c42799 Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Tue, 5 Sep 2023 19:42:39 +0300 Subject: [PATCH 7/9] PG16 - Add citus_truncate_trigger for Citus foreign tables (#7170) Since in PG16, truncate triggers are supported on foreign tables, we add the citus_truncate_trigger to Citus foreign tables as well, such that the TRUNCATE command is propagated to the table's single local shard as well. Note that TRUNCATE command was working for foreign tables even before this commit: see https://github.com/citusdata/citus/pull/7170#issuecomment-1706240593 for details This commit also adds tests with user-enabled truncate triggers on Citus foreign tables: both trigger on the shell table and on its single foreign local shard. Relevant PG commit: https://github.com/postgres/postgres/commit/3b00a94 --- .../citus_add_local_table_to_metadata.c | 9 +++ .../commands/create_distributed_table.c | 9 +++ src/test/regress/expected/pg16.out | 75 +++++++++++++++++++ src/test/regress/sql/pg16.sql | 58 ++++++++++++++ 4 files changed, 151 insertions(+) diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index ebc69d19b..c713ce099 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1478,11 +1478,20 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId, static void FinalizeCitusLocalTableCreation(Oid relationId) { +#if PG_VERSION_NUM >= PG_VERSION_16 + + /* + * PG16+ supports truncate triggers on foreign tables + */ + if (RegularTable(relationId) || IsForeignTable(relationId)) +#else + /* * If it is a foreign table, then skip creating citus truncate trigger * as foreign tables do not support truncate triggers. */ if (RegularTable(relationId)) +#endif { CreateTruncateTrigger(relationId); } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index ff02593f5..dc06692b3 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1256,8 +1256,17 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, colocationId, citusTableParams.replicationModel, autoConverted); +#if PG_VERSION_NUM >= PG_VERSION_16 + + /* + * PG16+ supports truncate triggers on foreign tables + */ + if (RegularTable(relationId) || IsForeignTable(relationId)) +#else + /* foreign tables do not support TRUNCATE trigger */ if (RegularTable(relationId)) +#endif { CreateTruncateTrigger(relationId); } diff --git a/src/test/regress/expected/pg16.out b/src/test/regress/expected/pg16.out index 8c0fdc859..27abfdf7a 100644 --- a/src/test/regress/expected/pg16.out +++ b/src/test/regress/expected/pg16.out @@ -314,6 +314,80 @@ SELECT result FROM run_command_on_workers (2 rows) SET search_path TO pg16; +SET citus.next_shard_id TO 951000; +-- Foreign table TRUNCATE trigger +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/3b00a94 +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SET citus.use_citus_managed_tables TO ON; +CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial); +INSERT INTO foreign_table_test VALUES (1, 'text_test'); +CREATE EXTENSION postgres_fdw; +CREATE SERVER foreign_server + FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (host 'localhost', port :'master_port', dbname 'regression'); +CREATE USER MAPPING FOR CURRENT_USER + SERVER foreign_server + OPTIONS (user 'postgres'); +CREATE FOREIGN TABLE foreign_table ( + id integer NOT NULL, + data text, + a bigserial +) + SERVER foreign_server + OPTIONS (schema_name 'pg16', table_name 'foreign_table_test'); +-- verify it's a Citus foreign table +SELECT partmethod, repmodel FROM pg_dist_partition +WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid; + partmethod | repmodel +--------------------------------------------------------------------- + n | s +(1 row) + +INSERT INTO foreign_table VALUES (2, 'test_2'); +INSERT INTO foreign_table_test VALUES (3, 'test_3'); +CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; +CREATE FUNCTION trigger_func_on_shard() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func_on_shard(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; +CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); +SET citus.override_table_visibility TO off; +CREATE TRIGGER trig_stmt_shard_before BEFORE TRUNCATE ON foreign_table_951001 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func_on_shard(); +RESET citus.override_table_visibility; +SELECT * FROM foreign_table ORDER BY 1; + id | data | a +--------------------------------------------------------------------- + 1 | text_test | 1 + 2 | test_2 | 1 + 3 | test_3 | 2 +(3 rows) + +TRUNCATE foreign_table; +NOTICE: trigger_func() called: action = TRUNCATE, when = BEFORE, level = STATEMENT +CONTEXT: PL/pgSQL function trigger_func() line XX at RAISE +NOTICE: trigger_func_on_shard() called: action = TRUNCATE, when = BEFORE, level = STATEMENT +CONTEXT: PL/pgSQL function trigger_func_on_shard() line XX at RAISE +SELECT * FROM foreign_table ORDER BY 1; + id | data | a +--------------------------------------------------------------------- +(0 rows) + +RESET citus.use_citus_managed_tables; -- -- COPY FROM ... DEFAULT -- Already supported in Citus, adding all PG tests with a distributed table @@ -678,4 +752,5 @@ SELECT result FROM run_command_on_workers \set VERBOSITY terse SET client_min_messages TO ERROR; +DROP EXTENSION postgres_fdw CASCADE; DROP SCHEMA pg16 CASCADE; diff --git a/src/test/regress/sql/pg16.sql b/src/test/regress/sql/pg16.sql index 1df96e6a7..f5185deab 100644 --- a/src/test/regress/sql/pg16.sql +++ b/src/test/regress/sql/pg16.sql @@ -146,6 +146,63 @@ DROP DATABASE test_db; SELECT result FROM run_command_on_workers ($$DROP DATABASE test_db$$); SET search_path TO pg16; +SET citus.next_shard_id TO 951000; + +-- Foreign table TRUNCATE trigger +-- Relevant PG commit: +-- https://github.com/postgres/postgres/commit/3b00a94 +SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0); +SET citus.use_citus_managed_tables TO ON; +CREATE TABLE foreign_table_test (id integer NOT NULL, data text, a bigserial); +INSERT INTO foreign_table_test VALUES (1, 'text_test'); +CREATE EXTENSION postgres_fdw; +CREATE SERVER foreign_server + FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (host 'localhost', port :'master_port', dbname 'regression'); +CREATE USER MAPPING FOR CURRENT_USER + SERVER foreign_server + OPTIONS (user 'postgres'); +CREATE FOREIGN TABLE foreign_table ( + id integer NOT NULL, + data text, + a bigserial +) + SERVER foreign_server + OPTIONS (schema_name 'pg16', table_name 'foreign_table_test'); + +-- verify it's a Citus foreign table +SELECT partmethod, repmodel FROM pg_dist_partition +WHERE logicalrelid = 'foreign_table'::regclass ORDER BY logicalrelid; + +INSERT INTO foreign_table VALUES (2, 'test_2'); +INSERT INTO foreign_table_test VALUES (3, 'test_3'); + +CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; + +CREATE FUNCTION trigger_func_on_shard() RETURNS trigger LANGUAGE plpgsql AS $$ +BEGIN + RAISE NOTICE 'trigger_func_on_shard(%) called: action = %, when = %, level = %', + TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; + RETURN NULL; +END;$$; + +CREATE TRIGGER trig_stmt_before BEFORE TRUNCATE ON foreign_table + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); +SET citus.override_table_visibility TO off; +CREATE TRIGGER trig_stmt_shard_before BEFORE TRUNCATE ON foreign_table_951001 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func_on_shard(); +RESET citus.override_table_visibility; + +SELECT * FROM foreign_table ORDER BY 1; +TRUNCATE foreign_table; +SELECT * FROM foreign_table ORDER BY 1; + +RESET citus.use_citus_managed_tables; -- -- COPY FROM ... DEFAULT @@ -390,4 +447,5 @@ SELECT result FROM run_command_on_workers \set VERBOSITY terse SET client_min_messages TO ERROR; +DROP EXTENSION postgres_fdw CASCADE; DROP SCHEMA pg16 CASCADE; From b2fc763bc351afc20f8e6f5565e049d25578dd9d Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Wed, 6 Sep 2023 14:57:24 +0300 Subject: [PATCH 8/9] PG16 - Add tests with random_normal (#7183) Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176 --- src/test/regress/expected/pg16.out | 55 ++++++++++++++++++++++++++++++ src/test/regress/sql/pg16.sql | 35 +++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/src/test/regress/expected/pg16.out b/src/test/regress/expected/pg16.out index 27abfdf7a..a8ff5e47d 100644 --- a/src/test/regress/expected/pg16.out +++ b/src/test/regress/expected/pg16.out @@ -750,6 +750,61 @@ SELECT result FROM run_command_on_workers REINDEX (2 rows) +-- +-- random_normal() to provide normally-distributed random numbers +-- adding here the same tests as the ones with random() in aggregate_support.sql +-- Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176 +-- +CREATE TABLE dist_table (dist_col int, agg_col numeric); +SELECT create_distributed_table('dist_table', 'dist_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_table (int_col int); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- Test the cases where the worker agg exec. returns no tuples. +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + percentile_disc +--------------------------------------------------------------------- + +(1 row) + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + percentile_disc +--------------------------------------------------------------------- + +(1 row) + +-- run the same queries after loading some data +INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), + (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19); +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + percentile_disc +--------------------------------------------------------------------- + 3.22 +(1 row) + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + percentile_disc +--------------------------------------------------------------------- + 1.19 +(1 row) + \set VERBOSITY terse SET client_min_messages TO ERROR; DROP EXTENSION postgres_fdw CASCADE; diff --git a/src/test/regress/sql/pg16.sql b/src/test/regress/sql/pg16.sql index f5185deab..8cffb917e 100644 --- a/src/test/regress/sql/pg16.sql +++ b/src/test/regress/sql/pg16.sql @@ -445,6 +445,41 @@ REINDEX SYSTEM; SELECT result FROM run_command_on_workers ($$REINDEX SYSTEM$$); +-- +-- random_normal() to provide normally-distributed random numbers +-- adding here the same tests as the ones with random() in aggregate_support.sql +-- Relevant PG commit: https://github.com/postgres/postgres/commit/38d8176 +-- + +CREATE TABLE dist_table (dist_col int, agg_col numeric); +SELECT create_distributed_table('dist_table', 'dist_col'); + +CREATE TABLE ref_table (int_col int); +SELECT create_reference_table('ref_table'); + +-- Test the cases where the worker agg exec. returns no tuples. + +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + +-- run the same queries after loading some data + +INSERT INTO dist_table VALUES (2, 11.2), (3, NULL), (6, 3.22), (3, 4.23), (5, 5.25), + (4, 63.4), (75, NULL), (80, NULL), (96, NULL), (8, 1078), (0, 1.19); + +SELECT PERCENTILE_DISC(.25) WITHIN GROUP (ORDER BY agg_col) +FROM (SELECT *, random_normal() FROM dist_table) a; + +SELECT PERCENTILE_DISC((2 > random_normal(stddev => 1, mean => 0))::int::numeric / 10) + WITHIN GROUP (ORDER BY agg_col) +FROM dist_table +LEFT JOIN ref_table ON TRUE; + \set VERBOSITY terse SET client_min_messages TO ERROR; DROP EXTENSION postgres_fdw CASCADE; From 7e5136f2de512d2a9d89350298f80f650ffd2e4d Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Wed, 6 Sep 2023 16:40:36 +0300 Subject: [PATCH 9/9] Add tests with publications with schema and table of the same schema (#7184) Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f 13a185f It was backpatched through PG15 so I added this test in publication.sql instead of pg16.sql --- src/test/regress/expected/publication.out | 154 ++++++++++++++++++++++ src/test/regress/sql/publication.sql | 106 +++++++++++++++ 2 files changed, 260 insertions(+) diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 702d23f1f..c761efb3e 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -375,6 +375,158 @@ END; CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest; RESET citus.create_object_propagation; DROP SCHEMA deptest CASCADE; +-- +-- PG16 allows publications with schema and table of the same schema. +-- backpatched to PG15 +-- Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f +-- +CREATE SCHEMA publication2; +CREATE TABLE publication2.test1 (id int); +SELECT create_distributed_table('publication2.test1', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- should be able to create publication with schema and table of the same +-- schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +CREATE TABLE publication.test2 (id int); +SELECT create_distributed_table('publication.test2', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication.test2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication.test2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- should be able to have publication2 schema and its new table test2 in testpub_for_tbl_schema publication +ALTER TABLE test2 SET SCHEMA publication2; +-- should be able to add a table of the same schema to the schema publication +CREATE TABLE publication2.test3 (x int primary key, y int, "column-1" int); +SELECT create_distributed_table('publication2.test3', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication2.test2, TABLE publication2.test3 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- should be able to drop the table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1, TABLE publication2.test2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +DROP PUBLICATION testpub_for_tbl_schema; +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +-- should be able to set publication with schema and table of the same schema +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE (id < 99); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE ((test1.id < 99)) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- test that using column list for table is disallowed if any schemas are +-- part of the publication +DROP PUBLICATION testpub_for_tbl_schema; +-- failure - cannot use column list and schema together +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +-- ok - only publish schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2 WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- failure - add a table with column list when there is already a schema in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +-- ok - only publish table with column list +ALTER PUBLICATION testpub_for_tbl_schema SET TABLE publication2.test3(y); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema FOR TABLE publication2.test3 (y) WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- failure - specify a schema when there is already a column list in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2; +ERROR: cannot add schema to publication "testpub_for_tbl_schema" +DETAIL: Schemas cannot be added if any tables that specify a column list are already part of the publication. +-- failure - cannot SET column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +-- ok - drop table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + c +--------------------------------------------------------------------- + SELECT worker_create_or_replace_object('CREATE PUBLICATION testpub_for_tbl_schema WITH (publish_via_partition_root = ''false'', publish = ''insert, update, delete, truncate'')'); +(1 row) + +-- failure - cannot ADD column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2, TABLE publication2.test3(y); +ERROR: cannot use column list for relation "publication2.test3" in publication "testpub_for_tbl_schema" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. -- make sure we can sync all the publication metadata SELECT start_metadata_sync_to_all_nodes(); start_metadata_sync_to_all_nodes @@ -386,7 +538,9 @@ DROP PUBLICATION pubdep; DROP PUBLICATION "pub-mix"; DROP PUBLICATION pubtables; DROP PUBLICATION pubpartitioned; +DROP PUBLICATION testpub_for_tbl_schema; SET client_min_messages TO ERROR; DROP SCHEMA publication CASCADE; DROP SCHEMA "publication-1" CASCADE; DROP SCHEMA citus_schema_1 CASCADE; +DROP SCHEMA publication2 CASCADE; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 8bd2ea923..06bdc39fe 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -273,6 +273,110 @@ CREATE PUBLICATION pubdep FOR TABLES IN SCHEMA deptest; RESET citus.create_object_propagation; DROP SCHEMA deptest CASCADE; +-- +-- PG16 allows publications with schema and table of the same schema. +-- backpatched to PG15 +-- Relevant PG commit: https://github.com/postgres/postgres/commit/13a185f +-- + +CREATE SCHEMA publication2; +CREATE TABLE publication2.test1 (id int); +SELECT create_distributed_table('publication2.test1', 'id'); + +-- should be able to create publication with schema and table of the same +-- schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test1; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +CREATE TABLE publication.test2 (id int); +SELECT create_distributed_table('publication.test2', 'id'); +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication.test2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- should be able to have publication2 schema and its new table test2 in testpub_for_tbl_schema publication +ALTER TABLE test2 SET SCHEMA publication2; + +-- should be able to add a table of the same schema to the schema publication +CREATE TABLE publication2.test3 (x int primary key, y int, "column-1" int); +SELECT create_distributed_table('publication2.test3', 'x'); +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- should be able to drop the table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +DROP PUBLICATION testpub_for_tbl_schema; +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +-- should be able to set publication with schema and table of the same schema +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test1 WHERE (id < 99); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- test that using column list for table is disallowed if any schemas are +-- part of the publication +DROP PUBLICATION testpub_for_tbl_schema; + +-- failure - cannot use column list and schema together +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2, TABLE publication2.test3(y); + +-- ok - only publish schema +CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA publication2; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- failure - add a table with column list when there is already a schema in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLE publication2.test3(y); + +-- ok - only publish table with column list +ALTER PUBLICATION testpub_for_tbl_schema SET TABLE publication2.test3(y); +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- failure - specify a schema when there is already a column list in the +-- publication +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2; + +-- failure - cannot SET column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema SET TABLES IN SCHEMA publication2, TABLE publication2.test3(y); + +-- ok - drop table +ALTER PUBLICATION testpub_for_tbl_schema DROP TABLE publication2.test3; +SELECT DISTINCT c FROM ( + SELECT unnest(result::text[]) c + FROM run_command_on_workers($$ + SELECT array_agg(c) FROM (SELECT c FROM unnest(activate_node_snapshot()) c WHERE c LIKE '%CREATE PUBLICATION%' AND c LIKE '%testpub_for_tbl_schema%' ORDER BY 1) s$$) + ORDER BY c) s; + +-- failure - cannot ADD column list and schema together +ALTER PUBLICATION testpub_for_tbl_schema ADD TABLES IN SCHEMA publication2, TABLE publication2.test3(y); + -- make sure we can sync all the publication metadata SELECT start_metadata_sync_to_all_nodes(); @@ -280,8 +384,10 @@ DROP PUBLICATION pubdep; DROP PUBLICATION "pub-mix"; DROP PUBLICATION pubtables; DROP PUBLICATION pubpartitioned; +DROP PUBLICATION testpub_for_tbl_schema; SET client_min_messages TO ERROR; DROP SCHEMA publication CASCADE; DROP SCHEMA "publication-1" CASCADE; DROP SCHEMA citus_schema_1 CASCADE; +DROP SCHEMA publication2 CASCADE;