From 77ef777d533e7888e5f7b496d826f0d85cc9b71c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 1 Dec 2022 15:41:45 +0100 Subject: [PATCH] sync objects without coordinated tx --- .../distributed/commands/dependencies.c | 42 +++++-- .../distributed/metadata/node_metadata.c | 115 +++++++++++------- src/backend/distributed/test/metadata_sync.c | 3 +- src/include/distributed/metadata_utility.h | 2 +- src/include/distributed/worker_manager.h | 2 +- 5 files changed, 105 insertions(+), 59 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 87491a4f5..5a795cffb 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -531,11 +531,16 @@ GetAllDependencyCreateDDLCommands(const List *dependencies) * previously marked objects to a worker node. The function also sets * clusterHasDistributedFunction if there are any distributed functions. */ -List * -ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) +void +ReplicateAllObjectsToNodes(List *connectionList, List **commandList) { /* since we are executing ddl commands disable propagation first, primarily for mx */ - List *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); + ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); + if (commandList != NULL) + { + /* caller requested the commands */ + *commandList = lappend(*commandList, DISABLE_DDL_PROPAGATION); + } /* * collect all dependencies in creation order and get their ddl commands @@ -556,10 +561,12 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) * 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too * low we can adjust this based on experience. */ - if (list_length(dependencies) > 100) + if (list_length(dependencies) > 100 && list_length(connectionList)) { - ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName, - nodePort), + MultiConnection *connection = (MultiConnection *) linitial(connectionList); + + ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", + connection->hostname, connection->port), errdetail("There are %d objects to replicate, depending on your " "environment this might take a while", list_length(dependencies)))); @@ -578,13 +585,26 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) continue; } - ddlCommands = list_concat(ddlCommands, - GetDependencyCreateDDLCommands(dependency)); + List *commandsForDep = GetDependencyCreateDDLCommands(dependency); + char *command = NULL; + foreach_ptr(command, commandsForDep) + { + ExecuteRemoteCommandInConnectionList(connectionList, command); + + if (commandList != NULL) + { + /* caller requested the commands */ + *commandList = lappend(*commandList, command); + } + } } - ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - - return ddlCommands; + ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); + if (commandList != NULL) + { + /* caller requested the commands */ + *commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); + } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 085cf12aa..b03f8c949 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -103,10 +103,10 @@ static void DeleteNodeRow(char *nodename, int32 nodeport); static void SyncDistributedObjectsToNodeList(List *workerNodeList); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); static void SyncPgDistTableMetadataToNodeList(List *nodeList); -static List * InterTableRelationshipCommandList(); +static void BuildInterTableRelationships(List *connectionList, List **commandList); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); -static List * PropagateNodeWideObjectsCommandList(); +static void PropagateNodeWideObjects(List *connectionList, List **commandList); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void DropExistingMetadataInOutsideTransaction(List *nodeList); @@ -653,12 +653,11 @@ master_set_node_property(PG_FUNCTION_ARGS) * * for each citus table. */ -static List * -InterTableRelationshipCommandList() +static void +BuildInterTableRelationships(List *connectionList, List **commandList) { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; - List *multipleTableIntegrationCommandList = NIL; CitusTableCacheEntry *cacheEntry = NULL; foreach_ptr(cacheEntry, distributedTableList) @@ -674,6 +673,8 @@ InterTableRelationshipCommandList() } } + ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); + foreach_ptr(cacheEntry, propagatedTableList) { Oid relationId = cacheEntry->relationId; @@ -681,17 +682,27 @@ InterTableRelationshipCommandList() List *commandListForRelation = InterTableRelationshipOfRelationCommandList(relationId); - multipleTableIntegrationCommandList = list_concat( - multipleTableIntegrationCommandList, - commandListForRelation); + char *command = NULL; + foreach_ptr(command, commandListForRelation) + { + ExecuteRemoteCommandInConnectionList(connectionList, command); + + if (commandList != NULL) + { + /* caller requested the commands */ + *commandList = lappend(*commandList, command); + } + } } - multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, - multipleTableIntegrationCommandList); - multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList, - ENABLE_DDL_PROPAGATION); + ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); - return multipleTableIntegrationCommandList; + if (commandList != NULL) + { + /* caller requested the commands */ + *commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList); + *commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); + } } @@ -745,16 +756,15 @@ PgDistTableMetadataSyncCommandList(void) /* - * PropagateNodeWideObjectsCommandList is called during node activation to - * propagate any object that should be propagated for every node. These are - * generally not linked to any distributed object but change system wide behaviour. + * PropagateNodeWideObjects is called during node activation to + * propagate any object that should be propagated for every node. + * + * These are generally not linked to any distributed object but + * change system wide behaviour. */ -static List * -PropagateNodeWideObjectsCommandList() +static void +PropagateNodeWideObjects(List *connectionList, List **commandList) { - /* collect all commands */ - List *ddlCommands = NIL; - if (EnableAlterRoleSetPropagation) { /* @@ -762,17 +772,26 @@ PropagateNodeWideObjectsCommandList() * linked to any role that can be distributed we need to distribute them seperately */ List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid); - ddlCommands = list_concat(ddlCommands, alterRoleSetCommands); - } - if (list_length(ddlCommands) > 0) - { - /* if there are command wrap them in enable_ddl_propagation off */ - ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); - ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - } + if (alterRoleSetCommands != NIL) + { + char *command = NULL; + foreach_ptr(command, alterRoleSetCommands) + { + ExecuteRemoteCommandInConnectionList(connectionList, command); + } - return ddlCommands; + /* the caller is interested in collecting the commands */ + if (commandList != NULL) + { + *commandList = list_concat(*commandList, alterRoleSetCommands); + + /* if there are command wrap them in enable_ddl_propagation off */ + *commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList); + *commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); + } + } + } } @@ -791,29 +810,40 @@ PropagateNodeWideObjectsCommandList() * We also update the local group id here, as handling sequence dependencies * requires it. */ -List * -SyncDistributedObjectsCommandList(WorkerNode *workerNode) +void +SyncDistributedObjects(List *workerNodeList, List **commandList) { - List *commandList = NIL; + List *connectionList = NIL; + + /* first, establish new connections */ + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert(superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort, NULL, NULL); + + connectionList = lappend(connectionList, connection); + } /* * Propagate node wide objects. It includes only roles for now. */ - commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); + PropagateNodeWideObjects(connectionList, commandList); /* * Replicate all objects of the pg_dist_object to the remote node. */ - commandList = list_concat(commandList, ReplicateAllObjectsToNodeCommandList( - workerNode->workerName, workerNode->workerPort)); + ReplicateAllObjectsToNodes(connectionList, commandList); /* * After creating each table, handle the inter table relationship between * those tables. */ - commandList = list_concat(commandList, InterTableRelationshipCommandList()); - - return commandList; + BuildInterTableRelationships(connectionList, commandList); } @@ -838,14 +868,9 @@ SyncDistributedObjectsToNodeList(List *workerNodeList) Assert(ShouldPropagate()); - List *commandList = SyncDistributedObjectsCommandList(linitial(workerNodeList)); - /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - workerNodeList, - CurrentUserName(), - commandList); + SyncDistributedObjects(workerNodeList, NULL); } diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 712ed6b0a..f7b57d4b0 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -52,7 +52,8 @@ activate_node_snapshot(PG_FUNCTION_ARGS) List *updateLocalGroupCommand = list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); List *dropMetadataCommandList = DropExistingMetadataCommandList(); - List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode); + List *syncDistObjCommands = NIL; + SyncDistributedObjects(NIL, &syncDistObjCommands); List *dropNodeSnapshotCommands = NodeMetadataDropCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands(); List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList(); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 5c93c0bb7..990d9ecc8 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -349,7 +349,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies); extern bool ShouldPropagate(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateAnyObject(List *addresses); -extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort); +extern void ReplicateAllObjectsToNodes(List *connectionList, List **commandList); /* Remaining metadata utility functions */ extern Oid TableOwnerOid(Oid relationId); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 0e0e96ccb..92fc2e922 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -106,7 +106,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); -extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode); +extern void SyncDistributedObjects(List *workerNodeList, List **commandList); extern List * PgDistTableMetadataSyncCommandList(void); /* Function declarations for worker node utilities */