diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3b993250f..ff02593f5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1659,6 +1659,7 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId) ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*tableAddress, RelationRelationId, relationId); EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress)); + TrackPropagatedTableAndSequences(relationId); } diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index ceec83324..977efb145 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -112,15 +112,35 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) dependency->objectSubId, ExclusiveLock); } - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - const char *nodeName = workerNode->workerName; - uint32 nodePort = workerNode->workerPort; - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), - ddlCommands); + /* + * We need to propagate dependencies via the current user's metadata connection if + * any dependency for the target is created in the current transaction. Our assumption + * is that if we rely on a dependency created in the current transaction, then the + * current user, most probably, has permissions to create the target object as well. + * Note that, user still may not be able to create the target due to no permissions + * for any of its dependencies. But this is ok since it should be rare. + * + * If we opted to use a separate superuser connection for the target, then we would + * have visibility issues since propagated dependencies would be invisible to + * the separate connection until we locally commit. + */ + if (HasAnyDependencyInPropagatedObjects(target)) + { + SendCommandListToWorkersWithMetadata(ddlCommands); + } + else + { + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + const char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), + ddlCommands); + } } /* diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 1945218b6..10e424623 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -937,6 +937,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt, foreach_ptr(address, addresses) { MarkObjectDistributed(address); + TrackPropagatedObject(address); } } } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 5add48009..9a7bd9089 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -19,6 +19,8 @@ #include "access/twophase.h" #include "access/xact.h" +#include "catalog/dependency.h" +#include "common/hashfn.h" #include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" @@ -30,6 +32,7 @@ #include "distributed/local_executor.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/maintenanced.h" +#include "distributed/metadata/dependency.h" #include "distributed/multi_executor.h" #include "distributed/multi_logical_replication.h" #include "distributed/multi_explain.h" @@ -89,14 +92,25 @@ StringInfo activeSetStmts; * Though a list, we treat this as a stack, pushing on subxact contexts whenever * e.g. a SAVEPOINT is executed (though this is actually performed by providing * PostgreSQL with a sub-xact callback). At present, the context of a subxact - * includes a subxact identifier as well as any SET LOCAL statements propagated - * to workers during the sub-transaction. + * includes + * - a subxact identifier, + * - any SET LOCAL statements propagated to workers during the sub-transaction, + * - all objects propagated to workers during the sub-transaction. * * To be clear, last item of activeSubXactContexts list corresponds to top of * stack. */ static List *activeSubXactContexts = NIL; +/* + * PropagatedObjectsInTx is a set of objects propagated in the root transaction. + * We also keep track of objects propagated in sub-transactions in activeSubXactContexts. + * Any committed sub-transaction would cause the objects, which are propagated during + * the sub-transaction, to be moved to upper transaction's set. Objects are discarded + * when the sub-transaction is aborted. + */ +static HTAB *PropagatedObjectsInTx = NULL; + /* some pre-allocated memory so we don't need to call malloc() during callbacks */ MemoryContext CitusXactCallbackContext = NULL; @@ -142,11 +156,17 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); -static void PopSubXact(SubTransactionId subId); +static void PopSubXact(SubTransactionId subId, bool commit); static void ResetGlobalVariables(void); static bool SwallowErrors(void (*func)(void)); static void ForceAllInProgressConnectionsToClose(void); static void EnsurePrepareTransactionIsAllowed(void); +static HTAB * CurrentTransactionPropagatedObjects(bool readonly); +static HTAB * ParentTransactionPropagatedObjects(bool readonly); +static void MovePropagatedObjectsToParentTransaction(void); +static bool DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, + const ObjectAddress *dependency); +static HTAB * CreateTxPropagatedObjectsHash(void); /* @@ -321,6 +341,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + ResetPropagatedObjects(); /* * Make sure that we give the shared connections back to the shared @@ -391,6 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + ResetPropagatedObjects(); /* Reset any local replication origin session since transaction has been aborted.*/ ResetReplicationOriginLocalSession(); @@ -638,7 +660,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, switch (event) { /* - * Our subtransaction stack should be consistent with postgres' internal + * Our sub-transaction stack should be consistent with postgres' internal * transaction stack. In case of subxact begin, postgres calls our * callback after it has pushed the transaction into stack, so we have to * do the same even if worker commands fail, so we PushSubXact() first. @@ -672,7 +694,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRelease(subId); } - PopSubXact(subId); + PopSubXact(subId, true); /* Set CachedDuringCitusCreation to one level lower to represent citus creation is done */ @@ -706,7 +728,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRollback(subId); } - PopSubXact(subId); + PopSubXact(subId, false); /* * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus @@ -775,6 +797,9 @@ PushSubXact(SubTransactionId subId) state->subId = subId; state->setLocalCmds = activeSetStmts; + /* we lazily create hashset when any object is propagated during sub-transaction */ + state->propagatedObjects = NULL; + /* append to list and reset active set stmts for upcoming sub-xact */ activeSubXactContexts = lappend(activeSubXactContexts, state); activeSetStmts = makeStringInfo(); @@ -783,7 +808,7 @@ PushSubXact(SubTransactionId subId) /* PopSubXact pops subId from the stack of active sub-transactions. */ static void -PopSubXact(SubTransactionId subId) +PopSubXact(SubTransactionId subId, bool commit) { SubXactContext *state = llast(activeSubXactContexts); @@ -806,6 +831,16 @@ PopSubXact(SubTransactionId subId) */ activeSetStmts = state->setLocalCmds; + /* + * Keep subtransaction's propagated objects at parent transaction + * if subtransaction committed. Otherwise, discard them. + */ + if (commit) + { + MovePropagatedObjectsToParentTransaction(); + } + hash_destroy(state->propagatedObjects); + /* * Free state to avoid memory leaks when we create subxacts for each row, * e.g. in exception handling of UDFs. @@ -913,3 +948,227 @@ EnsurePrepareTransactionIsAllowed(void) errmsg("cannot use 2PC in transactions involving " "multiple servers"))); } + + +/* + * CurrentTransactionPropagatedObjects returns the objects propagated in current + * sub-transaction or the root transaction if no sub-transaction exists. + * + * If the propagated objects are readonly it will not create the hashmap if it does not + * already exist in the current sub-transaction. + */ +static HTAB * +CurrentTransactionPropagatedObjects(bool readonly) +{ + if (activeSubXactContexts == NIL) + { + /* hashset in the root transaction if there is no sub-transaction */ + if (PropagatedObjectsInTx == NULL && !readonly) + { + /* lazily create hashset for root transaction, for mutating uses */ + PropagatedObjectsInTx = CreateTxPropagatedObjectsHash(); + } + return PropagatedObjectsInTx; + } + + /* hashset in top level sub-transaction */ + SubXactContext *state = llast(activeSubXactContexts); + if (state->propagatedObjects == NULL && !readonly) + { + /* lazily create hashset for sub-transaction, for mutating uses */ + state->propagatedObjects = CreateTxPropagatedObjectsHash(); + } + return state->propagatedObjects; +} + + +/* + * ParentTransactionPropagatedObjects returns the objects propagated in parent + * transaction of active sub-transaction. It returns the root transaction if + * no sub-transaction exists. + * + * If the propagated objects are readonly it will not create the hashmap if it does not + * already exist in the target sub-transaction. + */ +static HTAB * +ParentTransactionPropagatedObjects(bool readonly) +{ + int nestingLevel = list_length(activeSubXactContexts); + if (nestingLevel <= 1) + { + /* + * The parent is the root transaction, when there is single level sub-transaction + * or no sub-transaction. + */ + if (PropagatedObjectsInTx == NULL && !readonly) + { + /* lazily create hashset for root transaction, for mutating uses */ + PropagatedObjectsInTx = CreateTxPropagatedObjectsHash(); + } + return PropagatedObjectsInTx; + } + + /* parent is upper sub-transaction */ + Assert(nestingLevel >= 2); + SubXactContext *state = list_nth(activeSubXactContexts, nestingLevel - 2); + if (state->propagatedObjects == NULL && !readonly) + { + /* lazily create hashset for parent sub-transaction */ + state->propagatedObjects = CreateTxPropagatedObjectsHash(); + } + return state->propagatedObjects; +} + + +/* + * MovePropagatedObjectsToParentTransaction moves all objects propagated in the current + * sub-transaction to the parent transaction. This should only be called when there is + * active sub-transaction. + */ +static void +MovePropagatedObjectsToParentTransaction(void) +{ + Assert(llast(activeSubXactContexts) != NULL); + HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(true); + if (currentPropagatedObjects == NULL) + { + /* nothing to move */ + return; + } + + /* + * Only after we know we have objects to move into the parent do we get a handle on + * a guaranteed existing parent hash table. This makes sure that the parents only + * get populated once there are objects to be tracked. + */ + HTAB *parentPropagatedObjects = ParentTransactionPropagatedObjects(false); + + HASH_SEQ_STATUS propagatedObjectsSeq; + hash_seq_init(&propagatedObjectsSeq, currentPropagatedObjects); + ObjectAddress *objectAddress = NULL; + while ((objectAddress = hash_seq_search(&propagatedObjectsSeq)) != NULL) + { + hash_search(parentPropagatedObjects, objectAddress, HASH_ENTER, NULL); + } +} + + +/* + * DependencyInPropagatedObjectsHash checks if dependency is in given hashset + * of propagated objects. + */ +static bool +DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, const + ObjectAddress *dependency) +{ + if (propagatedObjects == NULL) + { + return false; + } + + bool found = false; + hash_search(propagatedObjects, dependency, HASH_FIND, &found); + return found; +} + + +/* + * CreateTxPropagatedObjectsHash creates a hashset to keep track of the objects + * propagated in the current root transaction or sub-transaction. + */ +static HTAB * +CreateTxPropagatedObjectsHash(void) +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ObjectAddress); + info.entrysize = sizeof(ObjectAddress); + info.hash = tag_hash; + info.hcxt = CitusXactCallbackContext; + + int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); + return hash_create("Tx Propagated Objects", 16, &info, hashFlags); +} + + +/* + * TrackPropagatedObject adds given object into the objects propagated in the current + * sub-transaction. + */ +void +TrackPropagatedObject(const ObjectAddress *objectAddress) +{ + HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(false); + hash_search(currentPropagatedObjects, objectAddress, HASH_ENTER, NULL); +} + + +/* + * TrackPropagatedTableAndSequences adds given table and its sequences to the objects + * propagated in the current sub-transaction. + */ +void +TrackPropagatedTableAndSequences(Oid relationId) +{ + /* track table */ + ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*tableAddress, RelationRelationId, relationId); + TrackPropagatedObject(tableAddress); + + /* track its sequences */ + List *ownedSeqIdList = getOwnedSequences(relationId); + Oid ownedSeqId = InvalidOid; + foreach_oid(ownedSeqId, ownedSeqIdList) + { + ObjectAddress *seqAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*seqAddress, RelationRelationId, ownedSeqId); + TrackPropagatedObject(seqAddress); + } +} + + +/* + * ResetPropagatedObjects destroys hashset of propagated objects in the root transaction. + */ +void +ResetPropagatedObjects(void) +{ + hash_destroy(PropagatedObjectsInTx); + PropagatedObjectsInTx = NULL; +} + + +/* + * HasAnyDependencyInPropagatedObjects decides if any dependency of given object is + * propagated in the current transaction. + */ +bool +HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress) +{ + List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress); + ObjectAddress *dependency = NULL; + foreach_ptr(dependency, dependencyList) + { + /* first search in root transaction */ + if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency)) + { + return true; + } + + /* search in all nested sub-transactions */ + if (activeSubXactContexts == NIL) + { + continue; + } + SubXactContext *state = NULL; + foreach_ptr(state, activeSubXactContexts) + { + if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency)) + { + return true; + } + } + } + + return false; +} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a9a855fb1..03ecbea72 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -135,6 +135,21 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command) } +/* + * SendCommandListToWorkersWithMetadata sends all commands to all metadata workers + * with the current user. See `SendCommandToWorkersWithMetadata`for details. + */ +void +SendCommandListToWorkersWithMetadata(List *commands) +{ + char *command = NULL; + foreach_ptr(command, commands) + { + SendCommandToWorkersWithMetadata(command); + } +} + + /* * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSet. diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index e2d35048a..ca4e632a9 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -10,11 +10,13 @@ #define TRANSACTION_MANAGMENT_H #include "access/xact.h" +#include "catalog/objectaddress.h" #include "lib/ilist.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "lib/stringinfo.h" #include "nodes/primnodes.h" +#include "utils/hsearch.h" /* forward declare, to avoid recursive includes */ struct DistObjectCacheEntry; @@ -58,6 +60,7 @@ typedef struct SubXactContext { SubTransactionId subId; StringInfo setLocalCmds; + HTAB *propagatedObjects; } SubXactContext; /* @@ -157,6 +160,11 @@ extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); extern bool MaybeExecutingUDF(void); +/* functions for tracking the objects propagated in current transaction */ +extern void TrackPropagatedObject(const ObjectAddress *objectAddress); +extern void TrackPropagatedTableAndSequences(Oid relationId); +extern void ResetPropagatedObjects(void); +extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress); /* initialization function(s) */ extern void InitializeTransactionManagement(void); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index be8fe5ed6..631940edf 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -73,6 +73,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons commandList); extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); +extern void SendCommandListToWorkersWithMetadata(List *commands); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 731b1a908..2b71f5e1b 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -168,6 +168,7 @@ DEPS = { ], ), "grant_on_schema_propagation": TestDeps("minimal_schedule"), + "propagate_extension_commands": TestDeps("minimal_schedule"), } diff --git a/src/test/regress/expected/distributed_domain.out b/src/test/regress/expected/distributed_domain.out index 5043d4f05..30e388803 100644 --- a/src/test/regress/expected/distributed_domain.out +++ b/src/test/regress/expected/distributed_domain.out @@ -947,3 +947,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist; NOTICE: type "domain_does_not_exist" does not exist, skipping SET client_min_messages TO warning; DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE; +DROP ROLE domain_owner; diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index dcb87486d..2de95266b 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1393,6 +1393,284 @@ BEGIN; ALTER SCHEMA bar RENAME TO foo; ROLLBACK; +-- below tests are to verify dependency propagation with nested sub-transactions +-- TEST1 +BEGIN; + CREATE SCHEMA sc1; + CREATE SEQUENCE sc1.seq; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to sequence sc1.seq +drop cascades to table sc1.s1 +-- TEST2 +CREATE SCHEMA sc1; +BEGIN; + CREATE SEQUENCE sc1.seq1; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to sequence sc1.seq1 +drop cascades to table sc1.s1 +-- TEST3 +SET citus.enable_metadata_sync TO off; +CREATE SCHEMA sc1; +SET citus.enable_metadata_sync TO on; +BEGIN; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST4 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST5 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST6 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST7 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST8 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + ROLLBACK TO SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +-- TEST9 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + ROLLBACK TO SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SEQUENCE seq1; +-- TEST10 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +-- TEST11 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; +-- TEST12 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SEQUENCE seq1; + CREATE SCHEMA sc1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; +COMMIT; +DROP SCHEMA sc1 CASCADE; +NOTICE: drop cascades to table sc1.s1 +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; +-- issue-6614 +CREATE FUNCTION create_schema_test() RETURNS void AS $$ +BEGIN + SET citus.create_object_propagation = 'deferred'; + CREATE SCHEMA test_1; + CREATE TABLE test_1.test ( + id bigserial constraint test_pk primary key, + creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null + ); + PERFORM create_reference_table('test_1.test'); + RETURN; +END; +$$ LANGUAGE plpgsql; +SELECT create_schema_test(); + create_schema_test +--------------------------------------------------------------------- + +(1 row) + +SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +DROP FUNCTION create_schema_test; +DROP SCHEMA test_1 CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test_1.test +drop cascades to table test_1.test_1197064 -- Clean up the created schema SET client_min_messages TO WARNING; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object diff --git a/src/test/regress/sql/distributed_domain.sql b/src/test/regress/sql/distributed_domain.sql index b03a2040f..5bf3bd6a8 100644 --- a/src/test/regress/sql/distributed_domain.sql +++ b/src/test/regress/sql/distributed_domain.sql @@ -487,3 +487,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist; SET client_min_messages TO warning; DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE; +DROP ROLE domain_owner; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index d870b624f..146cf78d4 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -995,6 +995,219 @@ BEGIN; ALTER SCHEMA bar RENAME TO foo; ROLLBACK; +-- below tests are to verify dependency propagation with nested sub-transactions +-- TEST1 +BEGIN; + CREATE SCHEMA sc1; + CREATE SEQUENCE sc1.seq; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST2 +CREATE SCHEMA sc1; +BEGIN; + CREATE SEQUENCE sc1.seq1; + CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST3 +SET citus.enable_metadata_sync TO off; +CREATE SCHEMA sc1; +SET citus.enable_metadata_sync TO on; +BEGIN; + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST4 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST5 +BEGIN; + SAVEPOINT sp1; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST6 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST7 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST8 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp2; + ROLLBACK TO SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; + +-- TEST9 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + ROLLBACK TO SAVEPOINT sp2; + + SAVEPOINT sp3; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SEQUENCE seq1; + +-- TEST10 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + ROLLBACK TO SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + SET LOCAL citus.enable_metadata_sync TO off; + CREATE SCHEMA sc1; + SET LOCAL citus.enable_metadata_sync TO on; + + CREATE TABLE sc1.s1(id int); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; + +-- TEST11 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SCHEMA sc1; + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; + + CREATE SEQUENCE seq1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; + +-- TEST12 +BEGIN; + SAVEPOINT sp1; + SAVEPOINT sp2; + CREATE SCHEMA sc2; + RELEASE SAVEPOINT sp2; + SAVEPOINT sp3; + CREATE SCHEMA sc3; + SAVEPOINT sp4; + CREATE SEQUENCE seq1; + CREATE SCHEMA sc1; + CREATE TABLE sc1.s1(id int default(nextval('seq1'))); + SELECT create_distributed_table('sc1.s1','id'); + RELEASE SAVEPOINT sp4; + RELEASE SAVEPOINT sp3; + RELEASE SAVEPOINT sp1; +COMMIT; +DROP SCHEMA sc1 CASCADE; +DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SEQUENCE seq1; + +-- issue-6614 +CREATE FUNCTION create_schema_test() RETURNS void AS $$ +BEGIN + SET citus.create_object_propagation = 'deferred'; + CREATE SCHEMA test_1; + CREATE TABLE test_1.test ( + id bigserial constraint test_pk primary key, + creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null + ); + PERFORM create_reference_table('test_1.test'); + RETURN; +END; +$$ LANGUAGE plpgsql; +SELECT create_schema_test(); +SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$); +DROP FUNCTION create_schema_test; +DROP SCHEMA test_1 CASCADE; + -- Clean up the created schema SET client_min_messages TO WARNING;