mirror of https://github.com/citusdata/citus.git
refactor code where GetObjectAddressFromParseTree is called because it returns list of addresses now
parent
9d232d7b00
commit
ebb6d1c8c0
|
@ -1302,7 +1302,7 @@ ErrorIfUnsupportedCascadeObjects(Oid relationId)
|
||||||
*
|
*
|
||||||
* Extension dependency is different than the rest. If an object depends on an extension
|
* Extension dependency is different than the rest. If an object depends on an extension
|
||||||
* dropping the object would drop the extension too.
|
* dropping the object would drop the extension too.
|
||||||
* So we check with IsObjectAddressOwnedByExtension function.
|
* So we check with IsAnyObjectAddressOwnedByExtension function.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
DoesCascadeDropUnsupportedObject(Oid classId, Oid objectId, HTAB *nodeMap)
|
DoesCascadeDropUnsupportedObject(Oid classId, Oid objectId, HTAB *nodeMap)
|
||||||
|
@ -1315,10 +1315,9 @@ DoesCascadeDropUnsupportedObject(Oid classId, Oid objectId, HTAB *nodeMap)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress objectAddress = { 0 };
|
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(objectAddress, classId, objectId);
|
ObjectAddressSet(*objectAddress, classId, objectId);
|
||||||
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(objectAddress), NULL))
|
||||||
if (IsObjectAddressOwnedByExtension(&objectAddress, NULL))
|
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -307,8 +307,8 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress tableAddress = { 0 };
|
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Ensure that the sequences used in column defaults of the table
|
* Ensure that the sequences used in column defaults of the table
|
||||||
|
@ -320,7 +320,7 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
|
||||||
* Ensure dependencies exist as we will create shell table on the other nodes
|
* Ensure dependencies exist as we will create shell table on the other nodes
|
||||||
* in the MX case.
|
* in the MX case.
|
||||||
*/
|
*/
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make sure that existing reference tables have been replicated to all
|
* Make sure that existing reference tables have been replicated to all
|
||||||
|
|
|
@ -61,22 +61,26 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(stmt, false);
|
List *addresses = GetObjectAddressListFromParseTree(stmt, false);
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
EnsureSequentialMode(ops->objectType);
|
EnsureSequentialMode(ops->objectType);
|
||||||
|
|
||||||
/* If the object has any unsupported dependency warn, and only create locally */
|
/* If the object has any unsupported dependency warn, and only create locally */
|
||||||
DeferredErrorMessage *depError = DeferErrorIfHasUnsupportedDependency(&address);
|
DeferredErrorMessage *depError = DeferErrorIfAnyObjectHasUnsupportedDependency(
|
||||||
|
addresses);
|
||||||
if (depError != NULL)
|
if (depError != NULL)
|
||||||
{
|
{
|
||||||
RaiseDeferredError(depError, WARNING);
|
RaiseDeferredError(depError, WARNING);
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&address);
|
EnsureAllObjectDependenciesExistOnAllNodes(addresses);
|
||||||
|
|
||||||
List *commands = GetDependencyCreateDDLCommands(&address);
|
List *commands = GetAllDependencyCreateDDLCommands(addresses);
|
||||||
|
|
||||||
commands = lcons(DISABLE_DDL_PROPAGATION, commands);
|
commands = lcons(DISABLE_DDL_PROPAGATION, commands);
|
||||||
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
|
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
|
||||||
|
@ -111,8 +115,12 @@ PreprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString,
|
||||||
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
|
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
|
||||||
Assert(ops != NULL);
|
Assert(ops != NULL);
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(stmt, false);
|
List *addresses = GetObjectAddressListFromParseTree(stmt, false);
|
||||||
if (!ShouldPropagateObject(&address))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -156,8 +164,12 @@ PostprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString)
|
||||||
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
|
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
|
||||||
Assert(ops != NULL);
|
Assert(ops != NULL);
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(stmt, false);
|
List *addresses = GetObjectAddressListFromParseTree(stmt, false);
|
||||||
if (!ShouldPropagateObject(&address))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -168,7 +180,7 @@ PostprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&address);
|
EnsureAllObjectDependenciesExistOnAllNodes(addresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -223,11 +235,10 @@ PreprocessDropDistributedObjectStmt(Node *node, const char *queryString,
|
||||||
Relation rel = NULL; /* not used, but required to pass to get_object_address */
|
Relation rel = NULL; /* not used, but required to pass to get_object_address */
|
||||||
ObjectAddress address = get_object_address(stmt->removeType, object, &rel,
|
ObjectAddress address = get_object_address(stmt->removeType, object, &rel,
|
||||||
AccessShareLock, stmt->missing_ok);
|
AccessShareLock, stmt->missing_ok);
|
||||||
if (IsObjectDistributed(&address))
|
|
||||||
{
|
|
||||||
ObjectAddress *addressPtr = palloc0(sizeof(ObjectAddress));
|
ObjectAddress *addressPtr = palloc0(sizeof(ObjectAddress));
|
||||||
*addressPtr = address;
|
*addressPtr = address;
|
||||||
|
if (IsAnyObjectDistributed(list_make1(addressPtr)))
|
||||||
|
{
|
||||||
distributedObjects = lappend(distributedObjects, object);
|
distributedObjects = lappend(distributedObjects, object);
|
||||||
distributedObjectAddresses = lappend(distributedObjectAddresses, addressPtr);
|
distributedObjectAddresses = lappend(distributedObjectAddresses, addressPtr);
|
||||||
}
|
}
|
||||||
|
|
|
@ -442,10 +442,9 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
* via their own connection and committed immediately so they become visible to all
|
* via their own connection and committed immediately so they become visible to all
|
||||||
* sessions creating shards.
|
* sessions creating shards.
|
||||||
*/
|
*/
|
||||||
ObjectAddress tableAddress = { 0 };
|
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
|
||||||
|
|
||||||
char replicationModel = DecideReplicationModel(distributionMethod,
|
char replicationModel = DecideReplicationModel(distributionMethod,
|
||||||
colocateWithTableName,
|
colocateWithTableName,
|
||||||
|
|
|
@ -36,6 +36,9 @@ static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
|
||||||
static int ObjectAddressComparator(const void *a, const void *b);
|
static int ObjectAddressComparator(const void *a, const void *b);
|
||||||
static List * FilterObjectAddressListByPredicate(List *objectAddressList,
|
static List * FilterObjectAddressListByPredicate(List *objectAddressList,
|
||||||
AddressPredicate predicate);
|
AddressPredicate predicate);
|
||||||
|
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
||||||
|
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
|
||||||
|
static bool ShouldPropagateObject(const ObjectAddress *address);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
|
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
|
||||||
|
@ -51,7 +54,7 @@ static List * FilterObjectAddressListByPredicate(List *objectAddressList,
|
||||||
* This is solved by creating the dependencies in an idempotent manner, either via
|
* This is solved by creating the dependencies in an idempotent manner, either via
|
||||||
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
|
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
|
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
|
||||||
{
|
{
|
||||||
List *dependenciesWithCommands = NIL;
|
List *dependenciesWithCommands = NIL;
|
||||||
|
@ -142,6 +145,21 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EnsureAllObjectDependenciesExistOnAllNodes iteratively calls EnsureDependenciesExistOnAllNodes
|
||||||
|
* for given targets.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(const List *targets)
|
||||||
|
{
|
||||||
|
ObjectAddress *target = NULL;
|
||||||
|
foreach_ptr(target, targets)
|
||||||
|
{
|
||||||
|
EnsureDependenciesExistOnAllNodes(target);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EnsureDependenciesCanBeDistributed ensures all dependencies of the given object
|
* EnsureDependenciesCanBeDistributed ensures all dependencies of the given object
|
||||||
* can be distributed.
|
* can be distributed.
|
||||||
|
@ -153,7 +171,8 @@ EnsureDependenciesCanBeDistributed(const ObjectAddress *objectAddress)
|
||||||
ErrorIfCircularDependencyExists(objectAddress);
|
ErrorIfCircularDependencyExists(objectAddress);
|
||||||
|
|
||||||
/* If the object has any unsupported dependency, error out */
|
/* If the object has any unsupported dependency, error out */
|
||||||
DeferredErrorMessage *depError = DeferErrorIfHasUnsupportedDependency(objectAddress);
|
DeferredErrorMessage *depError = DeferErrorIfAnyObjectHasUnsupportedDependency(
|
||||||
|
list_make1((ObjectAddress *) objectAddress));
|
||||||
|
|
||||||
if (depError != NULL)
|
if (depError != NULL)
|
||||||
{
|
{
|
||||||
|
@ -310,7 +329,7 @@ GetDistributableDependenciesForObject(const ObjectAddress *target)
|
||||||
* GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
|
* GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
|
||||||
* commands to execute on a worker to create the object.
|
* commands to execute on a worker to create the object.
|
||||||
*/
|
*/
|
||||||
List *
|
static List *
|
||||||
GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
|
GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
|
||||||
{
|
{
|
||||||
switch (getObjectClass(dependency))
|
switch (getObjectClass(dependency))
|
||||||
|
@ -488,6 +507,25 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetAllDependencyCreateDDLCommands iteratively calls GetDependencyCreateDDLCommands
|
||||||
|
* for given dependencies.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
GetAllDependencyCreateDDLCommands(const List *dependencies)
|
||||||
|
{
|
||||||
|
List *commands = NIL;
|
||||||
|
|
||||||
|
ObjectAddress *dependency = NULL;
|
||||||
|
foreach_ptr(dependency, dependencies)
|
||||||
|
{
|
||||||
|
commands = list_concat(commands, GetDependencyCreateDDLCommands(dependency));
|
||||||
|
}
|
||||||
|
|
||||||
|
return commands;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReplicateAllObjectsToNodeCommandList returns commands to replicate all
|
* ReplicateAllObjectsToNodeCommandList returns commands to replicate all
|
||||||
* previously marked objects to a worker node. The function also sets
|
* previously marked objects to a worker node. The function also sets
|
||||||
|
@ -531,7 +569,7 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
|
||||||
ObjectAddress *dependency = NULL;
|
ObjectAddress *dependency = NULL;
|
||||||
foreach_ptr(dependency, dependencies)
|
foreach_ptr(dependency, dependencies)
|
||||||
{
|
{
|
||||||
if (IsObjectAddressOwnedByExtension(dependency, NULL))
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* we expect extension-owned objects to be created as a result
|
* we expect extension-owned objects to be created as a result
|
||||||
|
@ -663,7 +701,7 @@ ShouldPropagateCreateInCoordinatedTransction()
|
||||||
* ShouldPropagateObject determines if we should be propagating DDLs based
|
* ShouldPropagateObject determines if we should be propagating DDLs based
|
||||||
* on their object address.
|
* on their object address.
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
ShouldPropagateObject(const ObjectAddress *address)
|
ShouldPropagateObject(const ObjectAddress *address)
|
||||||
{
|
{
|
||||||
if (!ShouldPropagate())
|
if (!ShouldPropagate())
|
||||||
|
@ -671,7 +709,7 @@ ShouldPropagateObject(const ObjectAddress *address)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IsObjectDistributed(address))
|
if (!IsAnyObjectDistributed(list_make1((ObjectAddress *) address)))
|
||||||
{
|
{
|
||||||
/* do not propagate for non-distributed types */
|
/* do not propagate for non-distributed types */
|
||||||
return false;
|
return false;
|
||||||
|
@ -681,6 +719,26 @@ ShouldPropagateObject(const ObjectAddress *address)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShouldPropagateAnyObject determines if we should be propagating DDLs based
|
||||||
|
* on their object addresses.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ShouldPropagateAnyObject(List *addresses)
|
||||||
|
{
|
||||||
|
ObjectAddress *address = NULL;
|
||||||
|
foreach_ptr(address, addresses)
|
||||||
|
{
|
||||||
|
if (ShouldPropagateObject(address))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list
|
* FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list
|
||||||
* only containing the ObjectAddress *'s for which the predicate returned true.
|
* only containing the ObjectAddress *'s for which the predicate returned true.
|
||||||
|
|
|
@ -181,9 +181,12 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString)
|
||||||
(void *) createExtensionStmtSql,
|
(void *) createExtensionStmtSql,
|
||||||
ENABLE_DDL_PROPAGATION);
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
ObjectAddress extensionAddress = GetObjectAddressFromParseTree(node, false);
|
List *extensionAddresses = GetObjectAddressListFromParseTree(node, false);
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&extensionAddress);
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(extensionAddresses) == 1);
|
||||||
|
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(extensionAddresses);
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
}
|
}
|
||||||
|
@ -319,10 +322,9 @@ FilterDistributedExtensions(List *extensionObjectList)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress address = { 0 };
|
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(address, ExtensionRelationId, extensionOid);
|
ObjectAddressSet(*address, ExtensionRelationId, extensionOid);
|
||||||
|
if (!IsAnyObjectDistributed(list_make1(address)))
|
||||||
if (!IsObjectDistributed(&address))
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -411,7 +413,10 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString,
|
||||||
List *
|
List *
|
||||||
PostprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
|
PostprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
|
||||||
{
|
{
|
||||||
ObjectAddress extensionAddress = GetObjectAddressFromParseTree(node, false);
|
List *extensionAddresses = GetObjectAddressListFromParseTree(node, false);
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(extensionAddresses) == 1);
|
||||||
|
|
||||||
if (!ShouldPropagateExtensionCommand(node))
|
if (!ShouldPropagateExtensionCommand(node))
|
||||||
{
|
{
|
||||||
|
@ -419,7 +424,7 @@ PostprocessAlterExtensionSchemaStmt(Node *node, const char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* dependencies (schema) have changed let's ensure they exist */
|
/* dependencies (schema) have changed let's ensure they exist */
|
||||||
EnsureDependenciesExistOnAllNodes(&extensionAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(extensionAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -504,7 +509,7 @@ PostprocessAlterExtensionCitusUpdateStmt(Node *node)
|
||||||
*
|
*
|
||||||
* Note that this function is not responsible for ensuring if dependencies exist on
|
* Note that this function is not responsible for ensuring if dependencies exist on
|
||||||
* nodes and satisfying these dependendencies if not exists, which is already done by
|
* nodes and satisfying these dependendencies if not exists, which is already done by
|
||||||
* EnsureDependenciesExistOnAllNodes on demand. Hence, this function is just designed
|
* EnsureAllObjectDependenciesExistOnAllNodes on demand. Hence, this function is just designed
|
||||||
* to be used when "ALTER EXTENSION citus UPDATE" is executed.
|
* to be used when "ALTER EXTENSION citus UPDATE" is executed.
|
||||||
* This is because we want to add existing objects that would have already been in
|
* This is because we want to add existing objects that would have already been in
|
||||||
* pg_dist_object if we had created them in new version of Citus to pg_dist_object.
|
* pg_dist_object if we had created them in new version of Citus to pg_dist_object.
|
||||||
|
|
|
@ -64,6 +64,7 @@ PreprocessGrantOnFDWStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
Assert(list_length(stmt->objects) == 1);
|
Assert(list_length(stmt->objects) == 1);
|
||||||
|
|
||||||
char *sql = DeparseTreeNode((Node *) stmt);
|
char *sql = DeparseTreeNode((Node *) stmt);
|
||||||
|
@ -87,12 +88,15 @@ NameListHasFDWOwnedByDistributedExtension(List *FDWNames)
|
||||||
foreach_ptr(FDWValue, FDWNames)
|
foreach_ptr(FDWValue, FDWNames)
|
||||||
{
|
{
|
||||||
/* captures the extension address during lookup */
|
/* captures the extension address during lookup */
|
||||||
ObjectAddress extensionAddress = { 0 };
|
ObjectAddress *extensionAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddress FDWAddress = GetObjectAddressByFDWName(strVal(FDWValue), false);
|
ObjectAddress FDWAddress = GetObjectAddressByFDWName(strVal(FDWValue), false);
|
||||||
|
|
||||||
if (IsObjectAddressOwnedByExtension(&FDWAddress, &extensionAddress))
|
ObjectAddress *copyFDWAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
*copyFDWAddress = FDWAddress;
|
||||||
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(copyFDWAddress),
|
||||||
|
extensionAddress))
|
||||||
{
|
{
|
||||||
if (IsObjectDistributed(&extensionAddress))
|
if (IsAnyObjectDistributed(list_make1(extensionAddress)))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,6 +102,7 @@ PreprocessGrantOnForeignServerStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
Assert(list_length(stmt->objects) == 1);
|
Assert(list_length(stmt->objects) == 1);
|
||||||
|
|
||||||
char *sql = DeparseTreeNode((Node *) stmt);
|
char *sql = DeparseTreeNode((Node *) stmt);
|
||||||
|
@ -247,15 +248,14 @@ NameListHasDistributedServer(List *serverNames)
|
||||||
foreach_ptr(serverValue, serverNames)
|
foreach_ptr(serverValue, serverNames)
|
||||||
{
|
{
|
||||||
List *addresses = GetObjectAddressByServerName(strVal(serverValue), false);
|
List *addresses = GetObjectAddressByServerName(strVal(serverValue), false);
|
||||||
if (list_length(addresses) > 1)
|
|
||||||
{
|
|
||||||
ereport(ERROR, errmsg(
|
|
||||||
"citus does not support multiple object addresses in NameListHasDistributedServer"));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
ObjectAddress *address = linitial(addresses);
|
ObjectAddress *address = linitial(addresses);
|
||||||
|
|
||||||
if (IsObjectDistributed(address))
|
if (IsAnyObjectDistributed(list_make1(address)))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,7 +128,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
text *colocateWithText = NULL; /* optional */
|
text *colocateWithText = NULL; /* optional */
|
||||||
|
|
||||||
StringInfoData ddlCommand = { 0 };
|
StringInfoData ddlCommand = { 0 };
|
||||||
ObjectAddress functionAddress = { 0 };
|
ObjectAddress *functionAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
|
||||||
Oid distributionArgumentOid = InvalidOid;
|
Oid distributionArgumentOid = InvalidOid;
|
||||||
bool colocatedWithReferenceTable = false;
|
bool colocatedWithReferenceTable = false;
|
||||||
|
@ -203,9 +203,9 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
EnsureFunctionOwner(funcOid);
|
EnsureFunctionOwner(funcOid);
|
||||||
|
|
||||||
ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid);
|
ObjectAddressSet(*functionAddress, ProcedureRelationId, funcOid);
|
||||||
|
|
||||||
if (RecreateSameNonColocatedFunction(functionAddress,
|
if (RecreateSameNonColocatedFunction(*functionAddress,
|
||||||
distributionArgumentName,
|
distributionArgumentName,
|
||||||
colocateWithTableNameDefault,
|
colocateWithTableNameDefault,
|
||||||
forceDelegationAddress))
|
forceDelegationAddress))
|
||||||
|
@ -224,9 +224,10 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
* pg_dist_object, and not propagate the CREATE FUNCTION. Function
|
* pg_dist_object, and not propagate the CREATE FUNCTION. Function
|
||||||
* will be created by the virtue of the extension creation.
|
* will be created by the virtue of the extension creation.
|
||||||
*/
|
*/
|
||||||
if (IsObjectAddressOwnedByExtension(&functionAddress, &extensionAddress))
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(functionAddress),
|
||||||
|
&extensionAddress))
|
||||||
{
|
{
|
||||||
EnsureExtensionFunctionCanBeDistributed(functionAddress, extensionAddress,
|
EnsureExtensionFunctionCanBeDistributed(*functionAddress, extensionAddress,
|
||||||
distributionArgumentName);
|
distributionArgumentName);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -237,7 +238,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
EnsureSequentialMode(OBJECT_FUNCTION);
|
EnsureSequentialMode(OBJECT_FUNCTION);
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&functionAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(functionAddress));
|
||||||
|
|
||||||
const char *createFunctionSQL = GetFunctionDDLCommand(funcOid, true);
|
const char *createFunctionSQL = GetFunctionDDLCommand(funcOid, true);
|
||||||
const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
|
const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
|
||||||
|
@ -257,7 +258,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
ddlCommand.data);
|
ddlCommand.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
MarkObjectDistributed(&functionAddress);
|
MarkObjectDistributed(functionAddress);
|
||||||
|
|
||||||
if (distributionArgumentName != NULL)
|
if (distributionArgumentName != NULL)
|
||||||
{
|
{
|
||||||
|
@ -272,12 +273,12 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
distributionArgumentOid,
|
distributionArgumentOid,
|
||||||
colocateWithTableName,
|
colocateWithTableName,
|
||||||
forceDelegationAddress,
|
forceDelegationAddress,
|
||||||
&functionAddress);
|
functionAddress);
|
||||||
}
|
}
|
||||||
else if (!colocatedWithReferenceTable)
|
else if (!colocatedWithReferenceTable)
|
||||||
{
|
{
|
||||||
DistributeFunctionColocatedWithDistributedTable(funcOid, colocateWithTableName,
|
DistributeFunctionColocatedWithDistributedTable(funcOid, colocateWithTableName,
|
||||||
&functionAddress);
|
functionAddress);
|
||||||
}
|
}
|
||||||
else if (colocatedWithReferenceTable)
|
else if (colocatedWithReferenceTable)
|
||||||
{
|
{
|
||||||
|
@ -288,7 +289,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
ErrorIfAnyNodeDoesNotHaveMetadata();
|
ErrorIfAnyNodeDoesNotHaveMetadata();
|
||||||
|
|
||||||
DistributeFunctionColocatedWithReferenceTable(&functionAddress);
|
DistributeFunctionColocatedWithReferenceTable(functionAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
@ -1308,7 +1309,7 @@ ShouldPropagateAlterFunction(const ObjectAddress *address)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IsObjectDistributed(address))
|
if (!IsAnyObjectDistributed(list_make1((ObjectAddress *) address)))
|
||||||
{
|
{
|
||||||
/* do not propagate alter function for non-distributed functions */
|
/* do not propagate alter function for non-distributed functions */
|
||||||
return false;
|
return false;
|
||||||
|
@ -1373,15 +1374,19 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress functionAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
List *functionAddresses = GetObjectAddressListFromParseTree((Node *) stmt, false);
|
||||||
|
|
||||||
if (IsObjectAddressOwnedByExtension(&functionAddress, NULL))
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(functionAddresses) == 1);
|
||||||
|
|
||||||
|
if (IsAnyObjectAddressOwnedByExtension(functionAddresses, NULL))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If the function has any unsupported dependency, create it locally */
|
/* If the function has any unsupported dependency, create it locally */
|
||||||
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&functionAddress);
|
DeferredErrorMessage *errMsg = DeferErrorIfAnyObjectHasUnsupportedDependency(
|
||||||
|
functionAddresses);
|
||||||
|
|
||||||
if (errMsg != NULL)
|
if (errMsg != NULL)
|
||||||
{
|
{
|
||||||
|
@ -1389,11 +1394,14 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&functionAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(functionAddresses);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *functionAddress = linitial(functionAddresses);
|
||||||
|
|
||||||
List *commands = list_make1(DISABLE_DDL_PROPAGATION);
|
List *commands = list_make1(DISABLE_DDL_PROPAGATION);
|
||||||
commands = list_concat(commands, CreateFunctionDDLCommandsIdempotent(
|
commands = list_concat(commands, CreateFunctionDDLCommandsIdempotent(
|
||||||
&functionAddress));
|
functionAddress));
|
||||||
commands = list_concat(commands, list_make1(ENABLE_DDL_PROPAGATION));
|
commands = list_concat(commands, list_make1(ENABLE_DDL_PROPAGATION));
|
||||||
|
|
||||||
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
|
||||||
|
@ -1494,8 +1502,15 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString,
|
||||||
AlterFunctionStmt *stmt = castNode(AlterFunctionStmt, node);
|
AlterFunctionStmt *stmt = castNode(AlterFunctionStmt, node);
|
||||||
AssertObjectTypeIsFunctional(stmt->objtype);
|
AssertObjectTypeIsFunctional(stmt->objtype);
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt, false);
|
||||||
if (!ShouldPropagateAlterFunction(&address))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *address = linitial(addresses);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAlterFunction(address))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -1549,20 +1564,26 @@ PreprocessAlterFunctionDependsStmt(Node *node, const char *queryString,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, true);
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt, true);
|
||||||
if (!IsObjectDistributed(&address))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!IsAnyObjectDistributed(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *address = linitial(addresses);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Distributed objects should not start depending on an extension, this will break
|
* Distributed objects should not start depending on an extension, this will break
|
||||||
* the dependency resolving mechanism we use to replicate distributed objects to new
|
* the dependency resolving mechanism we use to replicate distributed objects to new
|
||||||
* workers
|
* workers
|
||||||
*/
|
*/
|
||||||
|
|
||||||
const char *functionName =
|
const char *functionName =
|
||||||
getObjectIdentity_compat(&address, /* missingOk: */ false);
|
getObjectIdentity_compat(address, /* missingOk: */ false);
|
||||||
ereport(ERROR, (errmsg("distrtibuted functions are not allowed to depend on an "
|
ereport(ERROR, (errmsg("distrtibuted functions are not allowed to depend on an "
|
||||||
"extension"),
|
"extension"),
|
||||||
errdetail("Function \"%s\" is already distributed. Functions from "
|
errdetail("Function \"%s\" is already distributed. Functions from "
|
||||||
|
@ -1920,7 +1941,7 @@ EnsureExtensionFunctionCanBeDistributed(const ObjectAddress functionAddress,
|
||||||
/*
|
/*
|
||||||
* Ensure corresponding extension is in pg_dist_object.
|
* Ensure corresponding extension is in pg_dist_object.
|
||||||
* Functions owned by an extension are depending internally on that extension,
|
* Functions owned by an extension are depending internally on that extension,
|
||||||
* hence EnsureDependenciesExistOnAllNodes() creates the extension, which in
|
* hence EnsureAllObjectDependenciesExistOnAllNodes() creates the extension, which in
|
||||||
* turn creates the function, and thus we don't have to create it ourself like
|
* turn creates the function, and thus we don't have to create it ourself like
|
||||||
* we do for non-extension functions.
|
* we do for non-extension functions.
|
||||||
*/
|
*/
|
||||||
|
@ -1930,7 +1951,9 @@ EnsureExtensionFunctionCanBeDistributed(const ObjectAddress functionAddress,
|
||||||
get_extension_name(extensionAddress.objectId),
|
get_extension_name(extensionAddress.objectId),
|
||||||
get_func_name(functionAddress.objectId))));
|
get_func_name(functionAddress.objectId))));
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&functionAddress);
|
ObjectAddress *copyFunctionAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
*copyFunctionAddress = functionAddress;
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(copyFunctionAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2004,7 +2027,7 @@ PostprocessGrantOnFunctionStmt(Node *node, const char *queryString)
|
||||||
ObjectAddress *functionAddress = NULL;
|
ObjectAddress *functionAddress = NULL;
|
||||||
foreach_ptr(functionAddress, distributedFunctions)
|
foreach_ptr(functionAddress, distributedFunctions)
|
||||||
{
|
{
|
||||||
EnsureDependenciesExistOnAllNodes(functionAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(functionAddress));
|
||||||
}
|
}
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -2083,7 +2106,7 @@ FilterDistributedFunctions(GrantStmt *grantStmt)
|
||||||
* if this function from GRANT .. ON FUNCTION .. is a distributed
|
* if this function from GRANT .. ON FUNCTION .. is a distributed
|
||||||
* function, add it to the list
|
* function, add it to the list
|
||||||
*/
|
*/
|
||||||
if (IsObjectDistributed(functionAddress))
|
if (IsAnyObjectDistributed(list_make1(functionAddress)))
|
||||||
{
|
{
|
||||||
grantFunctionList = lappend(grantFunctionList, functionAddress);
|
grantFunctionList = lappend(grantFunctionList, functionAddress);
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,9 +238,9 @@ CollectGrantTableIdList(GrantStmt *grantStmt)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* check for distributed sequences included in GRANT ON TABLE statement */
|
/* check for distributed sequences included in GRANT ON TABLE statement */
|
||||||
ObjectAddress sequenceAddress = { 0 };
|
ObjectAddress *sequenceAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(sequenceAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*sequenceAddress, RelationRelationId, relationId);
|
||||||
if (IsObjectDistributed(&sequenceAddress))
|
if (IsAnyObjectDistributed(list_make1(sequenceAddress)))
|
||||||
{
|
{
|
||||||
grantTableList = lappend_oid(grantTableList, relationId);
|
grantTableList = lappend_oid(grantTableList, relationId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -761,9 +761,9 @@ PostprocessIndexStmt(Node *node, const char *queryString)
|
||||||
Oid indexRelationId = get_relname_relid(indexStmt->idxname, schemaId);
|
Oid indexRelationId = get_relname_relid(indexStmt->idxname, schemaId);
|
||||||
|
|
||||||
/* ensure dependencies of index exist on all nodes */
|
/* ensure dependencies of index exist on all nodes */
|
||||||
ObjectAddress address = { 0 };
|
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(address, RelationRelationId, indexRelationId);
|
ObjectAddressSet(*address, RelationRelationId, indexRelationId);
|
||||||
EnsureDependenciesExistOnAllNodes(&address);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(address));
|
||||||
|
|
||||||
/* furtheron we are only processing CONCURRENT index statements */
|
/* furtheron we are only processing CONCURRENT index statements */
|
||||||
if (!indexStmt->concurrent)
|
if (!indexStmt->concurrent)
|
||||||
|
@ -772,7 +772,7 @@ PostprocessIndexStmt(Node *node, const char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EnsureDependenciesExistOnAllNodes could have distributed objects that are required
|
* EnsureAllObjectDependenciesExistOnAllNodes could have distributed objects that are required
|
||||||
* by this index. During the propagation process an active snapshout might be left as
|
* by this index. During the propagation process an active snapshout might be left as
|
||||||
* a side effect of inserting the local tuples via SPI. To not leak a snapshot like
|
* a side effect of inserting the local tuples via SPI. To not leak a snapshot like
|
||||||
* that we will pop any snapshot if we have any right before we commit.
|
* that we will pop any snapshot if we have any right before we commit.
|
||||||
|
|
|
@ -137,8 +137,12 @@ RoleSpecToObjectAddress(RoleSpec *role, bool missing_ok)
|
||||||
List *
|
List *
|
||||||
PostprocessAlterRoleStmt(Node *node, const char *queryString)
|
PostprocessAlterRoleStmt(Node *node, const char *queryString)
|
||||||
{
|
{
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(node, false);
|
List *addresses = GetObjectAddressListFromParseTree(node, false);
|
||||||
if (!ShouldPropagateObject(&address))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -208,14 +212,17 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(node, false);
|
List *addresses = GetObjectAddressListFromParseTree(node, false);
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* stmt->role could be NULL when the statement is on 'ALL' roles, we do propagate for
|
* stmt->role could be NULL when the statement is on 'ALL' roles, we do propagate for
|
||||||
* ALL roles. If it is not NULL the role is for a specific role. If that role is not
|
* ALL roles. If it is not NULL the role is for a specific role. If that role is not
|
||||||
* distributed we will not propagate the statement
|
* distributed we will not propagate the statement
|
||||||
*/
|
*/
|
||||||
if (stmt->role != NULL && !IsObjectDistributed(&address))
|
if (stmt->role != NULL && !IsAnyObjectDistributed(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -1056,7 +1063,6 @@ FilterDistributedRoles(List *roles)
|
||||||
foreach_ptr(roleNode, roles)
|
foreach_ptr(roleNode, roles)
|
||||||
{
|
{
|
||||||
RoleSpec *role = castNode(RoleSpec, roleNode);
|
RoleSpec *role = castNode(RoleSpec, roleNode);
|
||||||
ObjectAddress roleAddress = { 0 };
|
|
||||||
Oid roleOid = get_rolespec_oid(role, true);
|
Oid roleOid = get_rolespec_oid(role, true);
|
||||||
if (roleOid == InvalidOid)
|
if (roleOid == InvalidOid)
|
||||||
{
|
{
|
||||||
|
@ -1066,8 +1072,9 @@ FilterDistributedRoles(List *roles)
|
||||||
*/
|
*/
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid);
|
ObjectAddress *roleAddress = palloc0(sizeof(ObjectAddress));
|
||||||
if (IsObjectDistributed(&roleAddress))
|
ObjectAddressSet(*roleAddress, AuthIdRelationId, roleOid);
|
||||||
|
if (IsAnyObjectDistributed(list_make1(roleAddress)))
|
||||||
{
|
{
|
||||||
distributedRoles = lappend(distributedRoles, role);
|
distributedRoles = lappend(distributedRoles, role);
|
||||||
}
|
}
|
||||||
|
@ -1137,12 +1144,13 @@ PostprocessGrantRoleStmt(Node *node, const char *queryString)
|
||||||
RoleSpec *role = NULL;
|
RoleSpec *role = NULL;
|
||||||
foreach_ptr(role, stmt->grantee_roles)
|
foreach_ptr(role, stmt->grantee_roles)
|
||||||
{
|
{
|
||||||
ObjectAddress roleAddress = { 0 };
|
|
||||||
Oid roleOid = get_rolespec_oid(role, false);
|
Oid roleOid = get_rolespec_oid(role, false);
|
||||||
ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid);
|
ObjectAddress *roleAddress = palloc0(sizeof(ObjectAddress));
|
||||||
if (IsObjectDistributed(&roleAddress))
|
ObjectAddressSet(*roleAddress, AuthIdRelationId, roleOid);
|
||||||
|
|
||||||
|
if (IsAnyObjectDistributed(list_make1(roleAddress)))
|
||||||
{
|
{
|
||||||
EnsureDependenciesExistOnAllNodes(&roleAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(roleAddress));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NIL;
|
return NIL;
|
||||||
|
|
|
@ -259,10 +259,9 @@ FilterDistributedSchemas(List *schemas)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress address = { 0 };
|
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(address, NamespaceRelationId, schemaOid);
|
ObjectAddressSet(*address, NamespaceRelationId, schemaOid);
|
||||||
|
if (!IsAnyObjectDistributed(list_make1(address)))
|
||||||
if (!IsObjectDistributed(&address))
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -268,18 +268,16 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
Oid seqOid = RangeVarGetRelid(seq, NoLock, stmt->missing_ok);
|
Oid seqOid = RangeVarGetRelid(seq, NoLock, stmt->missing_ok);
|
||||||
|
|
||||||
ObjectAddress sequenceAddress = { 0 };
|
ObjectAddress *sequenceAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(sequenceAddress, RelationRelationId, seqOid);
|
ObjectAddressSet(*sequenceAddress, RelationRelationId, seqOid);
|
||||||
|
if (!IsAnyObjectDistributed(list_make1(sequenceAddress)))
|
||||||
if (!IsObjectDistributed(&sequenceAddress))
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* collect information for all distributed sequences */
|
/* collect information for all distributed sequences */
|
||||||
ObjectAddress *addressp = palloc(sizeof(ObjectAddress));
|
distributedSequenceAddresses = lappend(distributedSequenceAddresses,
|
||||||
*addressp = sequenceAddress;
|
sequenceAddress);
|
||||||
distributedSequenceAddresses = lappend(distributedSequenceAddresses, addressp);
|
|
||||||
distributedSequencesList = lappend(distributedSequencesList, objectNameList);
|
distributedSequencesList = lappend(distributedSequencesList, objectNameList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,10 +332,13 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility
|
||||||
RenameStmt *stmt = castNode(RenameStmt, node);
|
RenameStmt *stmt = castNode(RenameStmt, node);
|
||||||
Assert(stmt->renameType == OBJECT_SEQUENCE);
|
Assert(stmt->renameType == OBJECT_SEQUENCE);
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt,
|
||||||
stmt->missing_ok);
|
stmt->missing_ok);
|
||||||
|
|
||||||
if (!ShouldPropagateObject(&address))
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -395,21 +396,27 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
|
||||||
{
|
{
|
||||||
AlterSeqStmt *stmt = castNode(AlterSeqStmt, node);
|
AlterSeqStmt *stmt = castNode(AlterSeqStmt, node);
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt,
|
||||||
stmt->missing_ok);
|
stmt->missing_ok);
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
/* error out if the sequence is distributed */
|
/* error out if the sequence is distributed */
|
||||||
if (IsObjectDistributed(&address))
|
if (IsAnyObjectDistributed(addresses))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg(
|
ereport(ERROR, (errmsg(
|
||||||
"Altering a distributed sequence is currently not supported.")));
|
"Altering a distributed sequence is currently not supported.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *address = linitial(addresses);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* error out if the sequence is used in a distributed table
|
* error out if the sequence is used in a distributed table
|
||||||
* and this is an ALTER SEQUENCE .. AS .. statement
|
* and this is an ALTER SEQUENCE .. AS .. statement
|
||||||
*/
|
*/
|
||||||
Oid citusTableId = SequenceUsedInDistributedTable(&address);
|
Oid citusTableId = SequenceUsedInDistributedTable(address);
|
||||||
if (citusTableId != InvalidOid)
|
if (citusTableId != InvalidOid)
|
||||||
{
|
{
|
||||||
List *options = stmt->options;
|
List *options = stmt->options;
|
||||||
|
@ -463,6 +470,7 @@ SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return InvalidOid;
|
return InvalidOid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -498,9 +506,13 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
|
||||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||||
Assert(stmt->objectType == OBJECT_SEQUENCE);
|
Assert(stmt->objectType == OBJECT_SEQUENCE);
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt,
|
||||||
stmt->missing_ok);
|
stmt->missing_ok);
|
||||||
if (!ShouldPropagateObject(&address))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -572,16 +584,19 @@ PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString)
|
||||||
{
|
{
|
||||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||||
Assert(stmt->objectType == OBJECT_SEQUENCE);
|
Assert(stmt->objectType == OBJECT_SEQUENCE);
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt,
|
||||||
stmt->missing_ok);
|
stmt->missing_ok);
|
||||||
|
|
||||||
if (!ShouldPropagateObject(&address))
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(addresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* dependencies have changed (schema) let's ensure they exist */
|
/* dependencies have changed (schema) let's ensure they exist */
|
||||||
EnsureDependenciesExistOnAllNodes(&address);
|
EnsureAllObjectDependenciesExistOnAllNodes(addresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -601,8 +616,12 @@ PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
|
||||||
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
||||||
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_SEQUENCE);
|
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_SEQUENCE);
|
||||||
|
|
||||||
ObjectAddress sequenceAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
List *sequenceAddresses = GetObjectAddressListFromParseTree((Node *) stmt, false);
|
||||||
if (!ShouldPropagateObject(&sequenceAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(sequenceAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(sequenceAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -649,14 +668,18 @@ PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString)
|
||||||
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
||||||
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_SEQUENCE);
|
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_SEQUENCE);
|
||||||
|
|
||||||
ObjectAddress sequenceAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
List *sequenceAddresses = GetObjectAddressListFromParseTree((Node *) stmt, false);
|
||||||
if (!ShouldPropagateObject(&sequenceAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(sequenceAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(sequenceAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* dependencies have changed (owner) let's ensure they exist */
|
/* dependencies have changed (owner) let's ensure they exist */
|
||||||
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(sequenceAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -744,10 +767,10 @@ PostprocessGrantOnSequenceStmt(Node *node, const char *queryString)
|
||||||
RangeVar *sequence = NULL;
|
RangeVar *sequence = NULL;
|
||||||
foreach_ptr(sequence, distributedSequences)
|
foreach_ptr(sequence, distributedSequences)
|
||||||
{
|
{
|
||||||
ObjectAddress sequenceAddress = { 0 };
|
ObjectAddress *sequenceAddress = palloc0(sizeof(ObjectAddress));
|
||||||
Oid sequenceOid = RangeVarGetRelid(sequence, NoLock, false);
|
Oid sequenceOid = RangeVarGetRelid(sequence, NoLock, false);
|
||||||
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
|
ObjectAddressSet(*sequenceAddress, RelationRelationId, sequenceOid);
|
||||||
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(sequenceAddress));
|
||||||
}
|
}
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -866,15 +889,15 @@ FilterDistributedSequences(GrantStmt *stmt)
|
||||||
RangeVar *sequenceRangeVar = NULL;
|
RangeVar *sequenceRangeVar = NULL;
|
||||||
foreach_ptr(sequenceRangeVar, stmt->objects)
|
foreach_ptr(sequenceRangeVar, stmt->objects)
|
||||||
{
|
{
|
||||||
ObjectAddress sequenceAddress = { 0 };
|
|
||||||
Oid sequenceOid = RangeVarGetRelid(sequenceRangeVar, NoLock, missing_ok);
|
Oid sequenceOid = RangeVarGetRelid(sequenceRangeVar, NoLock, missing_ok);
|
||||||
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
|
ObjectAddress *sequenceAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
ObjectAddressSet(*sequenceAddress, RelationRelationId, sequenceOid);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* if this sequence from GRANT .. ON SEQUENCE .. is a distributed
|
* if this sequence from GRANT .. ON SEQUENCE .. is a distributed
|
||||||
* sequence, add it to the list
|
* sequence, add it to the list
|
||||||
*/
|
*/
|
||||||
if (IsObjectDistributed(&sequenceAddress))
|
if (IsAnyObjectDistributed(list_make1(sequenceAddress)))
|
||||||
{
|
{
|
||||||
grantSequenceList = lappend(grantSequenceList, sequenceRangeVar);
|
grantSequenceList = lappend(grantSequenceList, sequenceRangeVar);
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,9 +122,12 @@ PostprocessCreateStatisticsStmt(Node *node, const char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
bool missingOk = false;
|
bool missingOk = false;
|
||||||
ObjectAddress objectAddress = GetObjectAddressFromParseTree((Node *) stmt, missingOk);
|
List *objectAddresses = GetObjectAddressListFromParseTree((Node *) stmt, missingOk);
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&objectAddress);
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(objectAddresses) == 1);
|
||||||
|
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(objectAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -306,9 +309,12 @@ PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString)
|
||||||
}
|
}
|
||||||
|
|
||||||
bool missingOk = false;
|
bool missingOk = false;
|
||||||
ObjectAddress objectAddress = GetObjectAddressFromParseTree((Node *) stmt, missingOk);
|
List *objectAddresses = GetObjectAddressListFromParseTree((Node *) stmt, missingOk);
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&objectAddress);
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(objectAddresses) == 1);
|
||||||
|
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(objectAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -449,10 +455,9 @@ PostprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress statisticsAddress = { 0 };
|
ObjectAddress *statisticsAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(statisticsAddress, StatisticExtRelationId, statsOid);
|
ObjectAddressSet(*statisticsAddress, StatisticExtRelationId, statsOid);
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(statisticsAddress));
|
||||||
EnsureDependenciesExistOnAllNodes(&statisticsAddress);
|
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -649,13 +649,19 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
||||||
/*
|
/*
|
||||||
* We will let Postgres deal with missing_ok
|
* We will let Postgres deal with missing_ok
|
||||||
*/
|
*/
|
||||||
ObjectAddress tableAddress = GetObjectAddressFromParseTree((Node *) stmt, true);
|
List *tableAddresses = GetObjectAddressListFromParseTree((Node *) stmt, true);
|
||||||
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(tableAddress) == 1);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *tableAddress = linitial(tableAddresses);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check whether we are dealing with a sequence or view here and route queries
|
* Check whether we are dealing with a sequence or view here and route queries
|
||||||
* accordingly to the right processor function.
|
* accordingly to the right processor function.
|
||||||
*/
|
*/
|
||||||
char relKind = get_rel_relkind(tableAddress.objectId);
|
char relKind = get_rel_relkind(tableAddress->objectId);
|
||||||
if (relKind == RELKIND_SEQUENCE)
|
if (relKind == RELKIND_SEQUENCE)
|
||||||
{
|
{
|
||||||
stmt->objectType = OBJECT_SEQUENCE;
|
stmt->objectType = OBJECT_SEQUENCE;
|
||||||
|
@ -667,12 +673,12 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
||||||
return PostprocessAlterViewSchemaStmt((Node *) stmt, queryString);
|
return PostprocessAlterViewSchemaStmt((Node *) stmt, queryString);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ShouldPropagate() || !IsCitusTable(tableAddress.objectId))
|
if (!ShouldPropagate() || !IsCitusTable(tableAddress->objectId))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(tableAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -1776,9 +1782,15 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
|
List *addresses = GetObjectAddressListFromParseTree((Node *) stmt,
|
||||||
stmt->missing_ok);
|
stmt->missing_ok);
|
||||||
Oid relationId = address.objectId;
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(addresses) == 1);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *address = linitial(addresses);
|
||||||
|
Oid relationId = address->objectId;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check whether we are dealing with a sequence or view here and route queries
|
* Check whether we are dealing with a sequence or view here and route queries
|
||||||
|
@ -1990,9 +2002,9 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
||||||
EnsureRelationHasCompatibleSequenceTypes(relationId);
|
EnsureRelationHasCompatibleSequenceTypes(relationId);
|
||||||
|
|
||||||
/* changing a relation could introduce new dependencies */
|
/* changing a relation could introduce new dependencies */
|
||||||
ObjectAddress tableAddress = { 0 };
|
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* for the new sequences coming with this ALTER TABLE statement */
|
/* for the new sequences coming with this ALTER TABLE statement */
|
||||||
|
|
|
@ -224,8 +224,12 @@ PostprocessCreateTriggerStmt(Node *node, const char *queryString)
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
ErrorOutForTriggerIfNotSupported(relationId);
|
ErrorOutForTriggerIfNotSupported(relationId);
|
||||||
|
|
||||||
ObjectAddress objectAddress = GetObjectAddressFromParseTree(node, missingOk);
|
List *objectAddresses = GetObjectAddressListFromParseTree(node, missingOk);
|
||||||
EnsureDependenciesExistOnAllNodes(&objectAddress);
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(objectAddresses) == 1);
|
||||||
|
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(objectAddresses);
|
||||||
|
|
||||||
char *triggerName = createTriggerStmt->trigname;
|
char *triggerName = createTriggerStmt->trigname;
|
||||||
return CitusCreateTriggerCommandDDLJob(relationId, triggerName,
|
return CitusCreateTriggerCommandDDLJob(relationId, triggerName,
|
||||||
|
|
|
@ -117,8 +117,12 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString,
|
||||||
Assert(stmt->renameType == OBJECT_ATTRIBUTE);
|
Assert(stmt->renameType == OBJECT_ATTRIBUTE);
|
||||||
Assert(stmt->relationType == OBJECT_TYPE);
|
Assert(stmt->relationType == OBJECT_TYPE);
|
||||||
|
|
||||||
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
List *typeAddresses = GetObjectAddressListFromParseTree((Node *) stmt, false);
|
||||||
if (!ShouldPropagateObject(&typeAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(objectAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(typeAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -853,8 +853,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
*/
|
*/
|
||||||
if (ops && ops->markDistributed)
|
if (ops && ops->markDistributed)
|
||||||
{
|
{
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(parsetree, false);
|
List *addresses = GetObjectAddressListFromParseTree(parsetree, false);
|
||||||
MarkObjectDistributed(&address);
|
ObjectAddress *address = NULL;
|
||||||
|
foreach_ptr(address, addresses)
|
||||||
|
{
|
||||||
|
MarkObjectDistributed(address);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,22 +94,27 @@ PostprocessViewStmt(Node *node, const char *queryString)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress viewAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
List *viewAddresses = GetObjectAddressListFromParseTree((Node *) stmt, false);
|
||||||
|
|
||||||
if (IsObjectAddressOwnedByExtension(&viewAddress, NULL))
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
if (IsAnyObjectAddressOwnedByExtension(viewAddresses, NULL))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If the view has any unsupported dependency, create it locally */
|
/* If the view has any unsupported dependency, create it locally */
|
||||||
if (ErrorOrWarnIfObjectHasUnsupportedDependency(&viewAddress))
|
if (ErrorOrWarnIfAnyObjectHasUnsupportedDependency(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&viewAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(viewAddresses);
|
||||||
|
|
||||||
char *command = CreateViewDDLCommand(viewAddress.objectId);
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *viewAddress = linitial(viewAddresses);
|
||||||
|
char *command = CreateViewDDLCommand(viewAddress->objectId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We'd typically use NodeDDLTaskList() for generating node-level DDL commands,
|
* We'd typically use NodeDDLTaskList() for generating node-level DDL commands,
|
||||||
|
@ -140,7 +145,7 @@ PostprocessViewStmt(Node *node, const char *queryString)
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||||
ddlJob->targetObjectAddress = viewAddress;
|
ddlJob->targetObjectAddress = *viewAddress;
|
||||||
ddlJob->metadataSyncCommand = command;
|
ddlJob->metadataSyncCommand = command;
|
||||||
ddlJob->taskList = NIL;
|
ddlJob->taskList = NIL;
|
||||||
|
|
||||||
|
@ -442,10 +447,9 @@ IsViewDistributed(Oid viewOid)
|
||||||
Assert(get_rel_relkind(viewOid) == RELKIND_VIEW ||
|
Assert(get_rel_relkind(viewOid) == RELKIND_VIEW ||
|
||||||
get_rel_relkind(viewOid) == RELKIND_MATVIEW);
|
get_rel_relkind(viewOid) == RELKIND_MATVIEW);
|
||||||
|
|
||||||
ObjectAddress viewAddress = { 0 };
|
ObjectAddress *viewAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
|
ObjectAddressSet(*viewAddress, RelationRelationId, viewOid);
|
||||||
|
return IsAnyObjectDistributed(list_make1(viewAddress));
|
||||||
return IsObjectDistributed(&viewAddress);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -458,8 +462,12 @@ PreprocessAlterViewStmt(Node *node, const char *queryString, ProcessUtilityConte
|
||||||
{
|
{
|
||||||
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
||||||
|
|
||||||
ObjectAddress viewAddress = GetObjectAddressFromParseTree((Node *) stmt, true);
|
List *viewAddresses = GetObjectAddressListFromParseTree((Node *) stmt, true);
|
||||||
if (!ShouldPropagateObject(&viewAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -471,12 +479,15 @@ PreprocessAlterViewStmt(Node *node, const char *queryString, ProcessUtilityConte
|
||||||
/* reconstruct alter statement in a portable fashion */
|
/* reconstruct alter statement in a portable fashion */
|
||||||
const char *alterViewStmtSql = DeparseTreeNode((Node *) stmt);
|
const char *alterViewStmtSql = DeparseTreeNode((Node *) stmt);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *viewAddress = linitial(viewAddresses);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* To avoid sequential mode, we are using metadata connection. For the
|
* To avoid sequential mode, we are using metadata connection. For the
|
||||||
* detailed explanation, please check the comment on PostprocessViewStmt.
|
* detailed explanation, please check the comment on PostprocessViewStmt.
|
||||||
*/
|
*/
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||||
ddlJob->targetObjectAddress = viewAddress;
|
ddlJob->targetObjectAddress = *viewAddress;
|
||||||
ddlJob->metadataSyncCommand = alterViewStmtSql;
|
ddlJob->metadataSyncCommand = alterViewStmtSql;
|
||||||
ddlJob->taskList = NIL;
|
ddlJob->taskList = NIL;
|
||||||
|
|
||||||
|
@ -493,24 +504,28 @@ PostprocessAlterViewStmt(Node *node, const char *queryString)
|
||||||
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
|
||||||
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_VIEW);
|
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_VIEW);
|
||||||
|
|
||||||
ObjectAddress viewAddress = GetObjectAddressFromParseTree((Node *) stmt, true);
|
List *viewAddresses = GetObjectAddressListFromParseTree((Node *) stmt, true);
|
||||||
if (!ShouldPropagateObject(&viewAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsObjectAddressOwnedByExtension(&viewAddress, NULL))
|
if (IsAnyObjectAddressOwnedByExtension(viewAddresses, NULL))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If the view has any unsupported dependency, create it locally */
|
/* If the view has any unsupported dependency, create it locally */
|
||||||
if (ErrorOrWarnIfObjectHasUnsupportedDependency(&viewAddress))
|
if (ErrorOrWarnIfAnyObjectHasUnsupportedDependency(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
EnsureDependenciesExistOnAllNodes(&viewAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(viewAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -541,8 +556,12 @@ List *
|
||||||
PreprocessRenameViewStmt(Node *node, const char *queryString,
|
PreprocessRenameViewStmt(Node *node, const char *queryString,
|
||||||
ProcessUtilityContext processUtilityContext)
|
ProcessUtilityContext processUtilityContext)
|
||||||
{
|
{
|
||||||
ObjectAddress viewAddress = GetObjectAddressFromParseTree(node, true);
|
List *viewAddresses = GetObjectAddressListFromParseTree(node, true);
|
||||||
if (!ShouldPropagateObject(&viewAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -555,12 +574,15 @@ PreprocessRenameViewStmt(Node *node, const char *queryString,
|
||||||
/* deparse sql*/
|
/* deparse sql*/
|
||||||
const char *renameStmtSql = DeparseTreeNode(node);
|
const char *renameStmtSql = DeparseTreeNode(node);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *viewAddress = linitial(viewAddresses);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* To avoid sequential mode, we are using metadata connection. For the
|
* To avoid sequential mode, we are using metadata connection. For the
|
||||||
* detailed explanation, please check the comment on PostprocessViewStmt.
|
* detailed explanation, please check the comment on PostprocessViewStmt.
|
||||||
*/
|
*/
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||||
ddlJob->targetObjectAddress = viewAddress;
|
ddlJob->targetObjectAddress = *viewAddress;
|
||||||
ddlJob->metadataSyncCommand = renameStmtSql;
|
ddlJob->metadataSyncCommand = renameStmtSql;
|
||||||
ddlJob->taskList = NIL;
|
ddlJob->taskList = NIL;
|
||||||
|
|
||||||
|
@ -596,8 +618,12 @@ PreprocessAlterViewSchemaStmt(Node *node, const char *queryString,
|
||||||
{
|
{
|
||||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||||
|
|
||||||
ObjectAddress viewAddress = GetObjectAddressFromParseTree((Node *) stmt, true);
|
List *viewAddresses = GetObjectAddressListFromParseTree((Node *) stmt, true);
|
||||||
if (!ShouldPropagateObject(&viewAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -608,12 +634,15 @@ PreprocessAlterViewSchemaStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
const char *sql = DeparseTreeNode((Node *) stmt);
|
const char *sql = DeparseTreeNode((Node *) stmt);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *viewAddress = linitial(viewAddresses);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* To avoid sequential mode, we are using metadata connection. For the
|
* To avoid sequential mode, we are using metadata connection. For the
|
||||||
* detailed explanation, please check the comment on PostprocessViewStmt.
|
* detailed explanation, please check the comment on PostprocessViewStmt.
|
||||||
*/
|
*/
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||||
ddlJob->targetObjectAddress = viewAddress;
|
ddlJob->targetObjectAddress = *viewAddress;
|
||||||
ddlJob->metadataSyncCommand = sql;
|
ddlJob->metadataSyncCommand = sql;
|
||||||
ddlJob->taskList = NIL;
|
ddlJob->taskList = NIL;
|
||||||
|
|
||||||
|
@ -631,14 +660,18 @@ PostprocessAlterViewSchemaStmt(Node *node, const char *queryString)
|
||||||
{
|
{
|
||||||
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
|
||||||
|
|
||||||
ObjectAddress viewAddress = GetObjectAddressFromParseTree((Node *) stmt, true);
|
List *viewAddresses = GetObjectAddressListFromParseTree((Node *) stmt, true);
|
||||||
if (!ShouldPropagateObject(&viewAddress))
|
|
||||||
|
/* the code-path only supports a single object */
|
||||||
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
if (!ShouldPropagateAnyObject(viewAddresses))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* dependencies have changed (schema) let's ensure they exist */
|
/* dependencies have changed (schema) let's ensure they exist */
|
||||||
EnsureDependenciesExistOnAllNodes(&viewAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(viewAddresses);
|
||||||
|
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,11 +20,11 @@
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetObjectAddressFromParseTree returns the ObjectAddress of the main target of the parse
|
* GetObjectAddressListFromParseTree returns the list of ObjectAddress of the main target of the parse
|
||||||
* tree.
|
* tree.
|
||||||
*/
|
*/
|
||||||
ObjectAddress
|
List *
|
||||||
GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
|
GetObjectAddressListFromParseTree(Node *parseTree, bool missing_ok)
|
||||||
{
|
{
|
||||||
const DistributeObjectOps *ops = GetDistributeObjectOps(parseTree);
|
const DistributeObjectOps *ops = GetDistributeObjectOps(parseTree);
|
||||||
|
|
||||||
|
@ -33,19 +33,7 @@ GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok)
|
||||||
ereport(ERROR, (errmsg("unsupported statement to get object address for")));
|
ereport(ERROR, (errmsg("unsupported statement to get object address for")));
|
||||||
}
|
}
|
||||||
|
|
||||||
List *objectAddresses = ops->address(parseTree, missing_ok);
|
return ops->address(parseTree, missing_ok);
|
||||||
|
|
||||||
if (list_length(objectAddresses) > 1)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg(
|
|
||||||
"citus does not support multiple object addresses in GetObjectAddressFromParseTree")));
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert(list_length(objectAddresses) == 1);
|
|
||||||
|
|
||||||
ObjectAddress *objectAddress = linitial(objectAddresses);
|
|
||||||
|
|
||||||
return *objectAddress;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -137,6 +137,8 @@ static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid
|
||||||
static List * GetTypeConstraintDependencyDefinition(Oid typeId);
|
static List * GetTypeConstraintDependencyDefinition(Oid typeId);
|
||||||
static List * CreateObjectAddressDependencyDefList(Oid classId, List *objectIdList);
|
static List * CreateObjectAddressDependencyDefList(Oid classId, List *objectIdList);
|
||||||
static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
|
static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
|
||||||
|
static DeferredErrorMessage * DeferErrorIfHasUnsupportedDependency(const ObjectAddress *
|
||||||
|
objectAddress);
|
||||||
|
|
||||||
/* forward declarations for functions to interact with the ObjectAddressCollector */
|
/* forward declarations for functions to interact with the ObjectAddressCollector */
|
||||||
static void InitObjectAddressCollector(ObjectAddressCollector *collector);
|
static void InitObjectAddressCollector(ObjectAddressCollector *collector);
|
||||||
|
@ -176,7 +178,10 @@ static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
|
||||||
static List * GetDependentRoleIdsFDW(Oid FDWOid);
|
static List * GetDependentRoleIdsFDW(Oid FDWOid);
|
||||||
static List * ExpandRolesToGroups(Oid roleid);
|
static List * ExpandRolesToGroups(Oid roleid);
|
||||||
static ViewDependencyNode * BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap);
|
static ViewDependencyNode * BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap);
|
||||||
|
static bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||||
|
ObjectAddress *extensionAddress);
|
||||||
|
static bool ErrorOrWarnIfObjectHasUnsupportedDependency(const
|
||||||
|
ObjectAddress *objectAddress);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetUniqueDependenciesList takes a list of object addresses and returns a new list
|
* GetUniqueDependenciesList takes a list of object addresses and returns a new list
|
||||||
|
@ -774,8 +779,8 @@ SupportedDependencyByCitus(const ObjectAddress *address)
|
||||||
* object doesn't have any unsupported dependency, else throws a message with proper level
|
* object doesn't have any unsupported dependency, else throws a message with proper level
|
||||||
* (except the cluster doesn't have any node) and return true.
|
* (except the cluster doesn't have any node) and return true.
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
ErrorOrWarnIfObjectHasUnsupportedDependency(ObjectAddress *objectAddress)
|
ErrorOrWarnIfObjectHasUnsupportedDependency(const ObjectAddress *objectAddress)
|
||||||
{
|
{
|
||||||
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(objectAddress);
|
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(objectAddress);
|
||||||
if (errMsg != NULL)
|
if (errMsg != NULL)
|
||||||
|
@ -805,7 +810,7 @@ ErrorOrWarnIfObjectHasUnsupportedDependency(ObjectAddress *objectAddress)
|
||||||
* is not distributed yet, we can create it locally to not affect user's local
|
* is not distributed yet, we can create it locally to not affect user's local
|
||||||
* usage experience.
|
* usage experience.
|
||||||
*/
|
*/
|
||||||
else if (IsObjectDistributed(objectAddress))
|
else if (IsAnyObjectDistributed(list_make1((ObjectAddress *) objectAddress)))
|
||||||
{
|
{
|
||||||
RaiseDeferredError(errMsg, ERROR);
|
RaiseDeferredError(errMsg, ERROR);
|
||||||
}
|
}
|
||||||
|
@ -821,11 +826,31 @@ ErrorOrWarnIfObjectHasUnsupportedDependency(ObjectAddress *objectAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ErrorOrWarnIfAnyObjectHasUnsupportedDependency iteratively calls
|
||||||
|
* ErrorOrWarnIfObjectHasUnsupportedDependency for given addresses.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ErrorOrWarnIfAnyObjectHasUnsupportedDependency(List *objectAddresses)
|
||||||
|
{
|
||||||
|
ObjectAddress *objectAddress = NULL;
|
||||||
|
foreach_ptr(objectAddress, objectAddresses)
|
||||||
|
{
|
||||||
|
if (ErrorOrWarnIfObjectHasUnsupportedDependency(objectAddress))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeferErrorIfHasUnsupportedDependency returns deferred error message if the given
|
* DeferErrorIfHasUnsupportedDependency returns deferred error message if the given
|
||||||
* object has any undistributable dependency.
|
* object has any undistributable dependency.
|
||||||
*/
|
*/
|
||||||
DeferredErrorMessage *
|
static DeferredErrorMessage *
|
||||||
DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
|
DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
|
||||||
{
|
{
|
||||||
ObjectAddress *undistributableDependency = GetUndistributableDependency(
|
ObjectAddress *undistributableDependency = GetUndistributableDependency(
|
||||||
|
@ -858,7 +883,7 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
|
||||||
* Otherwise, callers are expected to throw the error returned from this
|
* Otherwise, callers are expected to throw the error returned from this
|
||||||
* function as a hard one by ignoring the detail part.
|
* function as a hard one by ignoring the detail part.
|
||||||
*/
|
*/
|
||||||
if (!IsObjectDistributed(objectAddress))
|
if (!IsAnyObjectDistributed(list_make1((ObjectAddress *) objectAddress)))
|
||||||
{
|
{
|
||||||
appendStringInfo(detailInfo, "\"%s\" will be created only locally",
|
appendStringInfo(detailInfo, "\"%s\" will be created only locally",
|
||||||
objectDescription);
|
objectDescription);
|
||||||
|
@ -873,7 +898,7 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
|
||||||
objectDescription,
|
objectDescription,
|
||||||
dependencyDescription);
|
dependencyDescription);
|
||||||
|
|
||||||
if (IsObjectDistributed(objectAddress))
|
if (IsAnyObjectDistributed(list_make1((ObjectAddress *) objectAddress)))
|
||||||
{
|
{
|
||||||
appendStringInfo(hintInfo,
|
appendStringInfo(hintInfo,
|
||||||
"Distribute \"%s\" first to modify \"%s\" on worker nodes",
|
"Distribute \"%s\" first to modify \"%s\" on worker nodes",
|
||||||
|
@ -900,6 +925,28 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeferErrorIfAnyObjectHasUnsupportedDependency iteratively calls
|
||||||
|
* DeferErrorIfHasUnsupportedDependency for given addresses.
|
||||||
|
*/
|
||||||
|
DeferredErrorMessage *
|
||||||
|
DeferErrorIfAnyObjectHasUnsupportedDependency(const List *objectAddresses)
|
||||||
|
{
|
||||||
|
DeferredErrorMessage *deferredErrorMessage = NULL;
|
||||||
|
ObjectAddress *objectAddress = NULL;
|
||||||
|
foreach_ptr(objectAddress, objectAddresses)
|
||||||
|
{
|
||||||
|
deferredErrorMessage = DeferErrorIfHasUnsupportedDependency(objectAddress);
|
||||||
|
if (deferredErrorMessage)
|
||||||
|
{
|
||||||
|
return deferredErrorMessage;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetUndistributableDependency checks whether object has any non-distributable
|
* GetUndistributableDependency checks whether object has any non-distributable
|
||||||
* dependency. If any one found, it will be returned.
|
* dependency. If any one found, it will be returned.
|
||||||
|
@ -936,7 +983,7 @@ GetUndistributableDependency(const ObjectAddress *objectAddress)
|
||||||
/*
|
/*
|
||||||
* If object is distributed already, ignore it.
|
* If object is distributed already, ignore it.
|
||||||
*/
|
*/
|
||||||
if (IsObjectDistributed(dependency))
|
if (IsAnyObjectDistributed(list_make1(dependency)))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1015,7 +1062,7 @@ IsTableOwnedByExtension(Oid relationId)
|
||||||
* If extensionAddress is not set to a NULL pointer the function will write the extension
|
* If extensionAddress is not set to a NULL pointer the function will write the extension
|
||||||
* address this function depends on into this location.
|
* address this function depends on into this location.
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||||
ObjectAddress *extensionAddress)
|
ObjectAddress *extensionAddress)
|
||||||
{
|
{
|
||||||
|
@ -1055,6 +1102,27 @@ IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsAnyObjectAddressOwnedByExtension iteratively calls IsObjectAddressOwnedByExtension
|
||||||
|
* for given addresses to determine if any address is owned by an extension.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsAnyObjectAddressOwnedByExtension(const List *targets,
|
||||||
|
ObjectAddress *extensionAddress)
|
||||||
|
{
|
||||||
|
ObjectAddress *target = NULL;
|
||||||
|
foreach_ptr(target, targets)
|
||||||
|
{
|
||||||
|
if (IsObjectAddressOwnedByExtension(target, extensionAddress))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FollowNewSupportedDependencies applies filters on pg_depend entries to follow all
|
* FollowNewSupportedDependencies applies filters on pg_depend entries to follow all
|
||||||
* objects which should be distributed before the root object can safely be created.
|
* objects which should be distributed before the root object can safely be created.
|
||||||
|
@ -1097,7 +1165,9 @@ FollowNewSupportedDependencies(ObjectAddressCollector *collector,
|
||||||
* If the object is already distributed it is not a `new` object that needs to be
|
* If the object is already distributed it is not a `new` object that needs to be
|
||||||
* distributed before we create a dependent object
|
* distributed before we create a dependent object
|
||||||
*/
|
*/
|
||||||
if (IsObjectDistributed(&address))
|
ObjectAddress *copyAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
*copyAddress = address;
|
||||||
|
if (IsAnyObjectDistributed(list_make1(copyAddress)))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@
|
||||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||||
Datum *paramValues);
|
Datum *paramValues);
|
||||||
|
static bool IsObjectDistributed(const ObjectAddress *address);
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
|
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
|
||||||
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
|
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
|
||||||
|
@ -240,17 +241,18 @@ ShouldMarkRelationDistributed(Oid relationId)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress relationAddress = { 0 };
|
ObjectAddress *relationAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*relationAddress, RelationRelationId, relationId);
|
||||||
|
|
||||||
bool pgObject = (relationId < FirstNormalObjectId);
|
bool pgObject = (relationId < FirstNormalObjectId);
|
||||||
bool isObjectSupported = SupportedDependencyByCitus(&relationAddress);
|
bool isObjectSupported = SupportedDependencyByCitus(relationAddress);
|
||||||
bool ownedByExtension = IsTableOwnedByExtension(relationId);
|
bool ownedByExtension = IsTableOwnedByExtension(relationId);
|
||||||
bool alreadyDistributed = IsObjectDistributed(&relationAddress);
|
bool alreadyDistributed = IsObjectDistributed(relationAddress);
|
||||||
bool hasUnsupportedDependency =
|
bool hasUnsupportedDependency =
|
||||||
DeferErrorIfHasUnsupportedDependency(&relationAddress) != NULL;
|
DeferErrorIfAnyObjectHasUnsupportedDependency(list_make1(relationAddress)) !=
|
||||||
|
NULL;
|
||||||
bool hasCircularDependency =
|
bool hasCircularDependency =
|
||||||
DeferErrorIfCircularDependencyExists(&relationAddress) != NULL;
|
DeferErrorIfCircularDependencyExists(relationAddress) != NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* pgObject: Citus never marks pg objects as distributed
|
* pgObject: Citus never marks pg objects as distributed
|
||||||
|
@ -390,7 +392,7 @@ UnmarkObjectDistributed(const ObjectAddress *address)
|
||||||
* IsObjectDistributed returns if the object addressed is already distributed in the
|
* IsObjectDistributed returns if the object addressed is already distributed in the
|
||||||
* cluster. This performs a local indexed lookup in pg_dist_object.
|
* cluster. This performs a local indexed lookup in pg_dist_object.
|
||||||
*/
|
*/
|
||||||
bool
|
static bool
|
||||||
IsObjectDistributed(const ObjectAddress *address)
|
IsObjectDistributed(const ObjectAddress *address)
|
||||||
{
|
{
|
||||||
ScanKeyData key[3];
|
ScanKeyData key[3];
|
||||||
|
@ -422,6 +424,26 @@ IsObjectDistributed(const ObjectAddress *address)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsAnyObjectDistributed iteratively calls IsObjectDistributed for given addresses to
|
||||||
|
* determine if any object is distributed.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsAnyObjectDistributed(const List *addresses)
|
||||||
|
{
|
||||||
|
ObjectAddress *address = NULL;
|
||||||
|
foreach_ptr(address, addresses)
|
||||||
|
{
|
||||||
|
if (IsObjectDistributed(address))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all
|
* GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all
|
||||||
* distributed objects as marked in pg_dist_object
|
* distributed objects as marked in pg_dist_object
|
||||||
|
|
|
@ -356,10 +356,9 @@ CreateDependingViewsOnWorkers(Oid relationId)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress viewAddress = { 0 };
|
ObjectAddress *viewAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
|
ObjectAddressSet(*viewAddress, RelationRelationId, viewOid);
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(viewAddress));
|
||||||
EnsureDependenciesExistOnAllNodes(&viewAddress);
|
|
||||||
|
|
||||||
char *createViewCommand = CreateViewDDLCommand(viewOid);
|
char *createViewCommand = CreateViewDDLCommand(viewOid);
|
||||||
char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid);
|
char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid);
|
||||||
|
@ -367,7 +366,7 @@ CreateDependingViewsOnWorkers(Oid relationId)
|
||||||
SendCommandToWorkersWithMetadata(createViewCommand);
|
SendCommandToWorkersWithMetadata(createViewCommand);
|
||||||
SendCommandToWorkersWithMetadata(alterViewOwnerCommand);
|
SendCommandToWorkersWithMetadata(alterViewOwnerCommand);
|
||||||
|
|
||||||
MarkObjectDistributed(&viewAddress);
|
MarkObjectDistributed(viewAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
|
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
|
||||||
|
@ -603,10 +602,10 @@ ShouldSyncSequenceMetadata(Oid relationId)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObjectAddress sequenceAddress = { 0 };
|
ObjectAddress *sequenceAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(sequenceAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*sequenceAddress, RelationRelationId, relationId);
|
||||||
|
|
||||||
return IsObjectDistributed(&sequenceAddress);
|
return IsAnyObjectDistributed(list_make1(sequenceAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,6 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
|
||||||
text *tableNameText = PG_GETARG_TEXT_P(0);
|
text *tableNameText = PG_GETARG_TEXT_P(0);
|
||||||
int32 shardCount = PG_GETARG_INT32(1);
|
int32 shardCount = PG_GETARG_INT32(1);
|
||||||
int32 replicationFactor = PG_GETARG_INT32(2);
|
int32 replicationFactor = PG_GETARG_INT32(2);
|
||||||
ObjectAddress tableAddress = { 0 };
|
|
||||||
|
|
||||||
Oid distributedTableId = ResolveRelationId(tableNameText, false);
|
Oid distributedTableId = ResolveRelationId(tableNameText, false);
|
||||||
|
|
||||||
|
@ -83,8 +82,9 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
|
||||||
* via their own connection and committed immediately so they become visible to all
|
* via their own connection and committed immediately so they become visible to all
|
||||||
* sessions creating shards.
|
* sessions creating shards.
|
||||||
*/
|
*/
|
||||||
ObjectAddressSet(tableAddress, RelationRelationId, distributedTableId);
|
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
ObjectAddressSet(*tableAddress, RelationRelationId, distributedTableId);
|
||||||
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
|
||||||
|
|
||||||
EnsureReferenceTablesExistOnAllNodes();
|
EnsureReferenceTablesExistOnAllNodes();
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
text *relationNameText = PG_GETARG_TEXT_P(0);
|
text *relationNameText = PG_GETARG_TEXT_P(0);
|
||||||
char *relationName = text_to_cstring(relationNameText);
|
char *relationName = text_to_cstring(relationNameText);
|
||||||
uint32 attemptableNodeCount = 0;
|
uint32 attemptableNodeCount = 0;
|
||||||
ObjectAddress tableAddress = { 0 };
|
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
|
||||||
|
|
||||||
uint32 candidateNodeIndex = 0;
|
uint32 candidateNodeIndex = 0;
|
||||||
List *candidateNodeList = NIL;
|
List *candidateNodeList = NIL;
|
||||||
|
@ -115,8 +115,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
* via their own connection and committed immediately so they become visible to all
|
* via their own connection and committed immediately so they become visible to all
|
||||||
* sessions creating shards.
|
* sessions creating shards.
|
||||||
*/
|
*/
|
||||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
|
||||||
EnsureReferenceTablesExistOnAllNodes();
|
EnsureReferenceTablesExistOnAllNodes();
|
||||||
|
|
||||||
/* don't allow the table to be dropped */
|
/* don't allow the table to be dropped */
|
||||||
|
|
|
@ -181,8 +181,13 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
|
||||||
* same subject.
|
* same subject.
|
||||||
*/
|
*/
|
||||||
Node *parseTree = ParseTreeNode(linitial(sqlStatements));
|
Node *parseTree = ParseTreeNode(linitial(sqlStatements));
|
||||||
ObjectAddress address = GetObjectAddressFromParseTree(parseTree, true);
|
List *addresses = GetObjectAddressListFromParseTree(parseTree, true);
|
||||||
if (ObjectExists(&address))
|
Assert(list_length(viewAddresses) == 1);
|
||||||
|
|
||||||
|
/* We have already asserted that we have exactly 1 address in the addresses. */
|
||||||
|
ObjectAddress *address = linitial(addresses);
|
||||||
|
|
||||||
|
if (ObjectExists(address))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Object with name from statement is already found locally, check if states are
|
* Object with name from statement is already found locally, check if states are
|
||||||
|
@ -195,7 +200,7 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
|
||||||
* recreate our version of the object. This we can compare to what the coordinator
|
* recreate our version of the object. This we can compare to what the coordinator
|
||||||
* sent us. If they match we don't do anything.
|
* sent us. If they match we don't do anything.
|
||||||
*/
|
*/
|
||||||
List *localSqlStatements = CreateStmtListByObjectAddress(&address);
|
List *localSqlStatements = CreateStmtListByObjectAddress(address);
|
||||||
if (CompareStringList(sqlStatements, localSqlStatements))
|
if (CompareStringList(sqlStatements, localSqlStatements))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
@ -208,9 +213,9 @@ WorkerCreateOrReplaceObject(List *sqlStatements)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *newName = GenerateBackupNameForCollision(&address);
|
char *newName = GenerateBackupNameForCollision(address);
|
||||||
|
|
||||||
RenameStmt *renameStmt = CreateRenameStatement(&address, newName);
|
RenameStmt *renameStmt = CreateRenameStatement(address, newName);
|
||||||
const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
|
const char *sqlRenameStmt = DeparseTreeNode((Node *) renameStmt);
|
||||||
ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt,
|
ProcessUtilityParseTree((Node *) renameStmt, sqlRenameStmt,
|
||||||
PROCESS_UTILITY_QUERY,
|
PROCESS_UTILITY_QUERY,
|
||||||
|
|
|
@ -127,7 +127,8 @@ WorkerDropDistributedTable(Oid relationId)
|
||||||
relation_close(distributedRelation, AccessShareLock);
|
relation_close(distributedRelation, AccessShareLock);
|
||||||
|
|
||||||
/* prepare distributedTableObject for dropping the table */
|
/* prepare distributedTableObject for dropping the table */
|
||||||
ObjectAddress distributedTableObject = { RelationRelationId, relationId, 0 };
|
ObjectAddress *distributedTableObject = palloc0(sizeof(ObjectAddress));
|
||||||
|
ObjectAddressSet(*distributedTableObject, RelationRelationId, relationId);
|
||||||
|
|
||||||
/* Drop dependent sequences from pg_dist_object */
|
/* Drop dependent sequences from pg_dist_object */
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||||
|
@ -144,7 +145,7 @@ WorkerDropDistributedTable(Oid relationId)
|
||||||
UnmarkObjectDistributed(&ownedSequenceAddress);
|
UnmarkObjectDistributed(&ownedSequenceAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmarkObjectDistributed(&distributedTableObject);
|
UnmarkObjectDistributed(distributedTableObject);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Remove metadata before object's itself to make functions no-op within
|
* Remove metadata before object's itself to make functions no-op within
|
||||||
|
@ -177,7 +178,7 @@ WorkerDropDistributedTable(Oid relationId)
|
||||||
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
|
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
|
||||||
* table.
|
* table.
|
||||||
*/
|
*/
|
||||||
if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
|
if (!IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL))
|
||||||
{
|
{
|
||||||
char *relName = get_rel_name(relationId);
|
char *relName = get_rel_name(relationId);
|
||||||
Oid schemaId = get_rel_namespace(relationId);
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
|
@ -238,12 +239,9 @@ worker_drop_shell_table(PG_FUNCTION_ARGS)
|
||||||
relation_close(distributedRelation, AccessShareLock);
|
relation_close(distributedRelation, AccessShareLock);
|
||||||
|
|
||||||
/* prepare distributedTableObject for dropping the table */
|
/* prepare distributedTableObject for dropping the table */
|
||||||
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
|
ObjectAddress *distributedTableObject = palloc0(sizeof(ObjectAddress));
|
||||||
distributedTableObject.classId = RelationRelationId;
|
ObjectAddressSet(*distributedTableObject, RelationRelationId, relationId);
|
||||||
distributedTableObject.objectId = relationId;
|
if (IsAnyObjectAddressOwnedByExtension(list_make1(distributedTableObject), NULL))
|
||||||
distributedTableObject.objectSubId = 0;
|
|
||||||
|
|
||||||
if (IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
|
|
||||||
{
|
{
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -270,7 +268,7 @@ worker_drop_shell_table(PG_FUNCTION_ARGS)
|
||||||
*
|
*
|
||||||
* We drop the table with cascade since other tables may be referring to it.
|
* We drop the table with cascade since other tables may be referring to it.
|
||||||
*/
|
*/
|
||||||
performDeletion(&distributedTableObject, DROP_CASCADE,
|
performDeletion(distributedTableObject, DROP_CASCADE,
|
||||||
PERFORM_DELETION_INTERNAL);
|
PERFORM_DELETION_INTERNAL);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
|
|
@ -148,7 +148,7 @@ extern void QualifyAlterTypeOwnerStmt(Node *stmt);
|
||||||
extern char * GetTypeNamespaceNameByNameList(List *names);
|
extern char * GetTypeNamespaceNameByNameList(List *names);
|
||||||
extern Oid TypeOidGetNamespaceOid(Oid typeOid);
|
extern Oid TypeOidGetNamespaceOid(Oid typeOid);
|
||||||
|
|
||||||
extern ObjectAddress GetObjectAddressFromParseTree(Node *parseTree, bool missing_ok);
|
extern List * GetObjectAddressListFromParseTree(Node *parseTree, bool missing_ok);
|
||||||
extern List * RenameAttributeStmtObjectAddress(Node *stmt, bool missing_ok);
|
extern List * RenameAttributeStmtObjectAddress(Node *stmt, bool missing_ok);
|
||||||
|
|
||||||
/* forward declarations for deparse_view_stmts.c */
|
/* forward declarations for deparse_view_stmts.c */
|
||||||
|
|
|
@ -23,10 +23,9 @@ extern List * GetUniqueDependenciesList(List *objectAddressesList);
|
||||||
extern List * GetDependenciesForObject(const ObjectAddress *target);
|
extern List * GetDependenciesForObject(const ObjectAddress *target);
|
||||||
extern List * GetAllSupportedDependenciesForObject(const ObjectAddress *target);
|
extern List * GetAllSupportedDependenciesForObject(const ObjectAddress *target);
|
||||||
extern List * GetAllDependenciesForObject(const ObjectAddress *target);
|
extern List * GetAllDependenciesForObject(const ObjectAddress *target);
|
||||||
extern bool ErrorOrWarnIfObjectHasUnsupportedDependency(ObjectAddress *objectAddress);
|
extern bool ErrorOrWarnIfAnyObjectHasUnsupportedDependency(List *objectAddresses);
|
||||||
extern DeferredErrorMessage * DeferErrorIfHasUnsupportedDependency(const
|
extern DeferredErrorMessage * DeferErrorIfAnyObjectHasUnsupportedDependency(const List *
|
||||||
ObjectAddress *
|
objectAddresses);
|
||||||
objectAddress);
|
|
||||||
extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList);
|
extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList);
|
||||||
extern bool SupportedDependencyByCitus(const ObjectAddress *address);
|
extern bool SupportedDependencyByCitus(const ObjectAddress *address);
|
||||||
extern List * GetPgDependTuplesForDependingObjects(Oid targetObjectClassId,
|
extern List * GetPgDependTuplesForDependingObjects(Oid targetObjectClassId,
|
||||||
|
|
|
@ -20,14 +20,14 @@
|
||||||
|
|
||||||
extern bool ObjectExists(const ObjectAddress *address);
|
extern bool ObjectExists(const ObjectAddress *address);
|
||||||
extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
||||||
extern bool IsObjectDistributed(const ObjectAddress *address);
|
extern bool IsAnyObjectDistributed(const List *addresses);
|
||||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||||
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,
|
||||||
ObjectAddress *extensionAddress);
|
ObjectAddress *extensionAddress);
|
||||||
extern ObjectAddress PgGetObjectAddress(char *ttype, ArrayType *namearr,
|
extern ObjectAddress PgGetObjectAddress(char *ttype, ArrayType *namearr,
|
||||||
ArrayType *argsarr);
|
ArrayType *argsarr);
|
||||||
|
|
|
@ -258,15 +258,15 @@ extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
extern void CreateTruncateTrigger(Oid relationId);
|
extern void CreateTruncateTrigger(Oid relationId);
|
||||||
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
|
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
|
||||||
|
|
||||||
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
extern void EnsureAllObjectDependenciesExistOnAllNodes(const List *targets);
|
||||||
extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const
|
extern DeferredErrorMessage * DeferErrorIfCircularDependencyExists(const
|
||||||
ObjectAddress *
|
ObjectAddress *
|
||||||
objectAddress);
|
objectAddress);
|
||||||
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
|
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
|
||||||
extern List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
|
extern List * GetAllDependencyCreateDDLCommands(const List *dependencies);
|
||||||
extern bool ShouldPropagate(void);
|
extern bool ShouldPropagate(void);
|
||||||
extern bool ShouldPropagateCreateInCoordinatedTransction(void);
|
extern bool ShouldPropagateCreateInCoordinatedTransction(void);
|
||||||
extern bool ShouldPropagateObject(const ObjectAddress *address);
|
extern bool ShouldPropagateAnyObject(List *addresses);
|
||||||
extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort);
|
extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort);
|
||||||
|
|
||||||
/* Remaining metadata utility functions */
|
/* Remaining metadata utility functions */
|
||||||
|
|
Loading…
Reference in New Issue