mirror of https://github.com/citusdata/citus.git
Fixes visibility problems with dependency propagation (#7028)
**Problem:** Previously, we always used an outside superuser connection to overcome permission issues for the current user while propagating dependencies. That had two main problems:
1. Visibility issues during dependency propagation (the metadata connection propagates some objects, such as a schema, and the outside transaction does not see them and tries to create them again),
2. Security issues (it is preferable to use the current user's connection instead of the extension superuser's).

**Solution (high level):** Now, we make a smarter decision on whether to use an outside superuser connection or the current user's metadata connection. We prefer the current user's connection if any object that was already propagated in the current transaction is a dependency of the target object. We do that because we assume that if the current user has permissions to create the dependency, then they can most probably propagate the target as well.

Our assumption is expected to hold most of the time, but it can still be wrong. In those cases, the transaction fails and the user should set the GUC `citus.create_object_propagation` to `deferred` to work around it.

**Solution:**
1. We track all objects propagated in the current transaction (we can handle subtransactions),
2. We propagate dependencies via the current user's metadata connection if any dependency was created in the current transaction, to address the issues listed above. Otherwise, we still use an outside superuser connection.

DESCRIPTION: Fixes some object propagation errors seen with transaction blocks.

Fixes https://github.com/citusdata/citus/issues/6614

---------

Co-authored-by: Nils Dijk <nils@citusdata.com>
parent 9f067731c0
commit 8eb3360017
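Before the diff itself: the decision rule described in the commit message is small enough to show in isolation. The following is a minimal, self-contained C sketch of that rule, not Citus code; `ObjectId`, `TrackObject`, and `HasTrackedDependency` are hypothetical stand-ins for Citus' `ObjectAddress`, its per-transaction propagated-object hashset, and `HasAnyDependencyInPropagatedObjects`.

/*
 * Standalone sketch (not Citus code) of the connection-choice rule:
 * if any dependency of the target was propagated in this transaction,
 * reuse the current user's metadata connection; otherwise a separate
 * superuser connection is safe.
 */
#include <stdbool.h>
#include <stdio.h>

#define MAX_TRACKED 64

typedef struct ObjectId { unsigned classId; unsigned objectId; } ObjectId;

static ObjectId trackedObjects[MAX_TRACKED];
static int trackedCount = 0;

/* record an object as propagated in the current transaction */
static void TrackObject(ObjectId object)
{
    if (trackedCount < MAX_TRACKED)
    {
        trackedObjects[trackedCount++] = object;
    }
}

/* true if any dependency of the target was propagated in this transaction */
static bool HasTrackedDependency(const ObjectId *deps, int ndeps)
{
    for (int i = 0; i < ndeps; i++)
    {
        for (int j = 0; j < trackedCount; j++)
        {
            if (deps[i].classId == trackedObjects[j].classId &&
                deps[i].objectId == trackedObjects[j].objectId)
            {
                return true;
            }
        }
    }
    return false;
}

int main(void)
{
    ObjectId schema = { .classId = 2615, .objectId = 100 }; /* e.g. a schema */
    TrackObject(schema); /* schema was created and propagated in this transaction */

    ObjectId tableDeps[] = { schema };
    if (HasTrackedDependency(tableDeps, 1))
    {
        /* the same transaction created the dependency: reuse the current
         * user's metadata connection so the dependency stays visible */
        puts("send DDL over current user's metadata connection");
    }
    else
    {
        /* nothing relevant was created in this transaction: a separate
         * superuser connection cannot hit the visibility problem */
        puts("send DDL over a separate superuser connection");
    }
    return 0;
}

The real implementation in the diff below keys the set by PostgreSQL `ObjectAddress` and keeps one lazily created hashset per (sub-)transaction level.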
@@ -1659,6 +1659,7 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
 	ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
 	ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
 	EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
+	TrackPropagatedTableAndSequences(relationId);
 }
 
 
@@ -112,15 +112,35 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
 						   dependency->objectSubId, ExclusiveLock);
 	}
 
-	WorkerNode *workerNode = NULL;
-	foreach_ptr(workerNode, workerNodeList)
-	{
-		const char *nodeName = workerNode->workerName;
-		uint32 nodePort = workerNode->workerPort;
-
-		SendCommandListToWorkerOutsideTransaction(nodeName, nodePort,
-												  CitusExtensionOwnerName(),
-												  ddlCommands);
-	}
+	/*
+	 * We need to propagate dependencies via the current user's metadata connection if
+	 * any dependency for the target is created in the current transaction. Our
+	 * assumption is that if we rely on a dependency created in the current
+	 * transaction, then the current user, most probably, has permissions to create
+	 * the target object as well. Note that the user may still be unable to create the
+	 * target due to missing permissions for one of its dependencies, but this is OK
+	 * since it should be rare.
+	 *
+	 * If we opted to use a separate superuser connection for the target, then we would
+	 * have visibility issues, since propagated dependencies would be invisible to
+	 * the separate connection until we locally commit.
+	 */
+	if (HasAnyDependencyInPropagatedObjects(target))
+	{
+		SendCommandListToWorkersWithMetadata(ddlCommands);
+	}
+	else
+	{
+		WorkerNode *workerNode = NULL;
+		foreach_ptr(workerNode, workerNodeList)
+		{
+			const char *nodeName = workerNode->workerName;
+			uint32 nodePort = workerNode->workerPort;
+
+			SendCommandListToWorkerOutsideTransaction(nodeName, nodePort,
+													  CitusExtensionOwnerName(),
+													  ddlCommands);
+		}
+	}
 
 	/*
@@ -937,6 +937,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
 		foreach_ptr(address, addresses)
 		{
 			MarkObjectDistributed(address);
+			TrackPropagatedObject(address);
 		}
 	}
 }
@@ -19,6 +19,8 @@
 
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "catalog/dependency.h"
+#include "common/hashfn.h"
 #include "distributed/backend_data.h"
 #include "distributed/citus_safe_lib.h"
 #include "distributed/connection_management.h"
@@ -30,6 +32,7 @@
 #include "distributed/local_executor.h"
 #include "distributed/locally_reserved_shared_connections.h"
 #include "distributed/maintenanced.h"
+#include "distributed/metadata/dependency.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_logical_replication.h"
 #include "distributed/multi_explain.h"
@@ -89,14 +92,25 @@ StringInfo activeSetStmts;
  * Though a list, we treat this as a stack, pushing on subxact contexts whenever
  * e.g. a SAVEPOINT is executed (though this is actually performed by providing
  * PostgreSQL with a sub-xact callback). At present, the context of a subxact
- * includes a subxact identifier as well as any SET LOCAL statements propagated
- * to workers during the sub-transaction.
+ * includes
+ * - a subxact identifier,
+ * - any SET LOCAL statements propagated to workers during the sub-transaction,
+ * - all objects propagated to workers during the sub-transaction.
+ *
+ * To be clear, the last item of the activeSubXactContexts list corresponds to the
+ * top of the stack.
  */
 static List *activeSubXactContexts = NIL;
 
+/*
+ * PropagatedObjectsInTx is the set of objects propagated in the root transaction.
+ * We also keep track of objects propagated in sub-transactions in
+ * activeSubXactContexts. When a sub-transaction commits, the objects propagated
+ * during that sub-transaction are moved to the parent transaction's set; when it
+ * aborts, they are discarded.
+ */
+static HTAB *PropagatedObjectsInTx = NULL;
+
 /* some pre-allocated memory so we don't need to call malloc() during callbacks */
 MemoryContext CitusXactCallbackContext = NULL;
 
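As an aside, the merge-on-commit / discard-on-abort behavior this comment describes is easy to model outside PostgreSQL. Below is a standalone C sketch of those semantics under simplifying assumptions (fixed-size arrays instead of lazily created hash tables, and ROLLBACK TO modeled as a plain pop); it is illustrative only, not Citus code.

/*
 * Standalone sketch (not Citus code): each SAVEPOINT pushes a frame;
 * RELEASE merges the frame's propagated objects into its parent,
 * ROLLBACK discards them.
 */
#include <stdio.h>

#define MAX_DEPTH 8
#define MAX_OBJS  16

typedef struct Frame
{
    int objects[MAX_OBJS]; /* object ids propagated at this level */
    int count;
} Frame;

static Frame stack[MAX_DEPTH + 1]; /* slot 0 is the root transaction */
static int depth = 0;

static void PushSubXact(void) { stack[++depth].count = 0; }

static void TrackObject(int objectId)
{
    Frame *frame = &stack[depth];
    frame->objects[frame->count++] = objectId;
}

/* on commit, keep the frame's objects at the parent; on abort, drop them */
static void PopSubXact(int commit)
{
    Frame *child = &stack[depth--];
    if (commit)
    {
        Frame *parent = &stack[depth];
        for (int i = 0; i < child->count; i++)
        {
            parent->objects[parent->count++] = child->objects[i];
        }
    }
}

int main(void)
{
    TrackObject(1);  /* propagated in the root transaction */
    PushSubXact();   /* SAVEPOINT sp1 */
    TrackObject(2);
    PopSubXact(0);   /* ROLLBACK TO sp1: object 2 is discarded */
    PushSubXact();   /* SAVEPOINT sp2 */
    TrackObject(3);
    PopSubXact(1);   /* RELEASE sp2: object 3 moves to the root set */

    for (int i = 0; i < stack[0].count; i++)
    {
        printf("root set contains object %d\n", stack[0].objects[i]);
    }
    return 0; /* prints objects 1 and 3 */
}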
@@ -142,11 +156,17 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
 /* remaining functions */
 static void AdjustMaxPreparedTransactions(void);
 static void PushSubXact(SubTransactionId subId);
-static void PopSubXact(SubTransactionId subId);
+static void PopSubXact(SubTransactionId subId, bool commit);
 static void ResetGlobalVariables(void);
 static bool SwallowErrors(void (*func)(void));
 static void ForceAllInProgressConnectionsToClose(void);
 static void EnsurePrepareTransactionIsAllowed(void);
+static HTAB * CurrentTransactionPropagatedObjects(bool readonly);
+static HTAB * ParentTransactionPropagatedObjects(bool readonly);
+static void MovePropagatedObjectsToParentTransaction(void);
+static bool DependencyInPropagatedObjectsHash(HTAB *propagatedObjects,
+                                              const ObjectAddress *dependency);
+static HTAB * CreateTxPropagatedObjectsHash(void);
 
 
 /*
@@ -321,6 +341,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 
 			ResetGlobalVariables();
 			ResetRelationAccessHash();
+			ResetPropagatedObjects();
 
 			/*
 			 * Make sure that we give the shared connections back to the shared
@@ -391,6 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 
 			ResetGlobalVariables();
 			ResetRelationAccessHash();
+			ResetPropagatedObjects();
 
 			/* Reset any local replication origin session since transaction has been aborted.*/
 			ResetReplicationOriginLocalSession();
@@ -638,7 +660,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
 	switch (event)
 	{
 		/*
-		 * Our subtransaction stack should be consistent with postgres' internal
+		 * Our sub-transaction stack should be consistent with postgres' internal
 		 * transaction stack. In case of subxact begin, postgres calls our
 		 * callback after it has pushed the transaction into stack, so we have to
 		 * do the same even if worker commands fail, so we PushSubXact() first.
@@ -672,7 +694,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
 			{
 				CoordinatedRemoteTransactionsSavepointRelease(subId);
 			}
-			PopSubXact(subId);
+			PopSubXact(subId, true);
 
 			/* Set CachedDuringCitusCreation to one level lower to represent citus creation is done */
@@ -706,7 +728,7 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
 			{
 				CoordinatedRemoteTransactionsSavepointRollback(subId);
 			}
-			PopSubXact(subId);
+			PopSubXact(subId, false);
 
 			/*
 			 * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus
@@ -775,6 +797,9 @@ PushSubXact(SubTransactionId subId)
 	state->subId = subId;
 	state->setLocalCmds = activeSetStmts;
 
+	/* we lazily create hashset when any object is propagated during sub-transaction */
+	state->propagatedObjects = NULL;
+
 	/* append to list and reset active set stmts for upcoming sub-xact */
 	activeSubXactContexts = lappend(activeSubXactContexts, state);
 	activeSetStmts = makeStringInfo();
@@ -783,7 +808,7 @@ PushSubXact(SubTransactionId subId)
 
 /* PopSubXact pops subId from the stack of active sub-transactions. */
 static void
-PopSubXact(SubTransactionId subId)
+PopSubXact(SubTransactionId subId, bool commit)
 {
 	SubXactContext *state = llast(activeSubXactContexts);
 
@@ -806,6 +831,16 @@ PopSubXact(SubTransactionId subId)
 	 */
 	activeSetStmts = state->setLocalCmds;
 
+	/*
+	 * Keep subtransaction's propagated objects at parent transaction
+	 * if subtransaction committed. Otherwise, discard them.
+	 */
+	if (commit)
+	{
+		MovePropagatedObjectsToParentTransaction();
+	}
+	hash_destroy(state->propagatedObjects);
+
 	/*
 	 * Free state to avoid memory leaks when we create subxacts for each row,
 	 * e.g. in exception handling of UDFs.
@@ -913,3 +948,227 @@ EnsurePrepareTransactionIsAllowed(void)
 				errmsg("cannot use 2PC in transactions involving "
 					   "multiple servers")));
 }
+
+
+/*
+ * CurrentTransactionPropagatedObjects returns the objects propagated in the current
+ * sub-transaction, or in the root transaction if no sub-transaction exists.
+ *
+ * If readonly is true, it will not create the hashmap if it does not already exist
+ * in the current sub-transaction.
+ */
+static HTAB *
+CurrentTransactionPropagatedObjects(bool readonly)
+{
+	if (activeSubXactContexts == NIL)
+	{
+		/* hashset in the root transaction if there is no sub-transaction */
+		if (PropagatedObjectsInTx == NULL && !readonly)
+		{
+			/* lazily create hashset for root transaction, for mutating uses */
+			PropagatedObjectsInTx = CreateTxPropagatedObjectsHash();
+		}
+		return PropagatedObjectsInTx;
+	}
+
+	/* hashset in top level sub-transaction */
+	SubXactContext *state = llast(activeSubXactContexts);
+	if (state->propagatedObjects == NULL && !readonly)
+	{
+		/* lazily create hashset for sub-transaction, for mutating uses */
+		state->propagatedObjects = CreateTxPropagatedObjectsHash();
+	}
+	return state->propagatedObjects;
+}
+
+
+/*
+ * ParentTransactionPropagatedObjects returns the objects propagated in the parent
+ * of the active sub-transaction, or in the root transaction if no sub-transaction
+ * exists.
+ *
+ * If readonly is true, it will not create the hashmap if it does not already exist
+ * in the target sub-transaction.
+ */
+static HTAB *
+ParentTransactionPropagatedObjects(bool readonly)
+{
+	int nestingLevel = list_length(activeSubXactContexts);
+	if (nestingLevel <= 1)
+	{
+		/*
+		 * The parent is the root transaction when there is a single level of
+		 * sub-transaction, or no sub-transaction at all.
+		 */
+		if (PropagatedObjectsInTx == NULL && !readonly)
+		{
+			/* lazily create hashset for root transaction, for mutating uses */
+			PropagatedObjectsInTx = CreateTxPropagatedObjectsHash();
+		}
+		return PropagatedObjectsInTx;
+	}
+
+	/* parent is the upper sub-transaction */
+	Assert(nestingLevel >= 2);
+	SubXactContext *state = list_nth(activeSubXactContexts, nestingLevel - 2);
+	if (state->propagatedObjects == NULL && !readonly)
+	{
+		/* lazily create hashset for parent sub-transaction */
+		state->propagatedObjects = CreateTxPropagatedObjectsHash();
+	}
+	return state->propagatedObjects;
+}
+
+
+/*
+ * MovePropagatedObjectsToParentTransaction moves all objects propagated in the current
+ * sub-transaction to the parent transaction. This should only be called when there is
+ * an active sub-transaction.
+ */
+static void
+MovePropagatedObjectsToParentTransaction(void)
+{
+	Assert(llast(activeSubXactContexts) != NULL);
+	HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(true);
+	if (currentPropagatedObjects == NULL)
+	{
+		/* nothing to move */
+		return;
+	}
+
+	/*
+	 * Only after we know we have objects to move into the parent do we get a handle on
+	 * a guaranteed existing parent hash table. This makes sure that the parents only
+	 * get populated once there are objects to be tracked.
+	 */
+	HTAB *parentPropagatedObjects = ParentTransactionPropagatedObjects(false);
+
+	HASH_SEQ_STATUS propagatedObjectsSeq;
+	hash_seq_init(&propagatedObjectsSeq, currentPropagatedObjects);
+	ObjectAddress *objectAddress = NULL;
+	while ((objectAddress = hash_seq_search(&propagatedObjectsSeq)) != NULL)
+	{
+		hash_search(parentPropagatedObjects, objectAddress, HASH_ENTER, NULL);
+	}
+}
+
+
+/*
+ * DependencyInPropagatedObjectsHash checks if the dependency is in the given hashset
+ * of propagated objects.
+ */
+static bool
+DependencyInPropagatedObjectsHash(HTAB *propagatedObjects, const
+								  ObjectAddress *dependency)
+{
+	if (propagatedObjects == NULL)
+	{
+		return false;
+	}
+
+	bool found = false;
+	hash_search(propagatedObjects, dependency, HASH_FIND, &found);
+	return found;
+}
+
+
+/*
+ * CreateTxPropagatedObjectsHash creates a hashset to keep track of the objects
+ * propagated in the current root transaction or sub-transaction.
+ */
+static HTAB *
+CreateTxPropagatedObjectsHash(void)
+{
+	HASHCTL info;
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(ObjectAddress);
+	info.entrysize = sizeof(ObjectAddress);
+	info.hash = tag_hash;
+	info.hcxt = CitusXactCallbackContext;
+
+	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
+	return hash_create("Tx Propagated Objects", 16, &info, hashFlags);
+}
+
+
+/*
+ * TrackPropagatedObject adds the given object to the objects propagated in the
+ * current sub-transaction.
+ */
+void
+TrackPropagatedObject(const ObjectAddress *objectAddress)
+{
+	HTAB *currentPropagatedObjects = CurrentTransactionPropagatedObjects(false);
+	hash_search(currentPropagatedObjects, objectAddress, HASH_ENTER, NULL);
+}
+
+
+/*
+ * TrackPropagatedTableAndSequences adds the given table and its sequences to the
+ * objects propagated in the current sub-transaction.
+ */
+void
+TrackPropagatedTableAndSequences(Oid relationId)
+{
+	/* track table */
+	ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
+	ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
+	TrackPropagatedObject(tableAddress);
+
+	/* track its sequences */
+	List *ownedSeqIdList = getOwnedSequences(relationId);
+	Oid ownedSeqId = InvalidOid;
+	foreach_oid(ownedSeqId, ownedSeqIdList)
+	{
+		ObjectAddress *seqAddress = palloc0(sizeof(ObjectAddress));
+		ObjectAddressSet(*seqAddress, RelationRelationId, ownedSeqId);
+		TrackPropagatedObject(seqAddress);
+	}
+}
+
+
+/*
+ * ResetPropagatedObjects destroys the hashset of propagated objects in the root
+ * transaction.
+ */
+void
+ResetPropagatedObjects(void)
+{
+	hash_destroy(PropagatedObjectsInTx);
+	PropagatedObjectsInTx = NULL;
+}
+
+
+/*
+ * HasAnyDependencyInPropagatedObjects decides whether any dependency of the given
+ * object was propagated in the current transaction.
+ */
+bool
+HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress)
+{
+	List *dependencyList = GetAllSupportedDependenciesForObject(objectAddress);
+	ObjectAddress *dependency = NULL;
+	foreach_ptr(dependency, dependencyList)
+	{
+		/* first search in the root transaction */
+		if (DependencyInPropagatedObjectsHash(PropagatedObjectsInTx, dependency))
+		{
+			return true;
+		}
+
+		/* search in all nested sub-transactions */
+		if (activeSubXactContexts == NIL)
+		{
+			continue;
+		}
+		SubXactContext *state = NULL;
+		foreach_ptr(state, activeSubXactContexts)
+		{
+			if (DependencyInPropagatedObjectsHash(state->propagatedObjects, dependency))
+			{
+				return true;
+			}
+		}
+	}
+
+	return false;
+}
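One detail worth calling out in the functions above: every ObjectAddress used as a hash key comes from palloc0 (or ObjectAddressSet, which assigns all three fields), because the table hashes the full keysize bytes of the struct, as tag_hash does. The standalone sketch below (not Citus or PostgreSQL code; hash_bytes here is a stand-in FNV-1a) shows why consistent zero-initialization makes two logically equal keys hash identically.

/*
 * Standalone illustration (not Citus code): when a whole struct is hashed
 * byte-for-byte, every byte, including fields left unset and any padding,
 * must be initialized consistently, hence memset/palloc0 before use.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef struct AddrKey
{
    uint32_t classId;
    uint32_t objectId;
    int32_t objectSubId;
} AddrKey;

/* FNV-1a over the raw key bytes, standing in for PostgreSQL's tag_hash */
static uint32_t hash_bytes(const void *key, size_t len)
{
    const unsigned char *bytes = key;
    uint32_t hash = 2166136261u;
    for (size_t i = 0; i < len; i++)
    {
        hash = (hash ^ bytes[i]) * 16777619u;
    }
    return hash;
}

int main(void)
{
    AddrKey a, b;
    memset(&a, 0, sizeof(a)); /* zero first, like palloc0 */
    memset(&b, 0, sizeof(b));
    a.classId = b.classId = 1259;   /* pg_class */
    a.objectId = b.objectId = 16384;

    printf("equal keys, equal hashes: %u == %u\n",
           hash_bytes(&a, sizeof(a)), hash_bytes(&b, sizeof(b)));
    return 0;
}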
@@ -135,6 +135,21 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
 }
 
 
+/*
+ * SendCommandListToWorkersWithMetadata sends all commands to all metadata workers
+ * as the current user. See `SendCommandToWorkersWithMetadata` for details.
+ */
+void
+SendCommandListToWorkersWithMetadata(List *commands)
+{
+	char *command = NULL;
+	foreach_ptr(command, commands)
+	{
+		SendCommandToWorkersWithMetadata(command);
+	}
+}
+
+
 /*
  * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
  * TargetWorkerSet.
@@ -10,11 +10,13 @@
 #define TRANSACTION_MANAGMENT_H
 
 #include "access/xact.h"
+#include "catalog/objectaddress.h"
 #include "lib/ilist.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 #include "nodes/primnodes.h"
+#include "utils/hsearch.h"
 
 /* forward declare, to avoid recursive includes */
 struct DistObjectCacheEntry;
@@ -58,6 +60,7 @@ typedef struct SubXactContext
 {
 	SubTransactionId subId;
 	StringInfo setLocalCmds;
+	HTAB *propagatedObjects;
 } SubXactContext;
 
 /*
@@ -157,6 +160,11 @@ extern bool IsMultiStatementTransaction(void);
 extern void EnsureDistributedTransactionId(void);
 extern bool MaybeExecutingUDF(void);
 
+/* functions for tracking the objects propagated in current transaction */
+extern void TrackPropagatedObject(const ObjectAddress *objectAddress);
+extern void TrackPropagatedTableAndSequences(Oid relationId);
+extern void ResetPropagatedObjects(void);
+extern bool HasAnyDependencyInPropagatedObjects(const ObjectAddress *objectAddress);
+
 /* initialization function(s) */
 extern void InitializeTransactionManagement(void);
@@ -73,6 +73,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
 															  commandList);
 extern void SendCommandToWorkersWithMetadata(const char *command);
 extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
+extern void SendCommandListToWorkersWithMetadata(List *commands);
 extern void SendBareCommandListToMetadataWorkers(List *commandList);
 extern void EnsureNoModificationsHaveBeenDone(void);
 extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
@@ -168,6 +168,7 @@ DEPS = {
 		],
 	),
 	"grant_on_schema_propagation": TestDeps("minimal_schedule"),
+	"propagate_extension_commands": TestDeps("minimal_schedule"),
 }
 
 
@@ -947,3 +947,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist;
 NOTICE:  type "domain_does_not_exist" does not exist, skipping
 SET client_min_messages TO warning;
 DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE;
+DROP ROLE domain_owner;
@@ -1393,6 +1393,284 @@ BEGIN;
 
 ALTER SCHEMA bar RENAME TO foo;
 ROLLBACK;
+-- below tests are to verify dependency propagation with nested sub-transactions
+-- TEST1
+BEGIN;
+CREATE SCHEMA sc1;
+CREATE SEQUENCE sc1.seq;
+CREATE TABLE sc1.s1(id int default(nextval('sc1.seq')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to sequence sc1.seq
+drop cascades to table sc1.s1
+-- TEST2
+CREATE SCHEMA sc1;
+BEGIN;
+CREATE SEQUENCE sc1.seq1;
+CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to sequence sc1.seq1
+drop cascades to table sc1.s1
+-- TEST3
+SET citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET citus.enable_metadata_sync TO on;
+BEGIN;
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+-- TEST4
+BEGIN;
+SAVEPOINT sp1;
+CREATE SCHEMA sc1;
+ROLLBACK TO SAVEPOINT sp1;
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+-- TEST5
+BEGIN;
+SAVEPOINT sp1;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp1;
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+DROP SEQUENCE seq1;
+-- TEST6
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc1;
+ROLLBACK TO SAVEPOINT sp2;
+RELEASE SAVEPOINT sp1;
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+-- TEST7
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp2;
+RELEASE SAVEPOINT sp1;
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+DROP SEQUENCE seq1;
+-- TEST8
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp2;
+ROLLBACK TO SAVEPOINT sp1;
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+-- TEST9
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+ROLLBACK TO SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+DROP SEQUENCE seq1;
+-- TEST10
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+RELEASE SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc3;
+SAVEPOINT sp4;
+CREATE SCHEMA sc1;
+ROLLBACK TO SAVEPOINT sp4;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+DROP SCHEMA sc2 CASCADE;
+DROP SCHEMA sc3 CASCADE;
+-- TEST11
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+RELEASE SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc3;
+SAVEPOINT sp4;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp4;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+DROP SCHEMA sc2 CASCADE;
+DROP SCHEMA sc3 CASCADE;
+DROP SEQUENCE seq1;
+-- TEST12
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+RELEASE SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc3;
+SAVEPOINT sp4;
+CREATE SEQUENCE seq1;
+CREATE SCHEMA sc1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+RELEASE SAVEPOINT sp4;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+NOTICE:  drop cascades to table sc1.s1
+DROP SCHEMA sc2 CASCADE;
+DROP SCHEMA sc3 CASCADE;
+DROP SEQUENCE seq1;
+-- issue-6614
+CREATE FUNCTION create_schema_test() RETURNS void AS $$
+BEGIN
+    SET citus.create_object_propagation = 'deferred';
+    CREATE SCHEMA test_1;
+    CREATE TABLE test_1.test (
+        id bigserial constraint test_pk primary key,
+        creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null
+    );
+    PERFORM create_reference_table('test_1.test');
+    RETURN;
+END;
+$$ LANGUAGE plpgsql;
+SELECT create_schema_test();
+ create_schema_test
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$);
+ result
+---------------------------------------------------------------------
+ t
+ t
+ t
+(3 rows)
+
+DROP FUNCTION create_schema_test;
+DROP SCHEMA test_1 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to table test_1.test
+drop cascades to table test_1.test_1197064
 -- Clean up the created schema
 SET client_min_messages TO WARNING;
 SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object
@@ -487,3 +487,4 @@ DROP DOMAIN IF EXISTS domain_does_not_exist;
 
 SET client_min_messages TO warning;
 DROP SCHEMA distributed_domain, distributed_domain_moved CASCADE;
+DROP ROLE domain_owner;
@@ -995,6 +995,219 @@ BEGIN;
 ALTER SCHEMA bar RENAME TO foo;
 ROLLBACK;
 
+-- below tests are to verify dependency propagation with nested sub-transactions
+-- TEST1
+BEGIN;
+CREATE SCHEMA sc1;
+CREATE SEQUENCE sc1.seq;
+CREATE TABLE sc1.s1(id int default(nextval('sc1.seq')));
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+
+-- TEST2
+CREATE SCHEMA sc1;
+BEGIN;
+CREATE SEQUENCE sc1.seq1;
+CREATE TABLE sc1.s1(id int default(nextval('sc1.seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+
+-- TEST3
+SET citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET citus.enable_metadata_sync TO on;
+BEGIN;
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+
+-- TEST4
+BEGIN;
+SAVEPOINT sp1;
+CREATE SCHEMA sc1;
+ROLLBACK TO SAVEPOINT sp1;
+
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+
+-- TEST5
+BEGIN;
+SAVEPOINT sp1;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp1;
+
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+DROP SEQUENCE seq1;
+
+-- TEST6
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc1;
+ROLLBACK TO SAVEPOINT sp2;
+RELEASE SAVEPOINT sp1;
+
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+
+-- TEST7
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp2;
+RELEASE SAVEPOINT sp1;
+
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+DROP SEQUENCE seq1;
+
+-- TEST8
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp2;
+ROLLBACK TO SAVEPOINT sp1;
+
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+
+-- TEST9
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+ROLLBACK TO SAVEPOINT sp2;
+
+SAVEPOINT sp3;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+DROP SEQUENCE seq1;
+
+-- TEST10
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+RELEASE SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc3;
+SAVEPOINT sp4;
+CREATE SCHEMA sc1;
+ROLLBACK TO SAVEPOINT sp4;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+
+SET LOCAL citus.enable_metadata_sync TO off;
+CREATE SCHEMA sc1;
+SET LOCAL citus.enable_metadata_sync TO on;
+
+CREATE TABLE sc1.s1(id int);
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+DROP SCHEMA sc2 CASCADE;
+DROP SCHEMA sc3 CASCADE;
+
+-- TEST11
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+RELEASE SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc3;
+SAVEPOINT sp4;
+CREATE SCHEMA sc1;
+RELEASE SAVEPOINT sp4;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+
+CREATE SEQUENCE seq1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+DROP SCHEMA sc2 CASCADE;
+DROP SCHEMA sc3 CASCADE;
+DROP SEQUENCE seq1;
+
+-- TEST12
+BEGIN;
+SAVEPOINT sp1;
+SAVEPOINT sp2;
+CREATE SCHEMA sc2;
+RELEASE SAVEPOINT sp2;
+SAVEPOINT sp3;
+CREATE SCHEMA sc3;
+SAVEPOINT sp4;
+CREATE SEQUENCE seq1;
+CREATE SCHEMA sc1;
+CREATE TABLE sc1.s1(id int default(nextval('seq1')));
+SELECT create_distributed_table('sc1.s1','id');
+RELEASE SAVEPOINT sp4;
+RELEASE SAVEPOINT sp3;
+RELEASE SAVEPOINT sp1;
+COMMIT;
+DROP SCHEMA sc1 CASCADE;
+DROP SCHEMA sc2 CASCADE;
+DROP SCHEMA sc3 CASCADE;
+DROP SEQUENCE seq1;
+
+-- issue-6614
+CREATE FUNCTION create_schema_test() RETURNS void AS $$
+BEGIN
+    SET citus.create_object_propagation = 'deferred';
+    CREATE SCHEMA test_1;
+    CREATE TABLE test_1.test (
+        id bigserial constraint test_pk primary key,
+        creation_date timestamp constraint test_creation_date_df default timezone('UTC'::text, CURRENT_TIMESTAMP) not null
+    );
+    PERFORM create_reference_table('test_1.test');
+    RETURN;
+END;
+$$ LANGUAGE plpgsql;
+SELECT create_schema_test();
+SELECT result FROM run_command_on_all_nodes($$ SELECT COUNT(*) = 1 FROM pg_dist_partition WHERE logicalrelid = 'test_1.test'::regclass $$);
+DROP FUNCTION create_schema_test;
+DROP SCHEMA test_1 CASCADE;
+
 -- Clean up the created schema
 SET client_min_messages TO WARNING;
 