diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 93a3d3bd9..d92c1914d 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -64,6 +64,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) List *dependencyCommands = GetDependencyCreateDDLCommands(dependency); ddlCommands = list_concat(ddlCommands, dependencyCommands); + // TODO: Might add check for tables /* create a new list with dependencies that actually created commands */ if (list_length(dependencyCommands) > 0) { @@ -237,19 +238,20 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) if (relKind == RELKIND_RELATION) { + Oid relationId = dependency->objectId; List *commandList = NIL; - List *tableDDLCommands = GetFullTableCreationCommands( - dependency->objectId, - WORKER_NEXTVAL_SEQUENCE_DEFAULTS); + List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS); TableDDLCommand *tableDDLCommand = NULL; foreach_ptr(tableDDLCommand, tableDDLCommands) { Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); - commandList = lappend(commandList, GetTableDDLCommand( - tableDDLCommand)); + commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); } + List *sequenceDependencyCommandList = SequenceDependencyCommandList(dependency->objectId); + commandList = list_concat(commandList, sequenceDependencyCommandList); + return commandList; } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 18f223674..d6de1074a 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -92,7 +92,6 @@ static char * LocalGroupIdUpdateCommand(int32 groupId); static char * TruncateTriggerCreateCommand(Oid relationId); static char * SchemaOwnerName(Oid objectId); static bool HasMetadataWorkers(void); -static List * DetachPartitionCommandList(void); static bool ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey); static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError); @@ -522,18 +521,11 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) * following queries: * * (i) Query that populates pg_dist_node table - * (ii) Queries that create the foreign keys and partitioning hierarchy - * (iii) Queries that populate pg_dist_partition table referenced by (ii) - * (iv) Queries that populate pg_dist_shard table referenced by (iii) - * (v) Queries that populate pg_dist_placement table referenced by (iv) - * (vi) Queries that populate pg_dist_object table */ List * MetadataCreateCommands(void) { List *metadataSnapshotCommandList = NIL; - List *distributedTableList = CitusTableList(); - List *propagatedTableList = NIL; bool includeNodesFromOtherClusters = true; List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters); @@ -545,49 +537,6 @@ MetadataCreateCommands(void) metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, nodeListInsertCommand); - /* create the list of tables whose metadata will be created */ - CitusTableCacheEntry *cacheEntry = NULL; - foreach_ptr(cacheEntry, distributedTableList) - { - if (ShouldSyncTableMetadata(cacheEntry->relationId)) - { - propagatedTableList = lappend(propagatedTableList, cacheEntry); - } - } - - /* create the metadata */ - foreach_ptr(cacheEntry, propagatedTableList) - { - Oid clusteredTableId = cacheEntry->relationId; - - /* add the table metadata command first*/ - char *metadataCommand = DistributionCreateCommand(cacheEntry); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - metadataCommand); - - /* add the truncate trigger command after the table became distributed */ - char *truncateTriggerCreateCommand = - TruncateTriggerCreateCommand(cacheEntry->relationId); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - truncateTriggerCreateCommand); - - /* add the pg_dist_shard{,placement} entries */ - List *shardIntervalList = LoadShardIntervalList(clusteredTableId); - List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList); - - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - shardCreateCommandList); - } - - /* As the last step, propagate the pg_dist_object entities */ - if (ShouldPropagate()) - { - List *distributedObjectSyncCommandList = - DistributedObjectMetadataSyncCommandList(); - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - distributedObjectSyncCommandList); - } - return metadataSnapshotCommandList; } @@ -739,33 +688,17 @@ GetDistributedTableDDLEvents(Oid relationId) /* * MetadataDropCommands returns list of queries that are required to - * drop all the metadata of the node that are related to clustered tables. + * drop all the metadata of the node that are not related to clustered tables. * The drop metadata snapshot commands includes the following queries: * - * (i) Query to disable DDL propagation (necessary for (ii) - * (ii) Queries that DETACH all partitions of distributed tables - * (iii) Queries that delete all the rows from pg_dist_node table - * (iv) Queries that drop the clustered tables and remove its references from - * the pg_dist_partition. Note that distributed relation ids are gathered - * from the worker itself to prevent dropping any non-distributed tables - * with the same name. - * (v) Queries that delete all the rows from pg_dist_shard table referenced by (iv) - * (vi) Queries that delete all the rows from pg_dist_placement table - * referenced by (v) - * (vii) Queries that delete all the rows from pg_dist_object table + * (i) Queries that delete all the rows from pg_dist_node table */ List * MetadataDropCommands(void) { List *dropSnapshotCommandList = NIL; - List *detachPartitionCommandList = DetachPartitionCommandList(); - - dropSnapshotCommandList = list_concat(dropSnapshotCommandList, - detachPartitionCommandList); dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES); - dropSnapshotCommandList = lappend(dropSnapshotCommandList, - DELETE_ALL_DISTRIBUTED_OBJECTS); return dropSnapshotCommandList; } @@ -1984,55 +1917,6 @@ CreateTableMetadataOnWorkers(Oid relationId) } -/* - * DetachPartitionCommandList returns list of DETACH commands to detach partitions - * of all distributed tables. This function is used for detaching partitions in MX - * workers before DROPping distributed partitioned tables in them. Thus, we are - * disabling DDL propagation to the beginning of the commands (we are also enabling - * DDL propagation at the end of command list to swtich back to original state). As - * an extra step, if there are no partitions to DETACH, this function simply returns - * empty list to not disable/enable DDL propagation for nothing. - */ -static List * -DetachPartitionCommandList(void) -{ - List *detachPartitionCommandList = NIL; - List *distributedTableList = CitusTableList(); - - /* we iterate over all distributed partitioned tables and DETACH their partitions */ - CitusTableCacheEntry *cacheEntry = NULL; - foreach_ptr(cacheEntry, distributedTableList) - { - if (!PartitionedTable(cacheEntry->relationId)) - { - continue; - } - - List *partitionList = PartitionList(cacheEntry->relationId); - List *detachCommands = - GenerateDetachPartitionCommandRelationIdList(partitionList); - detachPartitionCommandList = list_concat(detachPartitionCommandList, - detachCommands); - } - - if (list_length(detachPartitionCommandList) == 0) - { - return NIL; - } - - detachPartitionCommandList = - lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList); - - /* - * We probably do not need this but as an extra precaution, we are enabling - * DDL propagation to switch back to original state. - */ - detachPartitionCommandList = lappend(detachPartitionCommandList, - ENABLE_DDL_PROPAGATION); - - return detachPartitionCommandList; -} - /* * SyncMetadataToNodes tries recreating the metadata snapshot in the diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 307e7dcbb..aee8e22d9 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -91,6 +91,7 @@ typedef struct NodeMetadata } NodeMetadata; /* local function forward declarations */ +static List * DetachPartitionCommandList(void); static int ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode); @@ -107,6 +108,8 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static void SetUpSequenceDependencies(WorkerNode *workerNode); +static void SetUpObjectMetadata(WorkerNode *workerNode); +static void ClearDistributedTablesOnNode(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -678,17 +681,100 @@ SetUpSequenceDependencies(WorkerNode *workerNode) List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); sequenceCommandList = list_concat(sequenceCommandList, sequenceDependencyCommandList); + } - /* prevent recursive propagation */ - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + /* prevent recursive propagation */ + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); - /* send the commands one by one */ - const char *command = NULL; - foreach_ptr(command, sequenceCommandList) + char *currentUser = CurrentUserName(); + SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + currentUser, + sequenceCommandList); +} + + +static void +SetUpObjectMetadata(WorkerNode *workerNode) +{ + List *distributedTableList = CitusTableList(); + List *propagatedTableList = NIL; + List *metadataSnapshotCommandList = NIL; + + /* create the list of tables whose metadata will be created */ + CitusTableCacheEntry *cacheEntry = NULL; + foreach_ptr(cacheEntry, distributedTableList) + { + if (ShouldSyncTableMetadata(cacheEntry->relationId)) { - SendCommandToWorkersWithMetadata(command); + propagatedTableList = lappend(propagatedTableList, cacheEntry); } } + + /* after all tables are created, create the metadata */ + foreach_ptr(cacheEntry, propagatedTableList) + { + Oid clusteredTableId = cacheEntry->relationId; + + /* add the table metadata command first*/ + char *metadataCommand = DistributionCreateCommand(cacheEntry); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + metadataCommand); + + /* add the truncate trigger command after the table became distributed */ + char *truncateTriggerCreateCommand = + TruncateTriggerCreateCommand(cacheEntry->relationId); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + truncateTriggerCreateCommand); + + /* add the pg_dist_shard{,placement} entries */ + List *shardIntervalList = LoadShardIntervalList(clusteredTableId); + List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList); + + metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, + shardCreateCommandList); + } + + /* As the last step, propagate the pg_dist_object entities */ + if (ShouldPropagate()) + { + List *distributedObjectSyncCommandList = + DistributedObjectMetadataSyncCommandList(); + metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, + distributedObjectSyncCommandList); + } + + List *metadataSnapshotCommands = list_make2(DISABLE_DDL_PROPAGATION, metadataSnapshotCommandList); + + char *currentUser = CurrentUserName(); + SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + currentUser, + metadataSnapshotCommands); +} + +static void +ClearDistributedTablesOnNode(WorkerNode *workerNode) +{ + List *clearDistTableInfoCommandList = NIL; + List *detachPartitionCommandList = DetachPartitionCommandList(); + + clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, + detachPartitionCommandList); + + clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, + REMOVE_ALL_CLUSTERED_TABLES_COMMAND); + + clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); + + List *clearDistTableCommands = list_make2(DISABLE_DDL_PROPAGATION, + clearDistTableInfoCommandList); + + char *currentUser = CurrentUserName(); + SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + currentUser, + clearDistTableCommands); } @@ -1013,9 +1099,11 @@ ActivateNode(char *nodeName, int nodePort) BoolGetDatum(isActive)); } + ClearDistributedTablesOnNode(workerNode); SetUpDistributedTableWithDependencies(workerNode); SetUpSequenceDependencies(workerNode); SetUpMultipleDistributedTableIntegrations(workerNode); + SetUpObjectMetadata(workerNode); if (syncMetadata) { @@ -1030,6 +1118,58 @@ ActivateNode(char *nodeName, int nodePort) } + + +/* + * DetachPartitionCommandList returns list of DETACH commands to detach partitions + * of all distributed tables. This function is used for detaching partitions in MX + * workers before DROPping distributed partitioned tables in them. Thus, we are + * disabling DDL propagation to the beginning of the commands (we are also enabling + * DDL propagation at the end of command list to swtich back to original state). As + * an extra step, if there are no partitions to DETACH, this function simply returns + * empty list to not disable/enable DDL propagation for nothing. + */ +static List * +DetachPartitionCommandList(void) +{ + List *detachPartitionCommandList = NIL; + List *distributedTableList = CitusTableList(); + + /* we iterate over all distributed partitioned tables and DETACH their partitions */ + CitusTableCacheEntry *cacheEntry = NULL; + foreach_ptr(cacheEntry, distributedTableList) + { + if (!PartitionedTable(cacheEntry->relationId)) + { + continue; + } + + List *partitionList = PartitionList(cacheEntry->relationId); + List *detachCommands = + GenerateDetachPartitionCommandRelationIdList(partitionList); + detachPartitionCommandList = list_concat(detachPartitionCommandList, + detachCommands); + } + + if (list_length(detachPartitionCommandList) == 0) + { + return NIL; + } + + detachPartitionCommandList = + lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList); + + /* + * We probably do not need this but as an extra precaution, we are enabling + * DDL propagation to switch back to original state. + */ + detachPartitionCommandList = lappend(detachPartitionCommandList, + ENABLE_DDL_PROPAGATION); + + return detachPartitionCommandList; +} + + /* * citus_update_node moves the requested node to a different nodename and nodeport. It * locks to ensure no queries are running concurrently; and is intended for customers who