diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index d64b9e127..3ff650aa0 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -532,10 +532,11 @@ GetAllDependencyCreateDDLCommands(const List *dependencies) * clusterHasDistributedFunction if there are any distributed functions. */ void -ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort, +ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, List **ddlCommands) { /* since we are executing ddl commands disable propagation first, primarily for mx */ + if (ddlCommands != NULL) *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); /* @@ -559,8 +560,8 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort, */ if (list_length(dependencies) > 100) { - ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName, - nodePort), + ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", "lll", + 5555), errdetail("There are %d objects to replicate, depending on your " "environment this might take a while", list_length(dependencies)))); @@ -579,10 +580,18 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort, continue; } + List *perObjCommands = GetDependencyCreateDDLCommands(dependency); + if (ddlCommands != NULL) *ddlCommands = list_concat(*ddlCommands, - GetDependencyCreateDDLCommands(dependency)); + perObjCommands); + + if (list_length(nodeToSyncMetadataConnections) != 0) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), perObjCommands); + } } + if (ddlCommands != NULL) *ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e9883359f..1ef6fa364 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -672,7 +672,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) List *detachPartitionCommandList = NIL; - DetachPartitionCommandList(&detachPartitionCommandList); + DetachPartitionCommandList(NIL, &detachPartitionCommandList); dropMetadataCommandList = list_concat(dropMetadataCommandList, detachPartitionCommandList); @@ -2667,40 +2667,65 @@ CreateTableMetadataOnWorkers(Oid relationId) * empty list to not disable/enable DDL propagation for nothing. */ void -DetachPartitionCommandList(List **detachPartitionCommandList) +DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPartitionCommandList) { - List *distributedTableList = CitusTableList(); + List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); + + bool foundAnyPartitionedTable = false; /* we iterate over all distributed partitioned tables and DETACH their partitions */ - CitusTableCacheEntry *cacheEntry = NULL; - foreach_ptr(cacheEntry, distributedTableList) + Oid relationId = InvalidOid; + foreach_oid(relationId, citusTableIdList) { - if (!PartitionedTable(cacheEntry->relationId)) + if (!PartitionedTable(relationId)) { continue; } - List *partitionList = PartitionList(cacheEntry->relationId); - List *detachCommands = - GenerateDetachPartitionCommandRelationIdList(partitionList); - *detachPartitionCommandList = list_concat(*detachPartitionCommandList, - detachCommands); + List *partitionList = PartitionList(relationId); + + Oid partitionRelOid = InvalidOid; + foreach_oid(partitionRelOid, partitionList) + { + foundAnyPartitionedTable = true; + + Assert(PartitionTable(partitionRelOid)); + char *detachCommand = GenerateDetachPartitionCommand(partitionRelOid); + + + if (list_length(nodeToSyncMetadataConnections) != 0) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(detachCommand)); + } + + if (detachPartitionCommandList != NULL) + { + *detachPartitionCommandList = + lappend(*detachPartitionCommandList, detachCommand); + } + else + pfree(detachCommand); + + } } - if (list_length(*detachPartitionCommandList) == 0) + if (!foundAnyPartitionedTable) { return; } - *detachPartitionCommandList = - lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList); + if (detachPartitionCommandList != NULL) + { + *detachPartitionCommandList = + lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList); - /* - * We probably do not need this but as an extra precaution, we are enabling - * DDL propagation to switch back to original state. - */ - *detachPartitionCommandList = lappend(*detachPartitionCommandList, - ENABLE_DDL_PROPAGATION); + /* + * We probably do not need this but as an extra precaution, we are enabling + * DDL propagation to switch back to original state. + */ + *detachPartitionCommandList = lappend(*detachPartitionCommandList, + ENABLE_DDL_PROPAGATION); + } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 74387591d..0c4a2da98 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -102,11 +102,13 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada static void DeleteNodeRow(char *nodename, int32 nodeport); static void SyncDistributedObjectsToNodeList(List *workerNodeList); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); -static void SyncPgDistTableMetadataToNodeList(List *nodeList); -static void InterTableRelationshipCommandList(List **ddlCommandList); +static void SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections); +static void InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, + List **ddlCommandList); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); -static void PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList); +static void PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, + List **nodeWideObjectCommandList); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); @@ -653,35 +655,39 @@ master_set_node_property(PG_FUNCTION_ARGS) * for each citus table. */ static void -InterTableRelationshipCommandList(List **multipleTableIntegrationCommandList) +InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, + List **multipleTableIntegrationCommandList) { - List *distributedTableList = CitusTableList(); + List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); List *propagatedTableList = NIL; - CitusTableCacheEntry *cacheEntry = NULL; - foreach_ptr(cacheEntry, distributedTableList) + Oid relationId = InvalidOid; + foreach_oid(relationId, citusTableIdList) { /* * Skip foreign key and partition creation when we shouldn't need to sync * tablem metadata or the Citus table is owned by an extension. */ - if (ShouldSyncTableMetadata(cacheEntry->relationId) && - !IsTableOwnedByExtension(cacheEntry->relationId)) + if (ShouldSyncTableMetadataViaCatalog(relationId) && + !IsTableOwnedByExtension(relationId)) { - propagatedTableList = lappend(propagatedTableList, cacheEntry); + propagatedTableList = lappend_oid(propagatedTableList, relationId); } } - foreach_ptr(cacheEntry, propagatedTableList) + foreach_oid(relationId, propagatedTableList) { - Oid relationId = cacheEntry->relationId; - List *commandListForRelation = InterTableRelationshipOfRelationCommandList(relationId); *multipleTableIntegrationCommandList = list_concat( *multipleTableIntegrationCommandList, commandListForRelation); + + if (list_length(nodeToSyncMetadataConnections) != 0) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), commandListForRelation); + } } *multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, @@ -696,7 +702,7 @@ InterTableRelationshipCommandList(List **multipleTableIntegrationCommandList) * (except pg_dist_node) metadata. We call them as table metadata. */ void -PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList) +PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **metadataSnapshotCommandList) { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -723,12 +729,19 @@ PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList) *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, DELETE_ALL_COLOCATION); + if (list_length(nodeToSyncMetadataConnections) != 0) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), *metadataSnapshotCommandList); + } + /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ foreach_ptr(cacheEntry, propagatedTableList) { List *tableMetadataCreateCommandList = CitusTableMetadataCreateCommandList(cacheEntry->relationId); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), tableMetadataCreateCommandList); + *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList, tableMetadataCreateCommandList); } @@ -755,8 +768,10 @@ PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList) * generally not linked to any distributed object but change system wide behaviour. */ static void -PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList) +PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, + List **nodeWideObjectCommandList) { + bool hasObjects = false; if (EnableAlterRoleSetPropagation) { /* @@ -764,17 +779,23 @@ PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList) * linked to any role that can be distributed we need to distribute them seperately */ List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid); - *nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList, - alterRoleSetCommands); - } - if (list_length(*nodeWideObjectCommandList) > 0) - { - /* if there are command wrap them in enable_ddl_propagation off */ - *nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION, - *nodeWideObjectCommandList); - *nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList, - ENABLE_DDL_PROPAGATION); + if (nodeWideObjectCommandList != NULL) + { + *nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList, + alterRoleSetCommands); + + /* if there are command wrap them in enable_ddl_propagation off */ + *nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION, + *nodeWideObjectCommandList); + *nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList, + ENABLE_DDL_PROPAGATION); + } + + if (list_length(nodeToSyncMetadataConnections) != 0) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), alterRoleSetCommands); + } } } @@ -794,37 +815,40 @@ PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList) * We also update the local group id here, as handling sequence dependencies * requires it. */ -List * -SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList) +void +SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **commandList) { /* * Propagate node wide objects. It includes only roles for now. */ - List *nodeWideObjectCommandList = NIL; - PropagateNodeWideObjectsCommandList(&nodeWideObjectCommandList); - - *commandList = list_concat(*commandList, nodeWideObjectCommandList); + PropagateNodeWideObjectsCommandList(nodeToSyncMetadataConnections, commandList); /* * Detach partitions, break dependencies between sequences and table then * remove shell tables first. */ - List *detachPartitionCommandList = NIL; - - DetachPartitionCommandList(&detachPartitionCommandList); - *commandList = list_concat(*commandList, detachPartitionCommandList); + DetachPartitionCommandList(nodeToSyncMetadataConnections, commandList); - *commandList = lappend(*commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); - *commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND); + //if (commandList != NULL) + { + + + if (list_length(nodeToSyncMetadataConnections) != 0) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND)); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(REMOVE_ALL_SHELL_TABLES_COMMAND)); + } + } /* * Replicate all objects of the pg_dist_object to the remote node. */ List *replicateAllObjectsToNodeCommandList = NIL; - ReplicateAllObjectsToNodeCommandList(workerNode->workerName, workerNode->workerPort, + ReplicateAllObjectsToNodeCommandList(nodeToSyncMetadataConnections, &replicateAllObjectsToNodeCommandList); + if (commandList != NULL) *commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList); /* @@ -832,12 +856,10 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList) * those tables. */ List *interTableRelationshipCommandList = NIL; - InterTableRelationshipCommandList(&interTableRelationshipCommandList); + InterTableRelationshipCommandList(nodeToSyncMetadataConnections, &interTableRelationshipCommandList); + if (commandList != NULL) *commandList = list_concat(*commandList, interTableRelationshipCommandList); - - - return *commandList; } @@ -851,28 +873,10 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList) * since all the dependencies should be present in the coordinator already. */ static void -SyncDistributedObjectsToNodeList(List *workerNodeList) +SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections) { - List *workerNodesToSync = NIL; - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - if (NodeIsCoordinator(workerNode)) - { - /* coordinator has all the objects */ - continue; - } - if (!NodeIsPrimary(workerNode)) - { - /* secondary nodes gets the objects from their primaries via replication */ - continue; - } - - workerNodesToSync = lappend(workerNodesToSync, workerNode); - } - - if (workerNodesToSync == NIL) + if (nodeToSyncMetadataConnections == NIL) { return; } @@ -881,16 +885,10 @@ SyncDistributedObjectsToNodeList(List *workerNodeList) Assert(ShouldPropagate()); - List *commandList = NIL; - - SyncDistributedObjectsCommandList(workerNode, &commandList); - /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - workerNodesToSync, - CurrentUserName(), - commandList); + + SyncDistributedObjectsCommandList(nodeToSyncMetadataConnections, NULL); } @@ -920,33 +918,13 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode) * */ static void -SyncPgDistTableMetadataToNodeList(List *nodeList) +SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections) { /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - List *nodesWithMetadata = NIL; - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, nodeList) - { - if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) - { - nodesWithMetadata = lappend(nodesWithMetadata, workerNode); - } - } - - if (nodesWithMetadata == NIL) - { - return; - } - List *syncPgDistMetadataCommandList = NIL; - PgDistTableMetadataSyncCommandList(&syncPgDistMetadataCommandList); - - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - nodesWithMetadata, - CurrentUserName(), - syncPgDistMetadataCommandList); + PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections, &syncPgDistMetadataCommandList); } @@ -1168,6 +1146,8 @@ ActivateNodeList(List *nodeList) List *nodeToSyncMetadata = NIL; + List *nodeToSyncMetadataConnections = NIL; + WorkerNode *node = NULL; foreach_ptr(node, nodeList) { @@ -1207,7 +1187,7 @@ ActivateNodeList(List *nodeList) BoolGetDatum(true)); /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ - bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); + bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode); if (syncMetadata) { /* @@ -1224,6 +1204,17 @@ ActivateNodeList(List *nodeList) UpdateLocalGroupIdOnNode(workerNode); nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode); + + + MultiConnection *connection = + GetNodeConnection(OUTSIDE_TRANSACTION, node->workerName, node->workerPort); + + ClaimConnectionExclusively(connection); + nodeToSyncMetadataConnections = + lappend(nodeToSyncMetadataConnections, connection); + + SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(DISABLE_DDL_PROPAGATION)); + } } @@ -1232,7 +1223,7 @@ ActivateNodeList(List *nodeList) * replicating reference tables to the remote node, as reference tables may * need such objects. */ - SyncDistributedObjectsToNodeList(nodeToSyncMetadata); + SyncDistributedObjectsToNodeList(nodeToSyncMetadataConnections); /* * Sync node metadata. We must sync node metadata before syncing table @@ -1249,7 +1240,7 @@ ActivateNodeList(List *nodeList) * We must handle it as the last step because of limitations shared with * above comments. */ - SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata); + SyncPgDistTableMetadataToNodeList(nodeToSyncMetadataConnections); foreach_ptr(node, nodeList) { diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 663312b10..8d5ab9bde 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -53,7 +53,7 @@ activate_node_snapshot(PG_FUNCTION_ARGS) list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); List *syncDistObjCommands = NIL; - SyncDistributedObjectsCommandList(dummyWorkerNode, &syncDistObjCommands); + SyncDistributedObjectsCommandList(NIL, &syncDistObjCommands); List *dropSnapshotCommands = NodeMetadataDropCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands(); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index aa6dd46ee..38e733185 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -90,7 +90,8 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId); extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId); extern List * InterTableRelationshipOfRelationCommandList(Oid relationId); -extern void DetachPartitionCommandList(List **detachPartitionCommandList); +extern void DetachPartitionCommandList(List *nodeToSyncMetadataConnections, + List **detachPartitionCommandList); extern void SyncNodeMetadataToNodes(void); extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner); extern void SyncNodeMetadataToNodesMain(Datum main_arg); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index c3b42268b..cdc7de79b 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -343,7 +343,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies); extern bool ShouldPropagate(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateAnyObject(List *addresses); -extern void ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort, +extern void ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, List **ddlCommands); /* Remaining metadata utility functions */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 9305963eb..e79179044 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -105,9 +105,10 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); -extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode, +extern void SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **commandList); -extern void PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList); +extern void PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, + List **metadataSnapshotCommandList); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);