diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index d92c1914d..628f18802 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -239,20 +239,25 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) if (relKind == RELKIND_RELATION) { Oid relationId = dependency->objectId; - List *commandList = NIL; - List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS); - - TableDDLCommand *tableDDLCommand = NULL; - foreach_ptr(tableDDLCommand, tableDDLCommands) + if (IsCitusTable(relationId) && !IsTableOwnedByExtension(relationId)) { - Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); - commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); + /* create table metadata only for Citus tables that are not owned by an extension */ + List *commandList = NIL; + List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS); + + TableDDLCommand *tableDDLCommand = NULL; + foreach_ptr(tableDDLCommand, tableDDLCommands) + { + Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); + commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); + } + + /* TODO: may need to move sequence dependencies to ActivateNode directly */ + List *sequenceDependencyCommandList = SequenceDependencyCommandList(dependency->objectId); + commandList = list_concat(commandList, sequenceDependencyCommandList); + + return commandList; } - - List *sequenceDependencyCommandList = SequenceDependencyCommandList(dependency->objectId); - commandList = list_concat(commandList, sequenceDependencyCommandList); - - return commandList; } if (relKind == RELKIND_SEQUENCE) @@ -387,10 +392,9 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) } /* since we are executing ddl commands lets disable propagation, primarily for mx */ - ddlCommands = 
list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands); + ddlCommands = lappend(lcons(DISABLE_DDL_PROPAGATION, ddlCommands), ENABLE_DDL_PROPAGATION); /* wrap in place: the list must stay a flat list of command strings */ - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), ddlCommands); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands); } diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index 5dbefbc7e..23981a747 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -676,7 +676,7 @@ SupportedDependencyByCitus(const ObjectAddress *address) * for tables. */ if (relKind == RELKIND_COMPOSITE_TYPE || - relKind == RELKIND_RELATION || + (relKind == RELKIND_RELATION && IsCitusTable(address->objectId)) || relKind == RELKIND_SEQUENCE) { return true; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index d6de1074a..f6c16dd22 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -83,13 +83,10 @@ char *EnableManualMetadataChangesForUser = ""; -static void EnsureSequentialModeMetadataOperations(void); -static List * DistributedObjectMetadataSyncCommandList(void); static List * GetDistributedTableDDLEvents(Oid relationId); static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId); static char * LocalGroupIdUpdateCommand(int32 groupId); -static char * TruncateTriggerCreateCommand(Oid relationId); static char * SchemaOwnerName(Oid objectId); static bool HasMetadataWorkers(void); static bool ShouldSyncTableMetadataInternal(bool hashDistributed, @@ -254,7 +251,7 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) * visible on all connections used by the transaction, meaning we can only use 1 * connection per node. 
*/ -static void +void EnsureSequentialModeMetadataOperations(void) { if (!IsTransactionBlock()) @@ -541,95 +538,6 @@ MetadataCreateCommands(void) } -/* - * DistributedObjectMetadataSyncCommandList returns the necessary commands to create - * pg_dist_object entries on the new node. - */ -static List * -DistributedObjectMetadataSyncCommandList(void) -{ - HeapTuple pgDistObjectTup = NULL; - Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock); - Relation pgDistObjectIndexRel = index_open(DistObjectPrimaryKeyIndexId(), - AccessShareLock); - TupleDesc pgDistObjectDesc = RelationGetDescr(pgDistObjectRel); - - List *objectAddressList = NIL; - List *distArgumentIndexList = NIL; - List *colocationIdList = NIL; - - /* It is not strictly necessary to read the tuples in order. - * However, it is useful to get consistent behavior, both for regression - * tests and also in production systems. - */ - SysScanDesc pgDistObjectScan = systable_beginscan_ordered(pgDistObjectRel, - pgDistObjectIndexRel, NULL, - 0, NULL); - while (HeapTupleIsValid(pgDistObjectTup = systable_getnext_ordered(pgDistObjectScan, - ForwardScanDirection))) - { - Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT( - pgDistObjectTup); - - ObjectAddress *address = palloc(sizeof(ObjectAddress)); - - ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid, - pg_dist_object->objsubid); - - bool distributionArgumentIndexIsNull = false; - Datum distributionArgumentIndexDatum = - heap_getattr(pgDistObjectTup, - Anum_pg_dist_object_distribution_argument_index, - pgDistObjectDesc, - &distributionArgumentIndexIsNull); - int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum); - - bool colocationIdIsNull = false; - Datum colocationIdDatum = - heap_getattr(pgDistObjectTup, - Anum_pg_dist_object_colocationid, - pgDistObjectDesc, - &colocationIdIsNull); - int32 colocationId = DatumGetInt32(colocationIdDatum); - - objectAddressList = 
lappend(objectAddressList, address); - - if (distributionArgumentIndexIsNull) - { - distArgumentIndexList = lappend_int(distArgumentIndexList, - INVALID_DISTRIBUTION_ARGUMENT_INDEX); - } - else - { - distArgumentIndexList = lappend_int(distArgumentIndexList, - distributionArgumentIndex); - } - - if (colocationIdIsNull) - { - colocationIdList = lappend_int(colocationIdList, - INVALID_COLOCATION_ID); - } - else - { - colocationIdList = lappend_int(colocationIdList, colocationId); - } - } - - systable_endscan_ordered(pgDistObjectScan); - index_close(pgDistObjectIndexRel, AccessShareLock); - relation_close(pgDistObjectRel, NoLock); - - char *workerMetadataUpdateCommand = - MarkObjectsDistributedCreateCommand(objectAddressList, - distArgumentIndexList, - colocationIdList); - List *commandList = list_make1(workerMetadataUpdateCommand); - - return commandList; -} - - /* * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to * create the given distributed table on a worker. The list includes setting up any @@ -1793,7 +1701,7 @@ GenerateSetRoleQuery(Oid roleOid) * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger * function, which creates the truncate trigger on the worker. 
*/ -static char * +char * TruncateTriggerCreateCommand(Oid relationId) { StringInfo triggerCreateCommand = makeStringInfo(); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index aee8e22d9..5fd10a98b 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -43,6 +43,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/shared_connection_stats.h" #include "distributed/string_utils.h" +#include "distributed/metadata/pg_dist_object.h" #include "distributed/transaction_recovery.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" @@ -91,6 +92,7 @@ typedef struct NodeMetadata } NodeMetadata; /* local function forward declarations */ +static List * DistributedObjectMetadataSyncCommandList(void); static List * DetachPartitionCommandList(void); static int ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); @@ -107,9 +109,8 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SetUpSequenceDependencies(WorkerNode *workerNode); static void SetUpObjectMetadata(WorkerNode *workerNode); -static void ClearDistributedTablesOnNode(WorkerNode *workerNode); +static void ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -582,6 +583,15 @@ master_set_node_property(PG_FUNCTION_ARGS) } +/* + * SetUpMultipleDistributedTableIntegrations set up the multiple integrations + * including + * + * (i) Foreign keys + * (ii) 
Partitioning hierarchy + * + * on the given worker node. + */ static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) { @@ -639,61 +649,19 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) } } - /* prevent recursive propagation */ - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); - - /* send the commands one by one */ - const char *command = NULL; - foreach_ptr(command, multipleTableIntegrationCommandList) - { - SendCommandToWorkersWithMetadata(command); - } -} - - -static void -SetUpSequenceDependencies(WorkerNode *workerNode) -{ - List *distributedTableList = CitusTableList(); - List *propagatedTableList = NIL; - List *sequenceCommandList = NIL; - - CitusTableCacheEntry *cacheEntry = NULL; - foreach_ptr(cacheEntry, distributedTableList) - { - if (ShouldSyncTableMetadata(cacheEntry->relationId)) - { - propagatedTableList = lappend(propagatedTableList, cacheEntry); - } - } - - /* create the metadata */ - foreach_ptr(cacheEntry, propagatedTableList) - { - Oid relationId = cacheEntry->relationId; - - if (IsTableOwnedByExtension(relationId)) - { - /* skip table metadata creation when the Citus table is owned by an extension */ - continue; - } - - List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); - sequenceCommandList = list_concat(sequenceCommandList, - sequenceDependencyCommandList); - } - - /* prevent recursive propagation */ - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); - char *currentUser = CurrentUserName(); + List *commandList = lappend(lcons(DISABLE_DDL_PROPAGATION, multipleTableIntegrationCommandList), ENABLE_DDL_PROPAGATION); /* wrap in place: the list must stay a flat list of command strings */ SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, workerNode->workerPort, currentUser, - sequenceCommandList); + commandList); } +/* + * SetUpObjectMetadata sets up the metadata depending on the distributed object + * on the given node. 
+ */ static void SetUpObjectMetadata(WorkerNode *workerNode) { @@ -744,7 +712,7 @@ SetUpObjectMetadata(WorkerNode *workerNode) distributedObjectSyncCommandList); } - List *metadataSnapshotCommands = list_make2(DISABLE_DDL_PROPAGATION, metadataSnapshotCommandList); + List *metadataSnapshotCommands = lappend(lcons(DISABLE_DDL_PROPAGATION, metadataSnapshotCommandList), ENABLE_DDL_PROPAGATION); /* wrap in place: the list must stay a flat list of command strings */ char *currentUser = CurrentUserName(); SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, @@ -753,8 +721,102 @@ SetUpObjectMetadata(WorkerNode *workerNode) metadataSnapshotCommands); } + +/* + * DistributedObjectMetadataSyncCommandList returns the necessary commands to create + * pg_dist_object entries on the new node. + */ +static List * +DistributedObjectMetadataSyncCommandList(void) +{ + HeapTuple pgDistObjectTup = NULL; + Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock); + Relation pgDistObjectIndexRel = index_open(DistObjectPrimaryKeyIndexId(), + AccessShareLock); + TupleDesc pgDistObjectDesc = RelationGetDescr(pgDistObjectRel); + + List *objectAddressList = NIL; + List *distArgumentIndexList = NIL; + List *colocationIdList = NIL; + + /* It is not strictly necessary to read the tuples in order. + * However, it is useful to get consistent behavior, both for regression + * tests and also in production systems. 
+ */ + SysScanDesc pgDistObjectScan = systable_beginscan_ordered(pgDistObjectRel, + pgDistObjectIndexRel, NULL, + 0, NULL); + while (HeapTupleIsValid(pgDistObjectTup = systable_getnext_ordered(pgDistObjectScan, + ForwardScanDirection))) + { + Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT( + pgDistObjectTup); + + ObjectAddress *address = palloc(sizeof(ObjectAddress)); + + ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid, + pg_dist_object->objsubid); + + bool distributionArgumentIndexIsNull = false; + Datum distributionArgumentIndexDatum = + heap_getattr(pgDistObjectTup, + Anum_pg_dist_object_distribution_argument_index, + pgDistObjectDesc, + &distributionArgumentIndexIsNull); + int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum); + + bool colocationIdIsNull = false; + Datum colocationIdDatum = + heap_getattr(pgDistObjectTup, + Anum_pg_dist_object_colocationid, + pgDistObjectDesc, + &colocationIdIsNull); + int32 colocationId = DatumGetInt32(colocationIdDatum); + + objectAddressList = lappend(objectAddressList, address); + + if (distributionArgumentIndexIsNull) + { + distArgumentIndexList = lappend_int(distArgumentIndexList, + INVALID_DISTRIBUTION_ARGUMENT_INDEX); + } + else + { + distArgumentIndexList = lappend_int(distArgumentIndexList, + distributionArgumentIndex); + } + + if (colocationIdIsNull) + { + colocationIdList = lappend_int(colocationIdList, + INVALID_COLOCATION_ID); + } + else + { + colocationIdList = lappend_int(colocationIdList, colocationId); + } + } + + systable_endscan_ordered(pgDistObjectScan); + index_close(pgDistObjectIndexRel, AccessShareLock); + relation_close(pgDistObjectRel, NoLock); + + char *workerMetadataUpdateCommand = + MarkObjectsDistributedCreateCommand(objectAddressList, + distArgumentIndexList, + colocationIdList); + List *commandList = list_make1(workerMetadataUpdateCommand); + + return commandList; +} + + +/* + * 
ClearDistributedObjectsWithMetadataFromNode clears all the distributed objects and related + * metadata from the given worker node. + */ static void -ClearDistributedTablesOnNode(WorkerNode *workerNode) +ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode) { List *clearDistTableInfoCommandList = NIL; List *detachPartitionCommandList = DetachPartitionCommandList(); @@ -767,8 +829,9 @@ ClearDistributedTablesOnNode(WorkerNode *workerNode) clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); - List *clearDistTableCommands = list_make2(DISABLE_DDL_PROPAGATION, - clearDistTableInfoCommandList); + List *clearDistTableCommands = lappend(lcons(DISABLE_DDL_PROPAGATION, + clearDistTableInfoCommandList), + ENABLE_DDL_PROPAGATION); /* wrap in place: the list must stay a flat list of command strings */ char *currentUser = CurrentUserName(); SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, @@ -781,7 +844,7 @@ ClearDistributedTablesOnNode(WorkerNode *workerNode) /* * SetUpDistributedTableWithDependencies sets up up the following on a node if it's * a primary node that currently stores data: - * - All dependencies (e.g., types, schemas) + * - All dependencies (e.g., types, schemas, sequences) * - All shell distributed table * - Reference tables, because they are needed to handle queries efficiently. 
* - Distributed functions @@ -857,14 +920,13 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) if (list_length(ddlCommands) > 0) { /* if there are command wrap them in enable_ddl_propagation off */ - ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); - ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); + ddlCommands = lappend(lcons(DISABLE_DDL_PROPAGATION, ddlCommands), ENABLE_DDL_PROPAGATION); /* wrap in place: the list must stay a flat list of command strings */ /* send commands to new workers*/ - SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CitusExtensionOwnerName(), + ddlCommands); } } @@ -1070,6 +1132,12 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + CheckCitusVersion(ERROR); + EnsureCoordinator(); + EnsureModificationsCanRun(); + + EnsureSequentialModeMetadataOperations(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -1099,9 +1167,10 @@ ActivateNode(char *nodeName, int nodePort) BoolGetDatum(isActive)); } - ClearDistributedTablesOnNode(workerNode); + UseCoordinatedTransaction(); + + ClearDistributedObjectsWithMetadataFromNode(workerNode); SetUpDistributedTableWithDependencies(workerNode); - SetUpSequenceDependencies(workerNode); SetUpMultipleDistributedTableIntegrations(workerNode); SetUpObjectMetadata(workerNode); @@ -1118,8 +1187,6 @@ ActivateNode(char *nodeName, int nodePort) } - - /* * DetachPartitionCommandList returns list of DETACH commands to detach partitions * of all distributed tables. 
This function is used for detaching partitions in MX diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index bcefd46b4..efcec8a81 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -29,6 +29,7 @@ typedef enum /* Functions declarations for metadata syncing */ extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort); +extern void EnsureSequentialModeMetadataOperations(void); extern bool ClusterHasKnownMetadataWorkers(void); extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); @@ -51,6 +52,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId); extern List * GrantOnSchemaDDLCommands(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, uint64 shardLength, int32 groupId); +extern char * TruncateTriggerCreateCommand(Oid relationId); extern void CreateShellTableOnWorkers(Oid relationId); extern void CreateTableMetadataOnWorkers(Oid relationId); extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);