Merge pull request #5931 from citusdata/refactor/dedupe-object-propagation

Refactor: reduce complexity and code duplication for Object Propagation
release-11.0-refactor-ddl
Nils Dijk 2022-05-18 16:30:31 +02:00
parent c2d9e88bf5
commit 5e4c0e4bea
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
17 changed files with 488 additions and 2921 deletions

View File

@ -1,89 +0,0 @@
/*-------------------------------------------------------------------------
*
* aggregate.c
* Commands for distributing AGGREGATE statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "nodes/parsenodes.h"
#include "utils/lsyscache.h"
/*
* PreprocessDefineAggregateStmt only qualifies the node with schema name.
* We will handle the rest in the Postprocess phase.
*/
List *
PreprocessDefineAggregateStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
QualifyTreeNode((Node *) node);
return NIL;
}
/*
* PostprocessDefineAggregateStmt actually creates the plan we need to execute for
* aggregate propagation.
* This is the downside of using the locally created aggregate to get the sql statement.
*
* If the aggregate depends on any non-distributed relation, Citus can not distribute it.
* In order to not to prevent users from creating local aggregates on the coordinator,
* a WARNING message will be sent to the user about the case instead of erroring out.
*
* Besides creating the plan we also make sure all (new) dependencies of the aggregate
* are created on all nodes.
*/
List *
PostprocessDefineAggregateStmt(Node *node, const char *queryString)
{
DefineStmt *stmt = castNode(DefineStmt, node);
if (!ShouldPropagate())
{
return NIL;
}
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
EnsureCoordinator();
EnsureSequentialMode(OBJECT_AGGREGATE);
/* If the aggregate has any unsupported dependency, create it locally */
DeferredErrorMessage *depError = DeferErrorIfHasUnsupportedDependency(&address);
if (depError != NULL)
{
RaiseDeferredError(depError, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
List *commands = CreateFunctionDDLCommandsIdempotent(&address);
commands = lcons(DISABLE_DDL_PROPAGATION, commands);
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -36,9 +36,6 @@
static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner, static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner,
char **quotedCollationName); char **quotedCollationName);
static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok,
List **addresses);
static bool ShouldPropagateDefineCollationStmt(void);
/* /*
* GetCreateCollationDDLInternal returns a CREATE COLLATE sql string for the * GetCreateCollationDDLInternal returns a CREATE COLLATE sql string for the
@ -162,267 +159,6 @@ AlterCollationOwnerObjectAddress(Node *node, bool missing_ok)
} }
/*
* FilterNameListForDistributedCollations takes a list of objects to delete.
* This list is filtered against the collations that are distributed.
*
* The original list will not be touched, a new list will be created with only the objects
* in there.
*
* objectAddresses is replaced with a list of object addresses for the filtered objects.
*/
static List *
FilterNameListForDistributedCollations(List *objects, bool missing_ok,
List **objectAddresses)
{
List *result = NIL;
*objectAddresses = NIL;
List *collName = NULL;
foreach_ptr(collName, objects)
{
Oid collOid = get_collation_oid(collName, true);
ObjectAddress collAddress = { 0 };
if (!OidIsValid(collOid))
{
continue;
}
ObjectAddressSet(collAddress, CollationRelationId, collOid);
if (IsObjectDistributed(&collAddress))
{
ObjectAddress *address = palloc0(sizeof(ObjectAddress));
*address = collAddress;
*objectAddresses = lappend(*objectAddresses, address);
result = lappend(result, collName);
}
}
return result;
}
List *
PreprocessDropCollationStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
/*
* We swap the list of objects to remove during deparse so we need a reference back to
* the old list to put back
*/
List *distributedTypeAddresses = NIL;
if (!ShouldPropagate())
{
return NIL;
}
QualifyTreeNode((Node *) stmt);
List *oldCollations = stmt->objects;
List *distributedCollations =
FilterNameListForDistributedCollations(oldCollations, stmt->missing_ok,
&distributedTypeAddresses);
if (list_length(distributedCollations) <= 0)
{
/* no distributed types to drop */
return NIL;
}
/*
* managing collations can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* collations, so we block the call.
*/
EnsureCoordinator();
/*
* remove the entries for the distributed objects on dropping
*/
ObjectAddress *addressItem = NULL;
foreach_ptr(addressItem, distributedTypeAddresses)
{
UnmarkObjectDistributed(addressItem);
}
/*
* temporary swap the lists of objects to delete with the distributed objects and
* deparse to an executable sql statement for the workers
*/
stmt->objects = distributedCollations;
char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = oldCollations;
EnsureSequentialMode(OBJECT_COLLATION);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterCollationOwnerStmt is called for change of ownership of collations
* before the ownership is changed on the local instance.
*
* If the type for which the owner is changed is distributed we execute the change on all
* the workers to keep the type in sync across the cluster.
*/
List *
PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_COLLATION);
ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&collationAddress))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialMode(OBJECT_COLLATION);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterCollationOwnerStmt is invoked after the owner has been changed locally.
* Since changing the owner could result in new dependencies being found for this object
* we re-ensure all the dependencies for the collation do exist.
*
* This is solely to propagate the new owner (and all its dependencies) if it was not
* already distributed in the cluster.
*/
List *
PostprocessAlterCollationOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_COLLATION);
ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&collationAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&collationAddress);
return NIL;
}
/*
* PreprocessRenameCollationStmt is called when the user is renaming the collation. The invocation happens
* before the statement is applied locally.
*
* As the collation already exists we have access to the ObjectAddress for the collation, this is
* used to check if the collation is distributed. If the collation is distributed the rename is
* executed on all the workers to keep the collation in sync across the cluster.
*/
List *
PreprocessRenameCollationStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&collationAddress))
{
return NIL;
}
EnsureCoordinator();
/* fully qualify */
QualifyTreeNode((Node *) stmt);
/* deparse sql*/
char *renameStmtSql = DeparseTreeNode((Node *) stmt);
EnsureSequentialMode(OBJECT_COLLATION);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterCollationSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers.
*/
List *
PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_COLLATION);
ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&collationAddress))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialMode(OBJECT_COLLATION);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterCollationSchemaStmt is executed after the change has been applied locally, we
* can now use the new dependencies of the type to ensure all its dependencies exist on
* the workers before we apply the commands remotely.
*/
List *
PostprocessAlterCollationSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_COLLATION);
ObjectAddress collationAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&collationAddress))
{
return NIL;
}
/* dependencies have changed (schema) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&collationAddress);
return NIL;
}
/* /*
* RenameCollationStmtObjectAddress returns the ObjectAddress of the type that is the object * RenameCollationStmtObjectAddress returns the ObjectAddress of the type that is the object
* of the RenameStmt. Errors if missing_ok is false. * of the RenameStmt. Errors if missing_ok is false.
@ -544,89 +280,3 @@ DefineCollationStmtObjectAddress(Node *node, bool missing_ok)
return address; return address;
} }
/*
* PreprocessDefineCollationStmt executed before the collation has been
* created locally to ensure that if the collation create statement will
* be propagated, the node is a coordinator node
*/
List *
PreprocessDefineCollationStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION);
if (!ShouldPropagateDefineCollationStmt())
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_COLLATION);
return NIL;
}
/*
* PostprocessDefineCollationStmt executed after the collation has been
* created locally and before we create it on the worker nodes.
* As we now have access to ObjectAddress of the collation that is just
* created, we can mark it as distributed to make sure that its
* dependencies exist on all nodes.
*/
List *
PostprocessDefineCollationStmt(Node *node, const char *queryString)
{
Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION);
if (!ShouldPropagateDefineCollationStmt())
{
return NIL;
}
ObjectAddress collationAddress =
DefineCollationStmtObjectAddress(node, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(
&collationAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&collationAddress);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make1(DISABLE_DDL_PROPAGATION);
commands = list_concat(commands, CreateCollationDDLsIdempotent(
collationAddress.objectId));
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* ShouldPropagateDefineCollationStmt checks if collation define
* statement should be propagated. Don't propagate if:
* - metadata syncing if off
* - create statement should be propagated according the the ddl propagation policy
*/
static bool
ShouldPropagateDefineCollationStmt()
{
if (!ShouldPropagate())
{
return false;
}
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return false;
}
return true;
}

View File

@ -0,0 +1,274 @@
/*-------------------------------------------------------------------------
*
* common.c
*
* Most of the object propagation code consists of mostly the same
* operations, varying slightly in parameters passed around. This
* file contains most of the reusable logic in object propagation.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/objectaddress.h"
#include "nodes/parsenodes.h"
#include "tcop/utility.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_transaction.h"
/*
* PostprocessCreateDistributedObjectFromCatalogStmt is a common function that can be used
* for most objects during their creation phase. After the creation has happened locally
* this function creates idempotent statements to recreate the object addressed by the
* ObjectAddress of resolved from the creation statement.
*
* Since object already need to be able to create idempotent creation sql to support
* scaleout operations we can reuse this logic during the initial creation of the objects
* to reduce the complexity of implementation of new DDL commands.
*/
List *
PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryString)
{
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
Assert(ops != NULL);
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
if (ops->featureFlag && *ops->featureFlag == false)
{
/* not propagating when a configured feature flag is turned off by the user */
return NIL;
}
ObjectAddress address = GetObjectAddressFromParseTree(stmt, false);
EnsureCoordinator();
EnsureSequentialMode(ops->objectType);
/* If the object has any unsupported dependency warn, and only create locally */
DeferredErrorMessage *depError = DeferErrorIfHasUnsupportedDependency(&address);
if (depError != NULL)
{
RaiseDeferredError(depError, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
List *commands = GetDependencyCreateDDLCommands(&address);
commands = lcons(DISABLE_DDL_PROPAGATION, commands);
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDistributedObjectStmt handles any updates to distributed objects by
* creating the fully qualified sql to apply to all workers after checking all
* predconditions that apply to propagating changes.
*
* Preconditions are (in order):
* - not in a CREATE/ALTER EXTENSION code block
* - citus.enable_metadata_sync is turned on
* - object being altered is distributed
* - any object specific feature flag is turned on when a feature flag is available
*
* Once we conclude to propagate the changes to the workers we make sure that the command
* has been executed on the coordinator and force any ongoing transaction to run in
* sequential mode. If any of these steps fail we raise an error to inform the user.
*
* Lastly we recreate a fully qualified version of the original sql and prepare the tasks
* to send these sql commands to the workers. These tasks include instructions to prevent
* recursion of propagation with Citus' MX functionality.
*/
List *
PreprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
Assert(ops != NULL);
ObjectAddress address = GetObjectAddressFromParseTree(stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
if (ops->featureFlag && *ops->featureFlag == false)
{
/* not propagating when a configured feature flag is turned off by the user */
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(ops->objectType);
QualifyTreeNode(stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDistributedObjectStmt is the counter part of
* PreprocessAlterDistributedObjectStmt that should be executed after the object has been
* changed locally.
*
* We perform the same precondition checks as before to skip this operation if any of the
* failed during preprocessing. Since we already raised an error on other checks we don't
* have to repeat them here, as they will never fail during postprocessing.
*
* When objects get altered they can start depending on undistributed objects. Now that
* the objects has been changed locally we can find these new dependencies and make sure
* they get created on the workers before we send the command list to the workers.
*/
List *
PostprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString)
{
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
Assert(ops != NULL);
ObjectAddress address = GetObjectAddressFromParseTree(stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
if (ops->featureFlag && *ops->featureFlag == false)
{
/* not propagating when a configured feature flag is turned off by the user */
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PreprocessDropDistributedObjectStmt is a general purpose hook that can propagate any
* DROP statement.
*
* DROP statements are one of the few DDL statements that can work on many different
* objects at once. Instead of resolving just one ObjectAddress and check it is
* distributed we will need to lookup many different object addresses. Only if an object
* was _not_ distributed we will need to remove it from the list of objects before we
* recreate the sql statement.
*
* Given that we actually _do_ need to drop them locally we can't simply remove them from
* the object list. Instead we create a new list where we only add distributed objects to.
* Before we recreate the sql statement we put this list on the drop statement, so that
* the SQL created will only contain the objects that are actually distributed in the
* cluster. After we have the SQL we restore the old list so that all objects get deleted
* locally.
*
* The reason we need to go through all this effort is taht we can't resolve the object
* addresses anymore after the objects have been removed locally. Meaning during the
* postprocessing we cannot understand which objects were distributed to begin with.
*/
List *
PreprocessDropDistributedObjectStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
/*
* We swap the list of objects to remove during deparse so we need a reference back to
* the old list to put back
*/
List *originalObjects = stmt->objects;
if (!ShouldPropagate())
{
return NIL;
}
QualifyTreeNode(node);
List *distributedObjects = NIL;
List *distributedObjectAddresses = NIL;
Node *object = NULL;
foreach_ptr(object, stmt->objects)
{
/* TODO understand if the lock should be sth else */
Relation rel = NULL; /* not used, but required to pass to get_object_address */
ObjectAddress address = get_object_address(stmt->removeType, object, &rel,
AccessShareLock, stmt->missing_ok);
if (IsObjectDistributed(&address))
{
ObjectAddress *addressPtr = palloc0(sizeof(ObjectAddress));
*addressPtr = address;
distributedObjects = lappend(distributedObjects, object);
distributedObjectAddresses = lappend(distributedObjectAddresses, addressPtr);
}
}
if (list_length(distributedObjects) <= 0)
{
/* no distributed objects to drop */
return NIL;
}
/*
* managing objects can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* types, so we block the call.
*/
EnsureCoordinator();
/*
* remove the entries for the distributed objects on dropping
*/
ObjectAddress *address = NULL;
foreach_ptr(address, distributedObjectAddresses)
{
UnmarkObjectDistributed(address);
}
/*
* temporary swap the lists of objects to delete with the distributed objects and
* deparse to an executable sql statement for the workers
*/
stmt->objects = distributedObjects;
char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = originalObjects;
EnsureSequentialMode(stmt->removeType);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -36,75 +36,6 @@ static Oid get_database_owner(Oid db_oid);
bool EnableAlterDatabaseOwner = false; bool EnableAlterDatabaseOwner = false;
/*
* PreprocessAlterDatabaseOwnerStmt is called during the utility hook before the alter
* command is applied locally on the coordinator. This will verify if the command needs to
* be propagated to the workers and if so prepares a list of ddl commands to execute.
*/
List *
PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DATABASE);
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
if (!EnableAlterDatabaseOwner)
{
/* don't propagate if GUC is turned off */
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialMode(OBJECT_DATABASE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDatabaseOwnerStmt is called during the utility hook after the alter
* database command has been applied locally.
*
* Its main purpose is to propagate the newly formed dependencies onto the nodes before
* applying the change of owner of the databse. This ensures, for systems that have role
* management, that the roles will be created before applying the alter owner command.
*/
List *
PostprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DATABASE);
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
if (!EnableAlterDatabaseOwner)
{
/* don't propagate if GUC is turned off */
return NIL;
}
EnsureDependenciesExistOnAllNodes(&typeAddress);
return NIL;
}
/* /*
* AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the * AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the
* object of the AlterOwnerStmt. Errors if missing_ok is false. * object of the AlterOwnerStmt. Errors if missing_ok is false.

View File

@ -34,7 +34,6 @@ typedef bool (*AddressPredicate)(const ObjectAddress *);
static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress); static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress); static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
static int ObjectAddressComparator(const void *a, const void *b); static int ObjectAddressComparator(const void *a, const void *b);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static List * FilterObjectAddressListByPredicate(List *objectAddressList, static List * FilterObjectAddressListByPredicate(List *objectAddressList,
AddressPredicate predicate); AddressPredicate predicate);
@ -289,7 +288,7 @@ GetDistributableDependenciesForObject(const ObjectAddress *target)
* GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl * GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
* commands to execute on a worker to create the object. * commands to execute on a worker to create the object.
*/ */
static List * List *
GetDependencyCreateDDLCommands(const ObjectAddress *dependency) GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{ {
switch (getObjectClass(dependency)) switch (getObjectClass(dependency))

View File

@ -16,6 +16,7 @@
#include "distributed/deparser.h" #include "distributed/deparser.h"
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/commands/utility_hook.h"
static DistributeObjectOps NoDistributeOps = { static DistributeObjectOps NoDistributeOps = {
.deparse = NULL, .deparse = NULL,
@ -28,31 +29,34 @@ static DistributeObjectOps NoDistributeOps = {
static DistributeObjectOps Aggregate_AlterObjectSchema = { static DistributeObjectOps Aggregate_AlterObjectSchema = {
.deparse = DeparseAlterFunctionSchemaStmt, .deparse = DeparseAlterFunctionSchemaStmt,
.qualify = QualifyAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt,
.preprocess = PreprocessAlterFunctionSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionSchemaStmtObjectAddress, .address = AlterFunctionSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Aggregate_AlterOwner = { static DistributeObjectOps Aggregate_AlterOwner = {
.deparse = DeparseAlterFunctionOwnerStmt, .deparse = DeparseAlterFunctionOwnerStmt,
.qualify = QualifyAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt,
.preprocess = PreprocessAlterFunctionOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionOwnerObjectAddress, .address = AlterFunctionOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Aggregate_Define = { static DistributeObjectOps Aggregate_Define = {
.deparse = NULL, .deparse = NULL,
.qualify = QualifyDefineAggregateStmt, .qualify = QualifyDefineAggregateStmt,
.preprocess = PreprocessDefineAggregateStmt, .preprocess = NULL,
.postprocess = PostprocessDefineAggregateStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_AGGREGATE,
.address = DefineAggregateStmtObjectAddress, .address = DefineAggregateStmtObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
static DistributeObjectOps Aggregate_Drop = { static DistributeObjectOps Aggregate_Drop = {
.deparse = DeparseDropFunctionStmt, .deparse = DeparseDropFunctionStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDropFunctionStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -60,16 +64,18 @@ static DistributeObjectOps Aggregate_Drop = {
static DistributeObjectOps Aggregate_Rename = { static DistributeObjectOps Aggregate_Rename = {
.deparse = DeparseRenameFunctionStmt, .deparse = DeparseRenameFunctionStmt,
.qualify = QualifyRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt,
.preprocess = PreprocessRenameFunctionStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_FUNCTION,
.address = RenameFunctionStmtObjectAddress, .address = RenameFunctionStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Any_AlterEnum = { static DistributeObjectOps Any_AlterEnum = {
.deparse = DeparseAlterEnumStmt, .deparse = DeparseAlterEnumStmt,
.qualify = QualifyAlterEnumStmt, .qualify = QualifyAlterEnumStmt,
.preprocess = PreprocessAlterEnumStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TYPE,
.address = AlterEnumStmtObjectAddress, .address = AlterEnumStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -92,9 +98,10 @@ static DistributeObjectOps Any_AlterExtensionContents = {
static DistributeObjectOps Any_AlterForeignServer = { static DistributeObjectOps Any_AlterForeignServer = {
.deparse = DeparseAlterForeignServerStmt, .deparse = DeparseAlterForeignServerStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessAlterForeignServerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .objectType = OBJECT_FOREIGN_SERVER,
.address = AlterForeignServerStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Any_AlterFunction = { static DistributeObjectOps Any_AlterFunction = {
@ -148,24 +155,29 @@ static DistributeObjectOps Any_Cluster = {
static DistributeObjectOps Any_CompositeType = { static DistributeObjectOps Any_CompositeType = {
.deparse = DeparseCompositeTypeStmt, .deparse = DeparseCompositeTypeStmt,
.qualify = QualifyCompositeTypeStmt, .qualify = QualifyCompositeTypeStmt,
.preprocess = PreprocessCompositeTypeStmt, .preprocess = NULL,
.postprocess = PostprocessCompositeTypeStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_TYPE,
.featureFlag = &EnableCreateTypePropagation,
.address = CompositeTypeStmtObjectAddress, .address = CompositeTypeStmtObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
static DistributeObjectOps Any_CreateDomain = { static DistributeObjectOps Any_CreateDomain = {
.deparse = DeparseCreateDomainStmt, .deparse = DeparseCreateDomainStmt,
.qualify = QualifyCreateDomainStmt, .qualify = QualifyCreateDomainStmt,
.preprocess = PreprocessCreateDomainStmt, .preprocess = NULL,
.postprocess = PostprocessCreateDomainStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_DOMAIN,
.address = CreateDomainStmtObjectAddress, .address = CreateDomainStmtObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
static DistributeObjectOps Any_CreateEnum = { static DistributeObjectOps Any_CreateEnum = {
.deparse = DeparseCreateEnumStmt, .deparse = DeparseCreateEnumStmt,
.qualify = QualifyCreateEnumStmt, .qualify = QualifyCreateEnumStmt,
.preprocess = PreprocessCreateEnumStmt, .preprocess = NULL,
.postprocess = PostprocessCreateEnumStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_TYPE,
.featureFlag = &EnableCreateTypePropagation,
.address = CreateEnumStmtObjectAddress, .address = CreateEnumStmtObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
@ -196,8 +208,9 @@ static DistributeObjectOps Any_CreatePolicy = {
static DistributeObjectOps Any_CreateForeignServer = { static DistributeObjectOps Any_CreateForeignServer = {
.deparse = DeparseCreateForeignServerStmt, .deparse = DeparseCreateForeignServerStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessCreateForeignServerStmt, .preprocess = NULL,
.postprocess = PostprocessCreateForeignServerStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_FOREIGN_SERVER,
.address = CreateForeignServerStmtObjectAddress, .address = CreateForeignServerStmtObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
@ -268,31 +281,34 @@ static DistributeObjectOps Attribute_Rename = {
static DistributeObjectOps Collation_AlterObjectSchema = { static DistributeObjectOps Collation_AlterObjectSchema = {
.deparse = DeparseAlterCollationSchemaStmt, .deparse = DeparseAlterCollationSchemaStmt,
.qualify = QualifyAlterCollationSchemaStmt, .qualify = QualifyAlterCollationSchemaStmt,
.preprocess = PreprocessAlterCollationSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterCollationSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_COLLATION,
.address = AlterCollationSchemaStmtObjectAddress, .address = AlterCollationSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Collation_AlterOwner = { static DistributeObjectOps Collation_AlterOwner = {
.deparse = DeparseAlterCollationOwnerStmt, .deparse = DeparseAlterCollationOwnerStmt,
.qualify = QualifyAlterCollationOwnerStmt, .qualify = QualifyAlterCollationOwnerStmt,
.preprocess = PreprocessAlterCollationOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterCollationOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_COLLATION,
.address = AlterCollationOwnerObjectAddress, .address = AlterCollationOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Collation_Define = { static DistributeObjectOps Collation_Define = {
.deparse = NULL, .deparse = NULL,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDefineCollationStmt, .preprocess = NULL,
.postprocess = PostprocessDefineCollationStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_COLLATION,
.address = DefineCollationStmtObjectAddress, .address = DefineCollationStmtObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
static DistributeObjectOps Collation_Drop = { static DistributeObjectOps Collation_Drop = {
.deparse = DeparseDropCollationStmt, .deparse = DeparseDropCollationStmt,
.qualify = QualifyDropCollationStmt, .qualify = QualifyDropCollationStmt,
.preprocess = PreprocessDropCollationStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -300,47 +316,53 @@ static DistributeObjectOps Collation_Drop = {
static DistributeObjectOps Collation_Rename = { static DistributeObjectOps Collation_Rename = {
.deparse = DeparseRenameCollationStmt, .deparse = DeparseRenameCollationStmt,
.qualify = QualifyRenameCollationStmt, .qualify = QualifyRenameCollationStmt,
.preprocess = PreprocessRenameCollationStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_COLLATION,
.address = RenameCollationStmtObjectAddress, .address = RenameCollationStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Database_AlterOwner = { static DistributeObjectOps Database_AlterOwner = {
.deparse = DeparseAlterDatabaseOwnerStmt, .deparse = DeparseAlterDatabaseOwnerStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessAlterDatabaseOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterDatabaseOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_DATABASE,
.featureFlag = &EnableAlterDatabaseOwner,
.address = AlterDatabaseOwnerObjectAddress, .address = AlterDatabaseOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Domain_Alter = { static DistributeObjectOps Domain_Alter = {
.deparse = DeparseAlterDomainStmt, .deparse = DeparseAlterDomainStmt,
.qualify = QualifyAlterDomainStmt, .qualify = QualifyAlterDomainStmt,
.preprocess = PreprocessAlterDomainStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterDomainStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_DOMAIN,
.address = AlterDomainStmtObjectAddress, .address = AlterDomainStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Domain_AlterObjectSchema = { static DistributeObjectOps Domain_AlterObjectSchema = {
.deparse = DeparseAlterDomainSchemaStmt, .deparse = DeparseAlterDomainSchemaStmt,
.qualify = QualifyAlterDomainSchemaStmt, .qualify = QualifyAlterDomainSchemaStmt,
.preprocess = PreprocessAlterDomainSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterDomainSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_DOMAIN,
.address = AlterTypeSchemaStmtObjectAddress, .address = AlterTypeSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Domain_AlterOwner = { static DistributeObjectOps Domain_AlterOwner = {
.deparse = DeparseAlterDomainOwnerStmt, .deparse = DeparseAlterDomainOwnerStmt,
.qualify = QualifyAlterDomainOwnerStmt, .qualify = QualifyAlterDomainOwnerStmt,
.preprocess = PreprocessAlterDomainOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterDomainOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_DOMAIN,
.address = AlterDomainOwnerStmtObjectAddress, .address = AlterDomainOwnerStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Domain_Drop = { static DistributeObjectOps Domain_Drop = {
.deparse = DeparseDropDomainStmt, .deparse = DeparseDropDomainStmt,
.qualify = QualifyDropDomainStmt, .qualify = QualifyDropDomainStmt,
.preprocess = PreprocessDropDomainStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -348,8 +370,9 @@ static DistributeObjectOps Domain_Drop = {
static DistributeObjectOps Domain_Rename = { static DistributeObjectOps Domain_Rename = {
.deparse = DeparseRenameDomainStmt, .deparse = DeparseRenameDomainStmt,
.qualify = QualifyRenameDomainStmt, .qualify = QualifyRenameDomainStmt,
.preprocess = PreprocessRenameDomainStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_DOMAIN,
.address = RenameDomainStmtObjectAddress, .address = RenameDomainStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -357,8 +380,9 @@ static DistributeObjectOps Domain_Rename = {
static DistributeObjectOps Domain_RenameConstraint = { static DistributeObjectOps Domain_RenameConstraint = {
.deparse = DeparseDomainRenameConstraintStmt, .deparse = DeparseDomainRenameConstraintStmt,
.qualify = QualifyDomainRenameConstraintStmt, .qualify = QualifyDomainRenameConstraintStmt,
.preprocess = PreprocessDomainRenameConstraintStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_DOMAIN,
.address = DomainRenameConstraintStmtObjectAddress, .address = DomainRenameConstraintStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -381,7 +405,7 @@ static DistributeObjectOps Extension_Drop = {
static DistributeObjectOps ForeignServer_Drop = { static DistributeObjectOps ForeignServer_Drop = {
.deparse = DeparseDropForeignServerStmt, .deparse = DeparseDropForeignServerStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDropForeignServerStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -389,16 +413,18 @@ static DistributeObjectOps ForeignServer_Drop = {
static DistributeObjectOps ForeignServer_Rename = { static DistributeObjectOps ForeignServer_Rename = {
.deparse = DeparseAlterForeignServerRenameStmt, .deparse = DeparseAlterForeignServerRenameStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessRenameForeignServerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .objectType = OBJECT_FOREIGN_SERVER,
.address = RenameForeignServerStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps ForeignServer_AlterOwner = { static DistributeObjectOps ForeignServer_AlterOwner = {
.deparse = DeparseAlterForeignServerOwnerStmt, .deparse = DeparseAlterForeignServerOwnerStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessAlterForeignServerOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterForeignServerOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FOREIGN_SERVER,
.address = AlterForeignServerOwnerStmtObjectAddress, .address = AlterForeignServerOwnerStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -421,23 +447,25 @@ static DistributeObjectOps Function_AlterObjectDepends = {
static DistributeObjectOps Function_AlterObjectSchema = { static DistributeObjectOps Function_AlterObjectSchema = {
.deparse = DeparseAlterFunctionSchemaStmt, .deparse = DeparseAlterFunctionSchemaStmt,
.qualify = QualifyAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt,
.preprocess = PreprocessAlterFunctionSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionSchemaStmtObjectAddress, .address = AlterFunctionSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Function_AlterOwner = { static DistributeObjectOps Function_AlterOwner = {
.deparse = DeparseAlterFunctionOwnerStmt, .deparse = DeparseAlterFunctionOwnerStmt,
.qualify = QualifyAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt,
.preprocess = PreprocessAlterFunctionOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionOwnerObjectAddress, .address = AlterFunctionOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Function_Drop = { static DistributeObjectOps Function_Drop = {
.deparse = DeparseDropFunctionStmt, .deparse = DeparseDropFunctionStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDropFunctionStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -445,8 +473,9 @@ static DistributeObjectOps Function_Drop = {
static DistributeObjectOps Function_Rename = { static DistributeObjectOps Function_Rename = {
.deparse = DeparseRenameFunctionStmt, .deparse = DeparseRenameFunctionStmt,
.qualify = QualifyRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt,
.preprocess = PreprocessRenameFunctionStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_FUNCTION,
.address = RenameFunctionStmtObjectAddress, .address = RenameFunctionStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -485,23 +514,25 @@ static DistributeObjectOps Procedure_AlterObjectDepends = {
static DistributeObjectOps Procedure_AlterObjectSchema = { static DistributeObjectOps Procedure_AlterObjectSchema = {
.deparse = DeparseAlterFunctionSchemaStmt, .deparse = DeparseAlterFunctionSchemaStmt,
.qualify = QualifyAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt,
.preprocess = PreprocessAlterFunctionSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionSchemaStmtObjectAddress, .address = AlterFunctionSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Procedure_AlterOwner = { static DistributeObjectOps Procedure_AlterOwner = {
.deparse = DeparseAlterFunctionOwnerStmt, .deparse = DeparseAlterFunctionOwnerStmt,
.qualify = QualifyAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt,
.preprocess = PreprocessAlterFunctionOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionOwnerObjectAddress, .address = AlterFunctionOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Procedure_Drop = { static DistributeObjectOps Procedure_Drop = {
.deparse = DeparseDropFunctionStmt, .deparse = DeparseDropFunctionStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDropFunctionStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -509,8 +540,9 @@ static DistributeObjectOps Procedure_Drop = {
static DistributeObjectOps Procedure_Rename = { static DistributeObjectOps Procedure_Rename = {
.deparse = DeparseRenameFunctionStmt, .deparse = DeparseRenameFunctionStmt,
.qualify = QualifyRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt,
.preprocess = PreprocessRenameFunctionStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_FUNCTION,
.address = RenameFunctionStmtObjectAddress, .address = RenameFunctionStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -565,32 +597,36 @@ static DistributeObjectOps Sequence_Rename = {
static DistributeObjectOps TextSearchConfig_Alter = { static DistributeObjectOps TextSearchConfig_Alter = {
.deparse = DeparseAlterTextSearchConfigurationStmt, .deparse = DeparseAlterTextSearchConfigurationStmt,
.qualify = QualifyAlterTextSearchConfigurationStmt, .qualify = QualifyAlterTextSearchConfigurationStmt,
.preprocess = PreprocessAlterTextSearchConfigurationStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TSCONFIGURATION,
.address = AlterTextSearchConfigurationStmtObjectAddress, .address = AlterTextSearchConfigurationStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchConfig_AlterObjectSchema = { static DistributeObjectOps TextSearchConfig_AlterObjectSchema = {
.deparse = DeparseAlterTextSearchConfigurationSchemaStmt, .deparse = DeparseAlterTextSearchConfigurationSchemaStmt,
.qualify = QualifyAlterTextSearchConfigurationSchemaStmt, .qualify = QualifyAlterTextSearchConfigurationSchemaStmt,
.preprocess = PreprocessAlterTextSearchConfigurationSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterTextSearchConfigurationSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_TSCONFIGURATION,
.address = AlterTextSearchConfigurationSchemaStmtObjectAddress, .address = AlterTextSearchConfigurationSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchConfig_AlterOwner = { static DistributeObjectOps TextSearchConfig_AlterOwner = {
.deparse = DeparseAlterTextSearchConfigurationOwnerStmt, .deparse = DeparseAlterTextSearchConfigurationOwnerStmt,
.qualify = QualifyAlterTextSearchConfigurationOwnerStmt, .qualify = QualifyAlterTextSearchConfigurationOwnerStmt,
.preprocess = PreprocessAlterTextSearchConfigurationOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterTextSearchConfigurationOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_TSCONFIGURATION,
.address = AlterTextSearchConfigurationOwnerObjectAddress, .address = AlterTextSearchConfigurationOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchConfig_Comment = { static DistributeObjectOps TextSearchConfig_Comment = {
.deparse = DeparseTextSearchConfigurationCommentStmt, .deparse = DeparseTextSearchConfigurationCommentStmt,
.qualify = QualifyTextSearchConfigurationCommentStmt, .qualify = QualifyTextSearchConfigurationCommentStmt,
.preprocess = PreprocessTextSearchConfigurationCommentStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TSCONFIGURATION,
.address = TextSearchConfigurationCommentObjectAddress, .address = TextSearchConfigurationCommentObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -598,14 +634,15 @@ static DistributeObjectOps TextSearchConfig_Define = {
.deparse = DeparseCreateTextSearchConfigurationStmt, .deparse = DeparseCreateTextSearchConfigurationStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = NULL, .preprocess = NULL,
.postprocess = PostprocessCreateTextSearchConfigurationStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_TSCONFIGURATION,
.address = CreateTextSearchConfigurationObjectAddress, .address = CreateTextSearchConfigurationObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
static DistributeObjectOps TextSearchConfig_Drop = { static DistributeObjectOps TextSearchConfig_Drop = {
.deparse = DeparseDropTextSearchConfigurationStmt, .deparse = DeparseDropTextSearchConfigurationStmt,
.qualify = QualifyDropTextSearchConfigurationStmt, .qualify = QualifyDropTextSearchConfigurationStmt,
.preprocess = PreprocessDropTextSearchConfigurationStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -613,40 +650,45 @@ static DistributeObjectOps TextSearchConfig_Drop = {
static DistributeObjectOps TextSearchConfig_Rename = { static DistributeObjectOps TextSearchConfig_Rename = {
.deparse = DeparseRenameTextSearchConfigurationStmt, .deparse = DeparseRenameTextSearchConfigurationStmt,
.qualify = QualifyRenameTextSearchConfigurationStmt, .qualify = QualifyRenameTextSearchConfigurationStmt,
.preprocess = PreprocessRenameTextSearchConfigurationStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TSCONFIGURATION,
.address = RenameTextSearchConfigurationStmtObjectAddress, .address = RenameTextSearchConfigurationStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchDict_Alter = { static DistributeObjectOps TextSearchDict_Alter = {
.deparse = DeparseAlterTextSearchDictionaryStmt, .deparse = DeparseAlterTextSearchDictionaryStmt,
.qualify = QualifyAlterTextSearchDictionaryStmt, .qualify = QualifyAlterTextSearchDictionaryStmt,
.preprocess = PreprocessAlterTextSearchDictionaryStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TSDICTIONARY,
.address = AlterTextSearchDictionaryStmtObjectAddress, .address = AlterTextSearchDictionaryStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchDict_AlterObjectSchema = { static DistributeObjectOps TextSearchDict_AlterObjectSchema = {
.deparse = DeparseAlterTextSearchDictionarySchemaStmt, .deparse = DeparseAlterTextSearchDictionarySchemaStmt,
.qualify = QualifyAlterTextSearchDictionarySchemaStmt, .qualify = QualifyAlterTextSearchDictionarySchemaStmt,
.preprocess = PreprocessAlterTextSearchDictionarySchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterTextSearchDictionarySchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_TSDICTIONARY,
.address = AlterTextSearchDictionarySchemaStmtObjectAddress, .address = AlterTextSearchDictionarySchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchDict_AlterOwner = { static DistributeObjectOps TextSearchDict_AlterOwner = {
.deparse = DeparseAlterTextSearchDictionaryOwnerStmt, .deparse = DeparseAlterTextSearchDictionaryOwnerStmt,
.qualify = QualifyAlterTextSearchDictionaryOwnerStmt, .qualify = QualifyAlterTextSearchDictionaryOwnerStmt,
.preprocess = PreprocessAlterTextSearchDictionaryOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterTextSearchDictionaryOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_TSDICTIONARY,
.address = AlterTextSearchDictOwnerObjectAddress, .address = AlterTextSearchDictOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps TextSearchDict_Comment = { static DistributeObjectOps TextSearchDict_Comment = {
.deparse = DeparseTextSearchDictionaryCommentStmt, .deparse = DeparseTextSearchDictionaryCommentStmt,
.qualify = QualifyTextSearchDictionaryCommentStmt, .qualify = QualifyTextSearchDictionaryCommentStmt,
.preprocess = PreprocessTextSearchDictionaryCommentStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TSDICTIONARY,
.address = TextSearchDictCommentObjectAddress, .address = TextSearchDictCommentObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -654,14 +696,15 @@ static DistributeObjectOps TextSearchDict_Define = {
.deparse = DeparseCreateTextSearchDictionaryStmt, .deparse = DeparseCreateTextSearchDictionaryStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = NULL, .preprocess = NULL,
.postprocess = PostprocessCreateTextSearchDictionaryStmt, .postprocess = PostprocessCreateDistributedObjectFromCatalogStmt,
.objectType = OBJECT_TSDICTIONARY,
.address = CreateTextSearchDictObjectAddress, .address = CreateTextSearchDictObjectAddress,
.markDistributed = true, .markDistributed = true,
}; };
static DistributeObjectOps TextSearchDict_Drop = { static DistributeObjectOps TextSearchDict_Drop = {
.deparse = DeparseDropTextSearchDictionaryStmt, .deparse = DeparseDropTextSearchDictionaryStmt,
.qualify = QualifyDropTextSearchDictionaryStmt, .qualify = QualifyDropTextSearchDictionaryStmt,
.preprocess = PreprocessDropTextSearchDictionaryStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -669,8 +712,9 @@ static DistributeObjectOps TextSearchDict_Drop = {
static DistributeObjectOps TextSearchDict_Rename = { static DistributeObjectOps TextSearchDict_Rename = {
.deparse = DeparseRenameTextSearchDictionaryStmt, .deparse = DeparseRenameTextSearchDictionaryStmt,
.qualify = QualifyRenameTextSearchDictionaryStmt, .qualify = QualifyRenameTextSearchDictionaryStmt,
.preprocess = PreprocessRenameTextSearchDictionaryStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TSDICTIONARY,
.address = RenameTextSearchDictionaryStmtObjectAddress, .address = RenameTextSearchDictionaryStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -685,23 +729,25 @@ static DistributeObjectOps Trigger_AlterObjectDepends = {
static DistributeObjectOps Routine_AlterObjectSchema = { static DistributeObjectOps Routine_AlterObjectSchema = {
.deparse = DeparseAlterFunctionSchemaStmt, .deparse = DeparseAlterFunctionSchemaStmt,
.qualify = QualifyAlterFunctionSchemaStmt, .qualify = QualifyAlterFunctionSchemaStmt,
.preprocess = PreprocessAlterFunctionSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionSchemaStmtObjectAddress, .address = AlterFunctionSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Routine_AlterOwner = { static DistributeObjectOps Routine_AlterOwner = {
.deparse = DeparseAlterFunctionOwnerStmt, .deparse = DeparseAlterFunctionOwnerStmt,
.qualify = QualifyAlterFunctionOwnerStmt, .qualify = QualifyAlterFunctionOwnerStmt,
.preprocess = PreprocessAlterFunctionOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterFunctionOwnerStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_FUNCTION,
.address = AlterFunctionOwnerObjectAddress, .address = AlterFunctionOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Routine_Drop = { static DistributeObjectOps Routine_Drop = {
.deparse = DeparseDropFunctionStmt, .deparse = DeparseDropFunctionStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDropFunctionStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -709,8 +755,9 @@ static DistributeObjectOps Routine_Drop = {
static DistributeObjectOps Routine_Rename = { static DistributeObjectOps Routine_Rename = {
.deparse = DeparseRenameFunctionStmt, .deparse = DeparseRenameFunctionStmt,
.qualify = QualifyRenameFunctionStmt, .qualify = QualifyRenameFunctionStmt,
.preprocess = PreprocessRenameFunctionStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_FUNCTION,
.address = RenameFunctionStmtObjectAddress, .address = RenameFunctionStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -733,8 +780,9 @@ static DistributeObjectOps Schema_Grant = {
static DistributeObjectOps Schema_Rename = { static DistributeObjectOps Schema_Rename = {
.deparse = DeparseAlterSchemaRenameStmt, .deparse = DeparseAlterSchemaRenameStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessAlterSchemaRenameStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_SCHEMA,
.address = AlterSchemaRenameStmtObjectAddress, .address = AlterSchemaRenameStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
@ -807,31 +855,34 @@ static DistributeObjectOps Table_Drop = {
static DistributeObjectOps Type_AlterObjectSchema = { static DistributeObjectOps Type_AlterObjectSchema = {
.deparse = DeparseAlterTypeSchemaStmt, .deparse = DeparseAlterTypeSchemaStmt,
.qualify = QualifyAlterTypeSchemaStmt, .qualify = QualifyAlterTypeSchemaStmt,
.preprocess = PreprocessAlterTypeSchemaStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = PostprocessAlterTypeSchemaStmt, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_TYPE,
.address = AlterTypeSchemaStmtObjectAddress, .address = AlterTypeSchemaStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Type_AlterOwner = { static DistributeObjectOps Type_AlterOwner = {
.deparse = DeparseAlterTypeOwnerStmt, .deparse = DeparseAlterTypeOwnerStmt,
.qualify = QualifyAlterTypeOwnerStmt, .qualify = QualifyAlterTypeOwnerStmt,
.preprocess = PreprocessAlterTypeOwnerStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = PostprocessAlterDistributedObjectStmt,
.objectType = OBJECT_TYPE,
.address = AlterTypeOwnerObjectAddress, .address = AlterTypeOwnerObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Type_AlterTable = { static DistributeObjectOps Type_AlterTable = {
.deparse = DeparseAlterTypeStmt, .deparse = DeparseAlterTypeStmt,
.qualify = QualifyAlterTypeStmt, .qualify = QualifyAlterTypeStmt,
.preprocess = PreprocessAlterTypeStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TYPE,
.address = AlterTypeStmtObjectAddress, .address = AlterTypeStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };
static DistributeObjectOps Type_Drop = { static DistributeObjectOps Type_Drop = {
.deparse = DeparseDropTypeStmt, .deparse = DeparseDropTypeStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessDropTypeStmt, .preprocess = PreprocessDropDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
.markDistributed = false, .markDistributed = false,
@ -847,8 +898,9 @@ static DistributeObjectOps Trigger_Drop = {
static DistributeObjectOps Type_Rename = { static DistributeObjectOps Type_Rename = {
.deparse = DeparseRenameTypeStmt, .deparse = DeparseRenameTypeStmt,
.qualify = QualifyRenameTypeStmt, .qualify = QualifyRenameTypeStmt,
.preprocess = PreprocessRenameTypeStmt, .preprocess = PreprocessAlterDistributedObjectStmt,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_TYPE,
.address = RenameTypeStmtObjectAddress, .address = RenameTypeStmtObjectAddress,
.markDistributed = false, .markDistributed = false,
}; };

View File

@ -37,382 +37,8 @@
static CollateClause * MakeCollateClauseFromOid(Oid collationOid); static CollateClause * MakeCollateClauseFromOid(Oid collationOid);
static List * FilterNameListForDistributedDomains(List *domainNames, bool missing_ok,
List **distributedDomainAddresses);
static ObjectAddress GetDomainAddressByName(TypeName *domainName, bool missing_ok); static ObjectAddress GetDomainAddressByName(TypeName *domainName, bool missing_ok);
/*
* PreprocessCreateDomainStmt handles the propagation of the create domain statements.
*/
List *
PreprocessCreateDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode(node);
const char *sql = DeparseTreeNode(node);
sql = WrapCreateOrReplace(sql);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessCreateDomainStmt gets called after the domain has been created locally. When
* the domain is decided to be propagated we make sure all the domains dependencies exist
* on all workers.
*/
List *
PostprocessCreateDomainStmt(Node *node, const char *queryString)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
/*
* find object address of the just created object, because the domain has been created
* locally it can't be missing
*/
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
EnsureDependenciesExistOnAllNodes(&typeAddress);
return NIL;
}
/*
* PreprocessDropDomainStmt gets called before dropping the domain locally. For
* distributed domains it will make sure the fully qualified statement is forwarded to all
* the workers reflecting the drop of the domain.
*/
List *
PreprocessDropDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
if (!ShouldPropagate())
{
return NIL;
}
QualifyTreeNode((Node *) stmt);
List *oldDomains = stmt->objects;
List *distributedDomainAddresses = NIL;
List *distributedDomains = FilterNameListForDistributedDomains(
oldDomains,
stmt->missing_ok,
&distributedDomainAddresses);
if (list_length(distributedDomains) <= 0)
{
/* no distributed domains to drop */
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
ObjectAddress *addressItem = NULL;
foreach_ptr(addressItem, distributedDomainAddresses)
{
UnmarkObjectDistributed(addressItem);
}
/*
* temporary swap the lists of objects to delete with the distributed objects and
* deparse to an executable sql statement for the workers
*/
stmt->objects = distributedDomains;
char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = oldDomains;
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDomainStmt gets called for all domain specific alter statements. When
* the change happens on a distributed domain we reflect the changes on the workers.
*/
List *
PreprocessAlterDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDomainStmt gets called after the domain has been altered locally. A
* change on the constraints could cause new (undistributed) objects to be dependencies of
* the domain. Here we recreate any new dependencies on the workers before we forward the
* alter statement to the workers.
*/
List *
PostprocessAlterDomainStmt(Node *node, const char *queryString)
{
AlterDomainStmt *stmt = castNode(AlterDomainStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&domainAddress);
return NIL;
}
/*
* PreprocessDomainRenameConstraintStmt gets called locally when a constraint on a domain
* is renamed. When the constraint is on a distributed domain we forward the statement
* appropriately.
*/
List *
PreprocessDomainRenameConstraintStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMCONSTRAINT);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDomainOwnerStmt called locally when the owner of a constraint is
* changed. For distributed domains the statement is forwarded to all the workers.
*/
List *
PreprocessAlterDomainOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDomainOwnerStmt given the change of ownership could cause new
* dependencies to exist for the domain we make sure all dependencies for the domain are
* created before we forward the statement to the workers.
*/
List *
PostprocessAlterDomainOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&domainAddress);
return NIL;
}
/*
* PreprocessRenameDomainStmt creates the statements to execute on the workers when the
* domain being renamed is distributed.
*/
List *
PreprocessRenameDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_DOMAIN);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterDomainSchemaStmt cretes the statements to execute on the workers when
* the domain being moved to a new schema has been distributed.
*/
List *
PreprocessAlterDomainSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_DOMAIN);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_DOMAIN);
QualifyTreeNode((Node *) stmt);
char *sqlStmt = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sqlStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterDomainSchemaStmt makes sure any new dependencies (as result of the
* schema move) are created on the workers before we forward the statement.
*/
List *
PostprocessAlterDomainSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
ObjectAddress domainAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&domainAddress))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&domainAddress);
return NIL;
}
/*
* FilterNameListForDistributedDomains given a liost of domain names we will return a list
* filtered with only the names of distributed domains remaining. The pointer to the list
* distributedDomainAddresses is populated with a list of ObjectAddresses of the domains
* that are distributed. Indices between the returned list and the object addresses are
* synchronizes.
* Note: the pointer in distributedDomainAddresses should not be omitted
*
* When missing_ok is false this function will raise an error if a domain identified by a
* domain name cannot be found.
*/
static List *
FilterNameListForDistributedDomains(List *domainNames, bool missing_ok,
List **distributedDomainAddresses)
{
Assert(distributedDomainAddresses != NULL);
List *distributedDomainNames = NIL;
TypeName *domainName = NULL;
foreach_ptr(domainName, domainNames)
{
ObjectAddress objectAddress = GetDomainAddressByName(domainName, missing_ok);
if (IsObjectDistributed(&objectAddress))
{
distributedDomainNames = lappend(distributedDomainNames, domainName);
if (distributedDomainAddresses)
{
ObjectAddress *allocatedAddress = palloc0(sizeof(ObjectAddress));
*allocatedAddress = objectAddress;
*distributedDomainAddresses = lappend(*distributedDomainAddresses,
allocatedAddress);
}
}
}
return distributedDomainNames;
}
/* /*
* GetDomainAddressByName returns the ObjectAddress of the domain identified by * GetDomainAddressByName returns the ObjectAddress of the domain identified by
* domainName. When missing_ok is true the object id part of the ObjectAddress can be * domainName. When missing_ok is true the object id part of the ObjectAddress can be

View File

@ -25,238 +25,9 @@
#include "nodes/primnodes.h" #include "nodes/primnodes.h"
static Node * RecreateForeignServerStmt(Oid serverId); static Node * RecreateForeignServerStmt(Oid serverId);
static bool NameListHasDistributedServer(List *serverNames);
static ObjectAddress GetObjectAddressByServerName(char *serverName, bool missing_ok); static ObjectAddress GetObjectAddressByServerName(char *serverName, bool missing_ok);
/*
* PreprocessCreateForeignServerStmt is called during the planning phase for
* CREATE SERVER.
*/
List *
PreprocessCreateForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_FOREIGN_SERVER);
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterForeignServerStmt is called during the planning phase for
* ALTER SERVER .. OPTIONS ..
*/
List *
PreprocessAlterForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterForeignServerStmt *stmt = castNode(AlterForeignServerStmt, node);
ObjectAddress address = GetObjectAddressByServerName(stmt->servername, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessRenameForeignServerStmt is called during the planning phase for
* ALTER SERVER RENAME.
*/
List *
PreprocessRenameForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_FOREIGN_SERVER);
ObjectAddress address = GetObjectAddressByServerName(strVal(stmt->object), false);
/* filter distributed servers */
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterForeignServerOwnerStmt is called during the planning phase for
* ALTER SERVER .. OWNER TO.
*/
List *
PreprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_FOREIGN_SERVER);
ObjectAddress address = GetObjectAddressByServerName(strVal(stmt->object), false);
/* filter distributed servers */
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
char *sql = DeparseTreeNode(node);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessDropForeignServerStmt is called during the planning phase for
* DROP SERVER.
*/
List *
PreprocessDropForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
Assert(stmt->removeType == OBJECT_FOREIGN_SERVER);
bool includesDistributedServer = NameListHasDistributedServer(stmt->objects);
if (!includesDistributedServer)
{
return NIL;
}
if (list_length(stmt->objects) > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot drop distributed server with other servers"),
errhint("Try dropping each object in a separate DROP command")));
}
if (!ShouldPropagate())
{
return NIL;
}
EnsureCoordinator();
Assert(list_length(stmt->objects) == 1);
Value *serverValue = linitial(stmt->objects);
ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false);
/* unmark distributed server */
UnmarkObjectDistributed(&address);
const char *deparsedStmt = DeparseTreeNode((Node *) stmt);
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) deparsedStmt,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessCreateForeignServerStmt is called after a CREATE SERVER command has
* been executed by standard process utility.
*/
List *
PostprocessCreateForeignServerStmt(Node *node, const char *queryString)
{
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
const bool missingOk = false;
ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk);
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PostprocessAlterForeignServerOwnerStmt is called after a ALTER SERVER OWNER command
* has been executed by standard process utility.
*/
List *
PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString)
{
const bool missingOk = false;
ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/* /*
* CreateForeignServerStmtObjectAddress finds the ObjectAddress for the server * CreateForeignServerStmtObjectAddress finds the ObjectAddress for the server
* that is created by given CreateForeignServerStmt. If missingOk is false and if * that is created by given CreateForeignServerStmt. If missingOk is false and if
@ -274,6 +45,41 @@ CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok)
} }
/*
* AlterForeignServerStmtObjectAddress finds the ObjectAddress for the server that is
* changed by given AlterForeignServerStmt. If missingOk is false and if
* the server does not exist, then it errors out.
*
* Never returns NULL, but the objid in the address can be invalid if missingOk
* was set to true.
*/
ObjectAddress
AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok)
{
AlterForeignServerStmt *stmt = castNode(AlterForeignServerStmt, node);
return GetObjectAddressByServerName(stmt->servername, missing_ok);
}
/*
* RenameForeignServerStmtObjectAddress finds the ObjectAddress for the server that is
* renamed by given RenmaeStmt. If missingOk is false and if the server does not exist,
* then it errors out.
*
* Never returns NULL, but the objid in the address can be invalid if missingOk
* was set to true.
*/
ObjectAddress
RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_FOREIGN_SERVER);
return GetObjectAddressByServerName(strVal(stmt->object), missing_ok);
}
/* /*
* AlterForeignServerOwnerStmtObjectAddress finds the ObjectAddress for the server * AlterForeignServerOwnerStmtObjectAddress finds the ObjectAddress for the server
* given in AlterOwnerStmt. If missingOk is false and if * given in AlterOwnerStmt. If missingOk is false and if
@ -355,28 +161,6 @@ RecreateForeignServerStmt(Oid serverId)
} }
/*
* NameListHasDistributedServer takes a namelist of servers and returns true if at least
* one of them is distributed. Returns false otherwise.
*/
static bool
NameListHasDistributedServer(List *serverNames)
{
Value *serverValue = NULL;
foreach_ptr(serverValue, serverNames)
{
ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false);
if (IsObjectDistributed(&address))
{
return true;
}
}
return false;
}
static ObjectAddress static ObjectAddress
GetObjectAddressByServerName(char *serverName, bool missing_ok) GetObjectAddressByServerName(char *serverName, bool missing_ok)
{ {

View File

@ -1502,234 +1502,6 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString,
} }
/*
* PreprocessRenameFunctionStmt is called when the user is renaming a function. The invocation
* happens before the statement is applied locally.
*
* As the function already exists we have access to the ObjectAddress, this is used to
* check if it is distributed. If so the rename is executed on all the workers to keep the
* types in sync across the cluster.
*/
List *
PreprocessRenameFunctionStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
AssertObjectTypeIsFunctional(stmt->renameType);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterFunctionSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers.
*/
List *
PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
AssertObjectTypeIsFunctional(stmt->objectType);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterFunctionOwnerStmt is called for change of owner ship of functions before the owner
* ship is changed on the local instance.
*
* If the function for which the owner is changed is distributed we execute the change on
* all the workers to keep the type in sync across the cluster.
*/
List *
PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
AssertObjectTypeIsFunctional(stmt->objectType);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_FUNCTION);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterFunctionOwnerStmt is invoked after the owner has been changed locally.
* Since changing the owner could result in new dependencies being found for this object
* we re-ensure all the dependencies for the function do exist.
*
* This is solely to propagate the new owner (and all its dependencies) if it was not
* already distributed in the cluster.
*/
List *
PostprocessAlterFunctionOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
AssertObjectTypeIsFunctional(stmt->objectType);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(&address))
{
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PreprocessDropFunctionStmt gets called during the planning phase of a DROP FUNCTION statement
* and returns a list of DDLJob's that will drop any distributed functions from the
* workers.
*
* The DropStmt could have multiple objects to drop, the list of objects will be filtered
* to only keep the distributed functions for deletion on the workers. Non-distributed
* functions will still be dropped locally but not on the workers.
*/
List *
PreprocessDropFunctionStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
List *deletingObjectWithArgsList = stmt->objects;
List *distributedObjectWithArgsList = NIL;
List *distributedFunctionAddresses = NIL;
AssertObjectTypeIsFunctional(stmt->removeType);
if (creating_extension)
{
/*
* extensions should be created separately on the workers, types cascading from an
* extension should therefore not be propagated here.
*/
return NIL;
}
if (!EnableMetadataSync)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return NIL;
}
/*
* Our statements need to be fully qualified so we can drop them from the right schema
* on the workers
*/
QualifyTreeNode((Node *) stmt);
/*
* iterate over all functions to be dropped and filter to keep only distributed
* functions.
*/
ObjectWithArgs *func = NULL;
foreach_ptr(func, deletingObjectWithArgsList)
{
ObjectAddress address = FunctionToObjectAddress(stmt->removeType, func,
stmt->missing_ok);
if (!IsObjectDistributed(&address))
{
continue;
}
/* collect information for all distributed functions */
ObjectAddress *addressp = palloc(sizeof(ObjectAddress));
*addressp = address;
distributedFunctionAddresses = lappend(distributedFunctionAddresses, addressp);
distributedObjectWithArgsList = lappend(distributedObjectWithArgsList, func);
}
if (list_length(distributedObjectWithArgsList) <= 0)
{
/* no distributed functions to drop */
return NIL;
}
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* types, so we block the call.
*/
EnsureCoordinator();
EnsureSequentialMode(OBJECT_FUNCTION);
/* remove the entries for the distributed objects on dropping */
ObjectAddress *address = NULL;
foreach_ptr(address, distributedFunctionAddresses)
{
UnmarkObjectDistributed(address);
}
/*
* Swap the list of objects before deparsing and restore the old list after. This
* ensures we only have distributed functions in the deparsed drop statement.
*/
DropStmt *stmtCopy = copyObject(stmt);
stmtCopy->objects = distributedObjectWithArgsList;
const char *dropStmtSql = DeparseTreeNode((Node *) stmtCopy);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/* /*
* PreprocessAlterFunctionDependsStmt is called during the planning phase of an * PreprocessAlterFunctionDependsStmt is called during the planning phase of an
* ALTER FUNCION ... DEPENDS ON EXTENSION ... statement. Since functions depending on * ALTER FUNCION ... DEPENDS ON EXTENSION ... statement. Since functions depending on
@ -1803,30 +1575,6 @@ AlterFunctionDependsStmtObjectAddress(Node *node, bool missing_ok)
} }
/*
* PostprocessAlterFunctionSchemaStmt is executed after the change has been applied locally,
* we can now use the new dependencies of the function to ensure all its dependencies
* exist on the workers before we apply the commands remotely.
*/
List *
PostprocessAlterFunctionSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
AssertObjectTypeIsFunctional(stmt->objectType);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateAlterFunction(&address))
{
return NIL;
}
/* dependencies have changed (schema) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/* /*
* AlterFunctionStmtObjectAddress returns the ObjectAddress of the subject in the * AlterFunctionStmtObjectAddress returns the ObjectAddress of the subject in the
* AlterFunctionStmt. If missing_ok is set to false an error will be raised if postgres * AlterFunctionStmt. If missing_ok is set to false an error will be raised if postgres

View File

@ -182,43 +182,6 @@ PreprocessGrantOnSchemaStmt(Node *node, const char *queryString,
} }
/*
* PreprocessAlterSchemaRenameStmt is called when the user is renaming a schema.
* The invocation happens before the statement is applied locally.
*
* As the schema already exists we have access to the ObjectAddress for the schema, this
* is used to check if the schmea is distributed. If the schema is distributed the rename
* is executed on all the workers to keep the schemas in sync across the cluster.
*/
List *
PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
ObjectAddress schemaAddress = GetObjectAddressFromParseTree(node, false);
if (!ShouldPropagateObject(&schemaAddress))
{
return NIL;
}
EnsureCoordinator();
/* fully qualify */
QualifyTreeNode(node);
/* deparse sql*/
const char *renameStmtSql = DeparseTreeNode(node);
EnsureSequentialMode(OBJECT_SCHEMA);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/* /*
* CreateSchemaStmtObjectAddress returns the ObjectAddress of the schema that is * CreateSchemaStmtObjectAddress returns the ObjectAddress of the schema that is
* the object of the CreateSchemaStmt. Errors if missing_ok is false. * the object of the CreateSchemaStmt. Errors if missing_ok is false.

View File

@ -42,8 +42,6 @@
#include "distributed/worker_create_or_replace.h" #include "distributed/worker_create_or_replace.h"
static List * GetDistributedTextSearchConfigurationNames(DropStmt *stmt);
static List * GetDistributedTextSearchDictionaryNames(DropStmt *stmt);
static DefineStmt * GetTextSearchConfigDefineStmt(Oid tsconfigOid); static DefineStmt * GetTextSearchConfigDefineStmt(Oid tsconfigOid);
static DefineStmt * GetTextSearchDictionaryDefineStmt(Oid tsdictOid); static DefineStmt * GetTextSearchDictionaryDefineStmt(Oid tsdictOid);
static List * GetTextSearchDictionaryInitOptions(HeapTuple tup, Form_pg_ts_dict dict); static List * GetTextSearchDictionaryInitOptions(HeapTuple tup, Form_pg_ts_dict dict);
@ -59,113 +57,6 @@ static List * get_ts_template_namelist(Oid tstemplateOid);
static Oid get_ts_config_parser_oid(Oid tsconfigOid); static Oid get_ts_config_parser_oid(Oid tsconfigOid);
static char * get_ts_parser_tokentype_name(Oid parserOid, int32 tokentype); static char * get_ts_parser_tokentype_name(Oid parserOid, int32 tokentype);
/*
* PostprocessCreateTextSearchConfigurationStmt is called after the TEXT SEARCH
* CONFIGURATION has been created locally.
*
* Contrary to many other objects a text search configuration is often created as a copy
* of an existing configuration. After the copy there is no relation to the configuration
* that has been copied. This prevents our normal approach of ensuring dependencies to
* exist before forwarding a close ressemblance of the statement the user executed.
*
* Instead we recreate the object based on what we find in our own catalog, hence the
* amount of work we perform in the postprocess function, contrary to other objects.
*/
List *
PostprocessCreateTextSearchConfigurationStmt(Node *node, const char *queryString)
{
DefineStmt *stmt = castNode(DefineStmt, node);
Assert(stmt->kind == OBJECT_TSCONFIGURATION);
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
/*
* TEXT SEARCH CONFIGURATION objects are more complex with their mappings and the
* possibility of copying from existing templates that we will require the idempotent
* recreation commands to be run for successful propagation
*/
List *commands = CreateTextSearchConfigDDLCommandsIdempotent(&address);
commands = lcons(DISABLE_DDL_PROPAGATION, commands);
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessCreateTextSearchDictionaryStmt is called after the TEXT SEARCH DICTIONARY has been
* created locally.
*/
List *
PostprocessCreateTextSearchDictionaryStmt(Node *node, const char *queryString)
{
DefineStmt *stmt = castNode(DefineStmt, node);
Assert(stmt->kind == OBJECT_TSDICTIONARY);
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
QualifyTreeNode(node);
const char *createTSDictionaryStmtSql = DeparseTreeNode(node);
/*
* To prevent recursive propagation in mx architecture, we disable ddl
* propagation before sending the command to workers.
*/
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createTSDictionaryStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
List * List *
GetCreateTextSearchConfigStatements(const ObjectAddress *address) GetCreateTextSearchConfigStatements(const ObjectAddress *address)
{ {
@ -234,602 +125,6 @@ CreateTextSearchDictDDLCommandsIdempotent(const ObjectAddress *address)
} }
/*
* PreprocessDropTextSearchConfigurationStmt prepares the statements we need to send to
* the workers. After we have dropped the configurations locally they also got removed from
* pg_dist_object so it is important to do all distribution checks before the change is
* made locally.
*/
List *
PreprocessDropTextSearchConfigurationStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
Assert(stmt->removeType == OBJECT_TSCONFIGURATION);
if (!ShouldPropagate())
{
return NIL;
}
List *distributedObjects = GetDistributedTextSearchConfigurationNames(stmt);
if (list_length(distributedObjects) == 0)
{
/* no distributed objects to remove */
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
/*
* Temporarily replace the list of objects being dropped with only the list
* containing the distributed objects. After we have created the sql statement we
* restore the original list of objects to execute on locally.
*
* Because searchpaths on coordinator and workers might not be in sync we fully
* qualify the list before deparsing. This is safe because qualification doesn't
* change the original names in place, but insteads creates new ones.
*/
List *originalObjects = stmt->objects;
stmt->objects = distributedObjects;
QualifyTreeNode((Node *) stmt);
const char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = originalObjects;
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessDropTextSearchDictionaryStmt prepares the statements we need to send to
* the workers. After we have dropped the dictionaries locally they also got removed from
* pg_dist_object so it is important to do all distribution checks before the change is
* made locally.
*/
List *
PreprocessDropTextSearchDictionaryStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
Assert(stmt->removeType == OBJECT_TSDICTIONARY);
if (!ShouldPropagate())
{
return NIL;
}
List *distributedObjects = GetDistributedTextSearchDictionaryNames(stmt);
if (list_length(distributedObjects) == 0)
{
/* no distributed objects to remove */
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
/*
* Temporarily replace the list of objects being dropped with only the list
* containing the distributed objects. After we have created the sql statement we
* restore the original list of objects to execute on locally.
*
* Because searchpaths on coordinator and workers might not be in sync we fully
* qualify the list before deparsing. This is safe because qualification doesn't
* change the original names in place, but insteads creates new ones.
*/
List *originalObjects = stmt->objects;
stmt->objects = distributedObjects;
QualifyTreeNode((Node *) stmt);
const char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = originalObjects;
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* GetDistributedTextSearchConfigurationNames iterates over all text search configurations
* dropped, and create a list containing all configurations that are distributed.
*/
static List *
GetDistributedTextSearchConfigurationNames(DropStmt *stmt)
{
List *objName = NULL;
List *distributedObjects = NIL;
foreach_ptr(objName, stmt->objects)
{
Oid tsconfigOid = get_ts_config_oid(objName, stmt->missing_ok);
if (!OidIsValid(tsconfigOid))
{
/* skip missing configuration names, they can't be distributed */
continue;
}
ObjectAddress address = { 0 };
ObjectAddressSet(address, TSConfigRelationId, tsconfigOid);
if (!IsObjectDistributed(&address))
{
continue;
}
distributedObjects = lappend(distributedObjects, objName);
}
return distributedObjects;
}
/*
* GetDistributedTextSearchDictionaryNames iterates over all text search dictionaries
* dropped, and create a list containing all dictionaries that are distributed.
*/
static List *
GetDistributedTextSearchDictionaryNames(DropStmt *stmt)
{
List *objName = NULL;
List *distributedObjects = NIL;
foreach_ptr(objName, stmt->objects)
{
Oid tsdictOid = get_ts_dict_oid(objName, stmt->missing_ok);
if (!OidIsValid(tsdictOid))
{
/* skip missing dictionary names, they can't be distributed */
continue;
}
ObjectAddress address = { 0 };
ObjectAddressSet(address, TSDictionaryRelationId, tsdictOid);
if (!IsObjectDistributed(&address))
{
continue;
}
distributedObjects = lappend(distributedObjects, objName);
}
return distributedObjects;
}
/*
* PreprocessAlterTextSearchConfigurationStmt verifies if the configuration being altered
* is distributed in the cluster. If that is the case it will prepare the list of commands
* to send to the worker to apply the same changes remote.
*/
List *
PreprocessAlterTextSearchConfigurationStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterTSConfigurationStmt *stmt = castNode(AlterTSConfigurationStmt, node);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
QualifyTreeNode((Node *) stmt);
const char *alterStmtSql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterTextSearchDictionaryStmt verifies if the dictionary being altered is
* distributed in the cluster. If that is the case it will prepare the list of commands to
* send to the worker to apply the same changes remote.
*/
List *
PreprocessAlterTextSearchDictionaryStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterTSDictionaryStmt *stmt = castNode(AlterTSDictionaryStmt, node);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
QualifyTreeNode((Node *) stmt);
const char *alterStmtSql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessRenameTextSearchConfigurationStmt verifies if the configuration being altered
* is distributed in the cluster. If that is the case it will prepare the list of commands
* to send to the worker to apply the same changes remote.
*/
List *
PreprocessRenameTextSearchConfigurationStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
QualifyTreeNode((Node *) stmt);
char *ddlCommand = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) ddlCommand,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessRenameTextSearchDictionaryStmt verifies if the dictionary being altered
* is distributed in the cluster. If that is the case it will prepare the list of commands
* to send to the worker to apply the same changes remote.
*/
List *
PreprocessRenameTextSearchDictionaryStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
RenameStmt *stmt = castNode(RenameStmt, node);
Assert(stmt->renameType == OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
QualifyTreeNode((Node *) stmt);
char *ddlCommand = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) ddlCommand,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterTextSearchConfigurationSchemaStmt verifies if the configuration being
* altered is distributed in the cluster. If that is the case it will prepare the list of
* commands to send to the worker to apply the same changes remote.
*/
List *
PreprocessAlterTextSearchConfigurationSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterTextSearchDictionarySchemaStmt verifies if the dictionary being
* altered is distributed in the cluster. If that is the case it will prepare the list of
* commands to send to the worker to apply the same changes remote.
*/
List *
PreprocessAlterTextSearchDictionarySchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterTextSearchConfigurationSchemaStmt is invoked after the schema has been
* changed locally. Since changing the schema could result in new dependencies being found
* for this object we re-ensure all the dependencies for the configuration do exist. This
* is solely to propagate the new schema (and all its dependencies) if it was not already
* distributed in the cluster.
*/
List *
PostprocessAlterTextSearchConfigurationSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
/* dependencies have changed (schema) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PostprocessAlterTextSearchDictionarySchemaStmt is invoked after the schema has been
* changed locally. Since changing the schema could result in new dependencies being found
* for this object we re-ensure all the dependencies for the dictionary do exist. This
* is solely to propagate the new schema (and all its dependencies) if it was not already
* distributed in the cluster.
*/
List *
PostprocessAlterTextSearchDictionarySchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt,
stmt->missing_ok);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
/* dependencies have changed (schema) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PreprocessTextSearchConfigurationCommentStmt propagates any comment on a distributed
* configuration to the workers. Since comments for configurations are promenently shown
* when listing all text search configurations this is purely a cosmetic thing when
* running in MX.
*/
List *
PreprocessTextSearchConfigurationCommentStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Assert(stmt->objtype == OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessTextSearchDictionaryCommentStmt propagates any comment on a distributed
* dictionary to the workers. Since comments for dictionaries are promenently shown
* when listing all text search dictionaries this is purely a cosmetic thing when
* running in MX.
*/
List *
PreprocessTextSearchDictionaryCommentStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
CommentStmt *stmt = castNode(CommentStmt, node);
Assert(stmt->objtype == OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterTextSearchConfigurationOwnerStmt verifies if the configuration being
* altered is distributed in the cluster. If that is the case it will prepare the list of
* commands to send to the worker to apply the same changes remote.
*/
List *
PreprocessAlterTextSearchConfigurationOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
QualifyTreeNode((Node *) stmt);
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterTextSearchDictionaryOwnerStmt verifies if the dictionary being
* altered is distributed in the cluster. If that is the case it will prepare the list of
* commands to send to the worker to apply the same changes remote.
*/
List *
PreprocessAlterTextSearchDictionaryOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
EnsureCoordinator();
EnsureSequentialMode(OBJECT_TSDICTIONARY);
QualifyTreeNode((Node *) stmt);
char *sql = DeparseTreeNode((Node *) stmt);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterTextSearchConfigurationOwnerStmt is invoked after the owner has been
* changed locally. Since changing the owner could result in new dependencies being found
* for this object we re-ensure all the dependencies for the configuration do exist. This
* is solely to propagate the new owner (and all its dependencies) if it was not already
* distributed in the cluster.
*/
List *
PostprocessAlterTextSearchConfigurationOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
/* dependencies have changed (owner) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/*
* PostprocessAlterTextSearchDictionaryOwnerStmt is invoked after the owner has been
* changed locally. Since changing the owner could result in new dependencies being found
* for this object we re-ensure all the dependencies for the dictionary do exist. This
* is solely to propagate the new owner (and all its dependencies) if it was not already
* distributed in the cluster.
*/
List *
PostprocessAlterTextSearchDictionaryOwnerStmt(Node *node, const char *queryString)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&address))
{
return NIL;
}
/* dependencies have changed (owner) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&address);
return NIL;
}
/* /*
* GetTextSearchConfigDefineStmt returns the DefineStmt for a TEXT SEARCH CONFIGURATION * GetTextSearchConfigDefineStmt returns the DefineStmt for a TEXT SEARCH CONFIGURATION
* based on the configuration as defined in the catalog identified by tsconfigOid. * based on the configuration as defined in the catalog identified by tsconfigOid.

View File

@ -90,8 +90,6 @@
bool EnableCreateTypePropagation = true; bool EnableCreateTypePropagation = true;
/* forward declaration for helper functions*/ /* forward declaration for helper functions*/
static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok);
static List * TypeNameListToObjectAddresses(List *objects);
static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation); static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation);
static Oid GetTypeOwner(Oid typeOid); static Oid GetTypeOwner(Oid typeOid);
static Oid LookupNonAssociatedArrayTypeNameOid(ParseState *pstate, static Oid LookupNonAssociatedArrayTypeNameOid(ParseState *pstate,
@ -104,365 +102,6 @@ static List * CompositeTypeColumnDefList(Oid typeOid);
static CreateEnumStmt * RecreateEnumStmt(Oid typeOid); static CreateEnumStmt * RecreateEnumStmt(Oid typeOid);
static List * EnumValsList(Oid typeOid); static List * EnumValsList(Oid typeOid);
static bool ShouldPropagateTypeCreate(void);
/*
* PreprocessCompositeTypeStmt is called during the creation of a composite type. It is executed
* before the statement is applied locally.
*
* We decide if the compisite type needs to be replicated to the worker, and if that is
* the case return a list of DDLJob's that describe how and where the type needs to be
* created.
*
* Since the planning happens before the statement has been applied locally we do not have
* access to the ObjectAddress of the new type.
*/
List *
PreprocessCompositeTypeStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagateTypeCreate())
{
return NIL;
}
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here
*/
EnsureCoordinator();
/* fully qualify before lookup and later deparsing */
QualifyTreeNode(node);
return NIL;
}
/*
* PostprocessCompositeTypeStmt is executed after the type has been created locally and before
* we create it on the remote servers. Here we have access to the ObjectAddress of the new
* type which we use to make sure the type's dependencies are on all nodes.
*/
List *
PostprocessCompositeTypeStmt(Node *node, const char *queryString)
{
/* same check we perform during planning of the statement */
if (!ShouldPropagateTypeCreate())
{
return NIL;
}
/*
* find object address of the just created object, because the type has been created
* locally it can't be missing
*/
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
/* If the type has any unsupported dependency, create it locally */
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&typeAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
/*
* when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode
*/
EnsureSequentialMode(OBJECT_TYPE);
EnsureDependenciesExistOnAllNodes(&typeAddress);
/*
* reconstruct creation statement in a portable fashion. The create_or_replace helper
* function will be used to create the type in an idempotent manner on the workers.
*
* Types could exist on the worker prior to being created on the coordinator when the
* type previously has been attempted to be created in a transaction which did not
* commit on the coordinator.
*/
const char *compositeTypeStmtSql = DeparseCompositeTypeStmt(node);
compositeTypeStmtSql = WrapCreateOrReplace(compositeTypeStmtSql);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) compositeTypeStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterTypeStmt is invoked for alter type statements for composite types.
*
* Normally we would have a process step as well to re-ensure dependencies exists, however
* this is already implemented by the post processing for adding columns to tables.
*/
List *
PreprocessAlterTypeStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterTableStmt *stmt = castNode(AlterTableStmt, node);
Assert(AlterTableStmtObjType_compat(stmt) == OBJECT_TYPE);
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
EnsureCoordinator();
/* reconstruct alter statement in a portable fashion */
QualifyTreeNode((Node *) stmt);
const char *alterTypeStmtSql = DeparseTreeNode((Node *) stmt);
/*
* all types that are distributed will need their alter statements propagated
* regardless if in a transaction or not. If we would not propagate the alter
* statement the types would be different on worker and coordinator.
*/
EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterTypeStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessCreateEnumStmt is called before the statement gets applied locally.
*
* It decides if the create statement will be applied to the workers and if that is the
* case returns a list of DDLJobs that will be executed _after_ the statement has been
* applied locally.
*
* Since planning is done before we have created the object locally we do not have an
* ObjectAddress for the new type just yet.
*/
List *
PreprocessCreateEnumStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagateTypeCreate())
{
return NIL;
}
/* managing types can only be done on the coordinator */
EnsureCoordinator();
/* enforce fully qualified typeName for correct deparsing and lookup */
QualifyTreeNode(node);
return NIL;
}
/*
* PostprocessCreateEnumStmt is called after the statement has been applied locally, but
* before the plan on how to create the types on the workers has been executed.
*
* We apply the same checks to verify if the type should be distributed, if that is the
* case we resolve the ObjectAddress for the just created object, distribute its
* dependencies to all the nodes, and mark the object as distributed.
*/
List *
PostprocessCreateEnumStmt(Node *node, const char *queryString)
{
if (!ShouldPropagateTypeCreate())
{
return NIL;
}
/* lookup type address of just created type */
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&typeAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
/*
* when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode
*/
EnsureSequentialMode(OBJECT_TYPE);
EnsureDependenciesExistOnAllNodes(&typeAddress);
/* reconstruct creation statement in a portable fashion */
const char *createEnumStmtSql = DeparseCreateEnumStmt(node);
createEnumStmtSql = WrapCreateOrReplace(createEnumStmtSql);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessAlterEnumStmt handles ALTER TYPE ... ADD VALUE for enum based types. Planning
* happens before the statement has been applied locally.
*
* Since it is an alter of an existing type we actually have the ObjectAddress. This is
* used to check if the type is distributed, if so the alter will be executed on the
* workers directly to keep the types in sync across the cluster.
*/
List *
PreprocessAlterEnumStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
/*
* alter enum will run for all distributed enums, regardless if in a transaction or
* not since the enum will be different on the coordinator and workers if we didn't.
* (adding values to an enum can not run in a transaction anyway and would error by
* postgres already).
*/
EnsureSequentialMode(OBJECT_TYPE);
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here
*/
EnsureCoordinator();
QualifyTreeNode(node);
const char *alterEnumStmtSql = DeparseTreeNode(node);
/*
* Before pg12 ALTER ENUM ... ADD VALUE could not be within a xact block. Instead of
* creating a DDLTaksList we won't return anything here. During the processing phase
* we directly connect to workers and execute the commands remotely.
*/
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) alterEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessDropTypeStmt is called for all DROP TYPE statements. For all types in the list that
* citus has distributed to the workers it will drop the type on the workers as well. If
* no types in the drop list are distributed no calls will be made to the workers.
*/
List *
PreprocessDropTypeStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropStmt *stmt = castNode(DropStmt, node);
/*
* We swap the list of objects to remove during deparse so we need a reference back to
* the old list to put back
*/
List *oldTypes = stmt->objects;
if (!ShouldPropagate())
{
return NIL;
}
List *distributedTypes = FilterNameListForDistributedTypes(oldTypes,
stmt->missing_ok);
if (list_length(distributedTypes) <= 0)
{
/* no distributed types to drop */
return NIL;
}
/*
* managing types can only be done on the coordinator if ddl propagation is on. when
* it is off we will never get here. MX workers don't have a notion of distributed
* types, so we block the call.
*/
EnsureCoordinator();
/*
* remove the entries for the distributed objects on dropping
*/
List *distributedTypeAddresses = TypeNameListToObjectAddresses(distributedTypes);
ObjectAddress *address = NULL;
foreach_ptr(address, distributedTypeAddresses)
{
UnmarkObjectDistributed(address);
}
/*
* temporary swap the lists of objects to delete with the distributed objects and
* deparse to an executable sql statement for the workers
*/
stmt->objects = distributedTypes;
char *dropStmtSql = DeparseTreeNode((Node *) stmt);
stmt->objects = oldTypes;
EnsureSequentialMode(OBJECT_TYPE);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
dropStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PreprocessRenameTypeStmt is called when the user is renaming the type. The invocation happens
* before the statement is applied locally.
*
* As the type already exists we have access to the ObjectAddress for the type, this is
* used to check if the type is distributed. If the type is distributed the rename is
* executed on all the workers to keep the types in sync across the cluster.
*/
List *
PreprocessRenameTypeStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
EnsureCoordinator();
/* fully qualify */
QualifyTreeNode(node);
/* deparse sql*/
const char *renameStmtSql = DeparseTreeNode(node);
EnsureSequentialMode(OBJECT_TYPE);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) renameStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/* /*
* PreprocessRenameTypeAttributeStmt is called for changes of attribute names for composite * PreprocessRenameTypeAttributeStmt is called for changes of attribute names for composite
* types. Planning is called before the statement is applied locally. * types. Planning is called before the statement is applied locally.
@ -499,98 +138,6 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString,
} }
/*
* PreprocessAlterTypeSchemaStmt is executed before the statement is applied to the local
* postgres instance.
*
* In this stage we can prepare the commands that need to be run on all workers.
*/
List *
PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TYPE);
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/*
* PostprocessAlterTypeSchemaStmt is executed after the change has been applied locally, we
* can now use the new dependencies of the type to ensure all its dependencies exist on
* the workers before we apply the commands remotely.
*/
List *
PostprocessAlterTypeSchemaStmt(Node *node, const char *queryString)
{
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_TYPE);
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
/* dependencies have changed (schema) let's ensure they exist */
EnsureDependenciesExistOnAllNodes(&typeAddress);
return NIL;
}
/*
* PreprocessAlterTypeOwnerStmt is called for change of ownership of types before the
* ownership is changed on the local instance.
*
* If the type for which the owner is changed is distributed we execute the change on all
* the workers to keep the type in sync across the cluster.
*/
List *
PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_TYPE);
ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!ShouldPropagateObject(&typeAddress))
{
return NIL;
}
EnsureCoordinator();
QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt);
EnsureSequentialMode(OBJECT_TYPE);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
/* /*
* CreateTypeStmtByObjectAddress returns a parsetree for the CREATE TYPE statement to * CreateTypeStmtByObjectAddress returns a parsetree for the CREATE TYPE statement to
* recreate the type by its object address. * recreate the type by its object address.
@ -1051,60 +598,6 @@ GenerateBackupNameForTypeCollision(const ObjectAddress *address)
} }
/*
* FilterNameListForDistributedTypes takes a list of objects to delete, for Types this
* will be a list of TypeName. This list is filtered against the types that are
* distributed.
*
* The original list will not be touched, a new list will be created with only the objects
* in there.
*/
static List *
FilterNameListForDistributedTypes(List *objects, bool missing_ok)
{
List *result = NIL;
TypeName *typeName = NULL;
foreach_ptr(typeName, objects)
{
Oid typeOid = LookupTypeNameOid(NULL, typeName, missing_ok);
ObjectAddress typeAddress = { 0 };
if (!OidIsValid(typeOid))
{
continue;
}
ObjectAddressSet(typeAddress, TypeRelationId, typeOid);
if (IsObjectDistributed(&typeAddress))
{
result = lappend(result, typeName);
}
}
return result;
}
/*
* TypeNameListToObjectAddresses transforms a List * of TypeName *'s into a List * of
* ObjectAddress *'s. For this to succeed all Types identified by the TypeName *'s should
* exist on this postgres, an error will be thrown otherwise.
*/
static List *
TypeNameListToObjectAddresses(List *objects)
{
List *result = NIL;
TypeName *typeName = NULL;
foreach_ptr(typeName, objects)
{
Oid typeOid = LookupTypeNameOid(NULL, typeName, false);
ObjectAddress *typeAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*typeAddress, TypeRelationId, typeOid);
result = lappend(result, typeAddress);
}
return result;
}
/* /*
* GetTypeOwner * GetTypeOwner
* *
@ -1145,47 +638,6 @@ MakeTypeNameFromRangeVar(const RangeVar *relation)
} }
/*
* ShouldPropagateTypeCreate returns if we should propagate the creation of a type.
*
* There are two moments we decide to not directly propagate the creation of a type.
* - During the creation of an Extension; we assume the type will be created by creating
* the extension on the worker
* - During a transaction block; if types are used in a distributed table in the same
* block we can only provide parallelism on the table if we do not change to sequential
* mode. Types will be propagated outside of this transaction to the workers so that
* the transaction can use 1 connection per shard and fully utilize citus' parallelism
*/
static bool
ShouldPropagateTypeCreate()
{
if (!ShouldPropagate())
{
return false;
}
if (!EnableCreateTypePropagation)
{
/*
* Administrator has turned of type creation propagation
*/
return false;
}
/*
* by not propagating in a transaction block we allow for parallelism to be used when
* this type will be used as a column in a table that will be created and distributed
* in this same transaction.
*/
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return false;
}
return true;
}
/* /*
* LookupNonAssociatedArrayTypeNameOid returns the oid of the type with the given type name * LookupNonAssociatedArrayTypeNameOid returns the oid of the type with the given type name
* that is not an array type that is associated to another user defined type. * that is not an array type that is associated to another user defined type.

View File

@ -525,6 +525,20 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
parsetree = pstmt->utilityStmt; parsetree = pstmt->utilityStmt;
ops = GetDistributeObjectOps(parsetree); ops = GetDistributeObjectOps(parsetree);
/*
* For some statements Citus defines a Qualify function. The goal of this function
* is to take any ambiguity from the statement that is contextual on either the
* search_path or current settings.
* Instead of relying on the search_path and settings we replace any deduced bits
* and fill them out how postgres would resolve them. This makes subsequent
* deserialize calls for the statement portable to other postgres servers, the
* workers in our case.
*/
if (ops && ops->qualify)
{
ops->qualify(parsetree);
}
if (ops && ops->preprocess) if (ops && ops->preprocess)
{ {
ddlJobs = ops->preprocess(parsetree, queryString, context); ddlJobs = ops->preprocess(parsetree, queryString, context);

View File

@ -63,6 +63,15 @@ typedef struct DistributeObjectOps
List * (*postprocess)(Node *, const char *); List * (*postprocess)(Node *, const char *);
ObjectAddress (*address)(Node *, bool); ObjectAddress (*address)(Node *, bool);
bool markDistributed; bool markDistributed;
/* fields used by common implementations, omitted for specialized implementations */
ObjectType objectType;
/*
* Points to the varriable that contains the GUC'd feature flag, when turned off the
* common propagation functions will not propagate the creation of the object.
*/
bool *featureFlag;
} DistributeObjectOps; } DistributeObjectOps;
#define CITUS_TRUNCATE_TRIGGER_NAME "citus_truncate_trigger" #define CITUS_TRUNCATE_TRIGGER_NAME "citus_truncate_trigger"
@ -122,15 +131,21 @@ typedef enum SearchForeignKeyColumnFlags
} SearchForeignKeyColumnFlags; } SearchForeignKeyColumnFlags;
/* aggregate.c - forward declarations */
extern List * PreprocessDefineAggregateStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessDefineAggregateStmt(Node *node, const char *queryString);
/* cluster.c - forward declarations */ /* cluster.c - forward declarations */
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand, extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
/* common.c - forward declarations*/
extern List * PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt,
const char *queryString);
extern List * PreprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessAlterDistributedObjectStmt(Node *stmt, const char *queryString);
extern List * PreprocessDropDistributedObjectStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
/* index.c */ /* index.c */
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
@ -143,54 +158,17 @@ extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *d
extern char * CreateCollationDDL(Oid collationId); extern char * CreateCollationDDL(Oid collationId);
extern List * CreateCollationDDLsIdempotent(Oid collationId); extern List * CreateCollationDDLsIdempotent(Oid collationId);
extern ObjectAddress AlterCollationOwnerObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress AlterCollationOwnerObjectAddress(Node *stmt, bool missing_ok);
extern List * PreprocessDropCollationStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterCollationOwnerStmt(Node *stmt, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessAlterCollationOwnerStmt(Node *node, const char *queryString);
extern List * PreprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessRenameCollationStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern ObjectAddress RenameCollationStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress RenameCollationStmtObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress AlterCollationSchemaStmtObjectAddress(Node *stmt, extern ObjectAddress AlterCollationSchemaStmtObjectAddress(Node *stmt,
bool missing_ok); bool missing_ok);
extern List * PostprocessAlterCollationSchemaStmt(Node *stmt, const char *queryString);
extern char * GenerateBackupNameForCollationCollision(const ObjectAddress *address); extern char * GenerateBackupNameForCollationCollision(const ObjectAddress *address);
extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress DefineCollationStmtObjectAddress(Node *stmt, bool missing_ok);
extern List * PreprocessDefineCollationStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessDefineCollationStmt(Node *stmt, const char *queryString);
/* database.c - forward declarations */ /* database.c - forward declarations */
extern List * PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString);
extern ObjectAddress AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok); extern ObjectAddress AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok);
extern List * DatabaseOwnerDDLCommands(const ObjectAddress *address); extern List * DatabaseOwnerDDLCommands(const ObjectAddress *address);
/* domain.c - forward declarations */ /* domain.c - forward declarations */
extern List * PreprocessCreateDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCreateDomainStmt(Node *node, const char *queryString);
extern List * PreprocessDropDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDomainStmt(Node *node, const char *queryString);
extern List * PreprocessDomainRenameConstraintStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterDomainOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDomainOwnerStmt(Node *node, const char *queryString);
extern List * PreprocessRenameDomainStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterDomainSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterDomainSchemaStmt(Node *node, const char *queryString);
extern ObjectAddress CreateDomainStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress CreateDomainStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterDomainStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress AlterDomainStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress DomainRenameConstraintStmtObjectAddress(Node *node, extern ObjectAddress DomainRenameConstraintStmtObjectAddress(Node *node,
@ -266,23 +244,9 @@ extern Oid GetReferencingTableId(Oid foreignKeyId);
extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId); extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);
/* foreign_server.c - forward declarations */ /* foreign_server.c - forward declarations */
extern List * PreprocessCreateForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessRenameForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropForeignServerStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessCreateForeignServerStmt(Node *node, const char *queryString);
extern List * PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString);
extern ObjectAddress CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress CreateForeignServerStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterForeignServerOwnerStmtObjectAddress(Node *node, bool extern ObjectAddress AlterForeignServerOwnerStmtObjectAddress(Node *node, bool
missing_ok); missing_ok);
extern List * GetForeignServerCreateDDLCommand(Oid serverId); extern List * GetForeignServerCreateDDLCommand(Oid serverId);
@ -307,24 +271,12 @@ extern List * PreprocessAlterFunctionStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern ObjectAddress AlterFunctionStmtObjectAddress(Node *stmt, extern ObjectAddress AlterFunctionStmtObjectAddress(Node *stmt,
bool missing_ok); bool missing_ok);
extern List * PreprocessRenameFunctionStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern ObjectAddress RenameFunctionStmtObjectAddress(Node *stmt, extern ObjectAddress RenameFunctionStmtObjectAddress(Node *stmt,
bool missing_ok); bool missing_ok);
extern List * PreprocessAlterFunctionOwnerStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterFunctionOwnerStmt(Node *stmt, const char *queryString);
extern ObjectAddress AlterFunctionOwnerObjectAddress(Node *stmt, extern ObjectAddress AlterFunctionOwnerObjectAddress(Node *stmt,
bool missing_ok); bool missing_ok);
extern List * PreprocessAlterFunctionSchemaStmt(Node *stmt, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern ObjectAddress AlterFunctionSchemaStmtObjectAddress(Node *stmt, extern ObjectAddress AlterFunctionSchemaStmtObjectAddress(Node *stmt,
bool missing_ok); bool missing_ok);
extern List * PostprocessAlterFunctionSchemaStmt(Node *stmt,
const char *queryString);
extern List * PreprocessDropFunctionStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterFunctionDependsStmt(Node *stmt, extern List * PreprocessAlterFunctionDependsStmt(Node *stmt,
const char *queryString, const char *queryString,
ProcessUtilityContext ProcessUtilityContext
@ -416,8 +368,6 @@ extern List * PreprocessAlterObjectSchemaStmt(Node *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand); const char *alterObjectSchemaCommand);
extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString, extern List * PreprocessGrantOnSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern ObjectAddress CreateSchemaStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress CreateSchemaStmtObjectAddress(Node *node, bool missing_ok);
extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok); extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing_ok);
@ -508,70 +458,10 @@ extern bool ConstrTypeUsesIndex(ConstrType constrType);
/* text_search.c - forward declarations */ /* text_search.c - forward declarations */
extern List * PostprocessCreateTextSearchConfigurationStmt(Node *node,
const char *queryString);
extern List * PostprocessCreateTextSearchDictionaryStmt(Node *node,
const char *queryString);
extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address); extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address);
extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address); extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address);
extern List * CreateTextSearchConfigDDLCommandsIdempotent(const ObjectAddress *address); extern List * CreateTextSearchConfigDDLCommandsIdempotent(const ObjectAddress *address);
extern List * CreateTextSearchDictDDLCommandsIdempotent(const ObjectAddress *address); extern List * CreateTextSearchDictDDLCommandsIdempotent(const ObjectAddress *address);
extern List * PreprocessDropTextSearchConfigurationStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropTextSearchDictionaryStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTextSearchConfigurationStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTextSearchDictionaryStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessRenameTextSearchConfigurationStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessRenameTextSearchDictionaryStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTextSearchConfigurationSchemaStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTextSearchDictionarySchemaStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessAlterTextSearchConfigurationSchemaStmt(Node *node,
const char *queryString);
extern List * PostprocessAlterTextSearchDictionarySchemaStmt(Node *node,
const char *queryString);
extern List * PreprocessTextSearchConfigurationCommentStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessTextSearchDictionaryCommentStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTextSearchConfigurationOwnerStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTextSearchDictionaryOwnerStmt(Node *node,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessAlterTextSearchConfigurationOwnerStmt(Node *node,
const char *queryString);
extern List * PostprocessAlterTextSearchDictionaryOwnerStmt(Node *node,
const char *queryString);
extern ObjectAddress CreateTextSearchConfigurationObjectAddress(Node *node, extern ObjectAddress CreateTextSearchConfigurationObjectAddress(Node *node,
bool missing_ok); bool missing_ok);
extern ObjectAddress CreateTextSearchDictObjectAddress(Node *node, extern ObjectAddress CreateTextSearchDictObjectAddress(Node *node,
@ -604,28 +494,9 @@ extern List * get_ts_config_namelist(Oid tsconfigOid);
extern void PreprocessTruncateStatement(TruncateStmt *truncateStatement); extern void PreprocessTruncateStatement(TruncateStmt *truncateStatement);
/* type.c - forward declarations */ /* type.c - forward declarations */
extern List * PreprocessCompositeTypeStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCompositeTypeStmt(Node *stmt, const char *queryString);
extern List * PreprocessAlterTypeStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessCreateEnumStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessCreateEnumStmt(Node *stmt, const char *queryString);
extern List * PreprocessAlterEnumStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessDropTypeStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessRenameTypeStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessRenameTypeAttributeStmt(Node *stmt, const char *queryString, extern List * PreprocessRenameTypeAttributeStmt(Node *stmt, const char *queryString,
ProcessUtilityContext ProcessUtilityContext
processUtilityContext); processUtilityContext);
extern List * PreprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterTypeOwnerStmt(Node *stmt, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * PostprocessAlterTypeSchemaStmt(Node *stmt, const char *queryString);
extern Node * CreateTypeStmtByObjectAddress(const ObjectAddress *address); extern Node * CreateTypeStmtByObjectAddress(const ObjectAddress *address);
extern ObjectAddress CompositeTypeStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress CompositeTypeStmtObjectAddress(Node *stmt, bool missing_ok);
extern ObjectAddress CreateEnumStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress CreateEnumStmtObjectAddress(Node *stmt, bool missing_ok);

View File

@ -250,6 +250,7 @@ extern TableConversionReturn * UndistributeTable(TableConversionParameters *para
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
extern List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
extern bool ShouldPropagate(void); extern bool ShouldPropagate(void);
extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void);
extern bool ShouldPropagateObject(const ObjectAddress *address); extern bool ShouldPropagateObject(const ObjectAddress *address);

View File

@ -92,11 +92,8 @@ SELECT run_command_on_workers($$select r.rolname from pg_roles r join pg_class c
CREATE SERVER foreign_server_to_drop CREATE SERVER foreign_server_to_drop
FOREIGN DATA WRAPPER postgres_fdw FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test'); OPTIONS (host 'test');
--should error
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
ERROR: cannot drop distributed server with other servers
HINT: Try dropping each object in a separate DROP command
DROP FOREIGN TABLE foreign_table; DROP FOREIGN TABLE foreign_table;
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
SELECT citus_remove_node('localhost', :master_port); SELECT citus_remove_node('localhost', :master_port);
citus_remove_node citus_remove_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -55,9 +55,8 @@ CREATE SERVER foreign_server_to_drop
FOREIGN DATA WRAPPER postgres_fdw FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'test'); OPTIONS (host 'test');
--should error
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
DROP FOREIGN TABLE foreign_table; DROP FOREIGN TABLE foreign_table;
DROP SERVER foreign_server_dependent_schema, foreign_server_to_drop;
SELECT citus_remove_node('localhost', :master_port); SELECT citus_remove_node('localhost', :master_port);
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;