diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 853b9ccde..435501ac9 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -532,14 +532,16 @@ GetAllDependencyCreateDDLCommands(const List *dependencies) * clusterHasDistributedFunction if there are any distributed functions. */ void -ReplicateAllObjectsToNodes(List *connectionList, List **commandList) +ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext) { + List *connectionList = activateNodeContext.connectionList; + /* since we are executing ddl commands disable propagation first, primarily for mx */ ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); - if (commandList != NULL) + if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - *commandList = lappend(*commandList, DISABLE_DDL_PROPAGATION); + activateNodeContext.commandList = lappend(activateNodeContext.commandList, DISABLE_DDL_PROPAGATION); } /* @@ -602,19 +604,20 @@ ReplicateAllObjectsToNodes(List *connectionList, List **commandList) { ExecuteRemoteCommandInConnectionList(connectionList, command); - if (commandList != NULL) + if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - *commandList = lappend(*commandList, command); + activateNodeContext.commandList = + lappend(activateNodeContext.commandList, command); } } } ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); - if (commandList != NULL) + if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - *commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); + activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION); } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 62314e56f..37ad41916 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -100,16 +100,16 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SyncDistributedObjectsInOutsideTransaction(List *workerNodeList); +static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); -static void SyncPgDistTableMetadataToNodeList(List *nodeList); -static void BuildInterTableRelationships(List *connectionList, List **commandList); +static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext); +static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); -static void PropagateNodeWideObjects(List *connectionList, List **commandList); +static void PropagateNodeWideObjects(ActivateNodeContext activateNodeContext); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); -static void DropExistingMetadataInOutsideTransaction(List *nodeList); +static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAllWorkers(void); @@ -654,7 +654,7 @@ master_set_node_property(PG_FUNCTION_ARGS) * for each citus table. */ static void -BuildInterTableRelationships(List *connectionList, List **commandList) +BuildInterTableRelationships(ActivateNodeContext activateNodeContext) { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -673,6 +673,7 @@ BuildInterTableRelationships(List *connectionList, List **commandList) } } + List *connectionList = activateNodeContext.connectionList; ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); foreach_ptr(cacheEntry, propagatedTableList) @@ -687,21 +688,24 @@ BuildInterTableRelationships(List *connectionList, List **commandList) { ExecuteRemoteCommandInConnectionList(connectionList, command); - if (commandList != NULL) + if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - *commandList = lappend(*commandList, command); + activateNodeContext.commandList = + lappend(activateNodeContext.commandList, command); } } } ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); - if (commandList != NULL) + if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - *commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList); - *commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); + activateNodeContext.commandList = + lcons(DISABLE_DDL_PROPAGATION, activateNodeContext.commandList); + activateNodeContext.commandList = + lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION); } } @@ -763,7 +767,7 @@ PgDistTableMetadataSyncCommandList(void) * change system wide behaviour. */ static void -PropagateNodeWideObjects(List *connectionList, List **commandList) +PropagateNodeWideObjects(ActivateNodeContext activateNodeContext) { if (EnableAlterRoleSetPropagation) { @@ -775,24 +779,24 @@ PropagateNodeWideObjects(List *connectionList, List **commandList) if (alterRoleSetCommands != NIL) { - ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); + ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, DISABLE_DDL_PROPAGATION); char *command = NULL; foreach_ptr(command, alterRoleSetCommands) { - ExecuteRemoteCommandInConnectionList(connectionList, command); + ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, command); } - ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); + ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, ENABLE_DDL_PROPAGATION); /* the caller is interested in collecting the commands */ - if (commandList != NULL) + if (activateNodeContext.fetchCommands) { - *commandList = list_concat(*commandList, alterRoleSetCommands); + activateNodeContext.commandList = list_concat(activateNodeContext.commandList, alterRoleSetCommands); /* if there are command wrap them in enable_ddl_propagation off */ - *commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList); - *commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); + activateNodeContext.commandList = lcons(DISABLE_DDL_PROPAGATION, activateNodeContext.commandList); + activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION); } } } @@ -816,46 +820,23 @@ PropagateNodeWideObjects(List *connectionList, List **commandList) * partitioning, so use the list cautiously. */ void -SyncDistributedObjects(List *workerNodeList, List **commandList) +SyncDistributedObjects(ActivateNodeContext activateNodeContext) { - List *connectionList = NIL; - - /* first, establish new connections */ - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - int connectionFlags = FORCE_NEW_CONNECTION; - - Assert(superuser()); - MultiConnection *connection = - GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, - workerNode->workerPort, NULL, NULL); - - connectionList = lappend(connectionList, connection); - } - /* * Propagate node wide objects. It includes only roles for now. */ - PropagateNodeWideObjects(connectionList, commandList); + PropagateNodeWideObjects(activateNodeContext); /* * Replicate all objects of the pg_dist_object to the remote node. */ - ReplicateAllObjectsToNodes(connectionList, commandList); + ReplicateAllObjectsToNodes(activateNodeContext); /* * After creating each table, handle the inter table relationship between * those tables. */ - BuildInterTableRelationships(connectionList, commandList); - - /* finally, close the connections as we don't need them anymore */ - MultiConnection *connection; - foreach_ptr(connection, connectionList) - { - CloseConnection(connection); - } + BuildInterTableRelationships(activateNodeContext); } @@ -870,9 +851,9 @@ SyncDistributedObjects(List *workerNodeList, List **commandList) * since all the dependencies should be present in the coordinator already. */ static void -SyncDistributedObjectsInOutsideTransaction(List *workerNodeList) +SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext) { - if (workerNodeList == NIL) + if (activateNodeContext.workerNodeList == NIL) { return; } @@ -883,7 +864,7 @@ SyncDistributedObjectsInOutsideTransaction(List *workerNodeList) /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SyncDistributedObjects(workerNodeList, NULL); + SyncDistributedObjects(activateNodeContext); } @@ -913,14 +894,14 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode) * */ static void -SyncPgDistTableMetadataToNodeList(List *nodeList) +SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateContext) { /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); List *nodesWithMetadata = NIL; WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, nodeList) + foreach_ptr(workerNode, activateContext.workerNodeList) { if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) { @@ -935,34 +916,13 @@ SyncPgDistTableMetadataToNodeList(List *nodeList) List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); - List *connectionList = NIL; - - /* first, establish new connections */ - workerNode = NULL; - foreach_ptr(workerNode, nodesWithMetadata) - { - int connectionFlags = FORCE_NEW_CONNECTION; - - Assert(superuser()); - MultiConnection *connection = - GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, - workerNode->workerPort, NULL, NULL); - - connectionList = lappend(connectionList, connection); - } + List *connectionList = activateContext.connectionList; char *command = NULL; foreach_ptr(command, syncPgDistMetadataCommandList) { ExecuteRemoteCommandInConnectionList(connectionList, command); } - - /* finally, close the connections as we don't need them anymore */ - MultiConnection *connection; - foreach_ptr(connection, connectionList) - { - CloseConnection(connection); - } } @@ -1182,8 +1142,10 @@ ActivateNodeList(List *nodeList) /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); + ActivateNodeContext activateContext; + + activateContext.fetchCommands = false; - List *nodeToSyncMetadata = NIL; WorkerNode *node = NULL; foreach_ptr(node, nodeList) { @@ -1247,7 +1209,17 @@ ActivateNodeList(List *nodeList) */ UpdateLocalGroupIdOnNode(workerNode); - nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode); + activateContext.workerNodeList = + lappend(activateContext.workerNodeList, workerNode); + + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert(superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort, NULL, NULL); + activateContext.connectionList = + lappend(activateContext.connectionList, connection); } } @@ -1265,7 +1237,7 @@ ActivateNodeList(List *nodeList) * It incurs some risk of failures during these steps causing * issues if any of the steps is not idempotent. */ - DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata); + DropExistingMetadataInOutsideTransaction(activateContext); /* @@ -1276,14 +1248,14 @@ ActivateNodeList(List *nodeList) * Given that object creation is idempotent, we can afford to run them * outside the transaction. */ - SyncDistributedObjectsInOutsideTransaction(nodeToSyncMetadata); + SyncDistributedObjectsInOutsideTransaction(activateContext); /* * Sync node metadata. We must sync node metadata before syncing table * related pg_dist_xxx metadata. Since table related metadata requires * to have right pg_dist_node entries. */ - foreach_ptr(node, nodeToSyncMetadata) + foreach_ptr(node, activateContext.workerNodeList) { SyncNodeMetadataToNode(node->workerName, node->workerPort); } @@ -1293,7 +1265,7 @@ ActivateNodeList(List *nodeList) * We must handle it as the last step because of limitations shared with * above comments. */ - SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata); + SyncPgDistTableMetadataToNodeList(activateContext); foreach_ptr(node, nodeList) { @@ -1312,36 +1284,20 @@ ActivateNodeList(List *nodeList) * The function establishes one connection per node, and closes at the end. */ static void -DropExistingMetadataInOutsideTransaction(List *nodeList) +DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext) { - List *connectionList = NIL; - - /* first, establish new connections */ - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, nodeList) - { - int connectionFlags = FORCE_NEW_CONNECTION; - - Assert(superuser()); - MultiConnection *connection = - GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, - workerNode->workerPort, NULL, NULL); - - connectionList = lappend(connectionList, connection); - } - char *command = NULL; List *commandList = DropExistingMetadataCommandList(); foreach_ptr(command, commandList) { - ExecuteRemoteCommandInConnectionList(connectionList, command); + ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, + command); } - /* finally, close the connections as we don't need them anymore */ - MultiConnection *connection; - foreach_ptr(connection, connectionList) + if (activateNodeContext.fetchCommands) { - CloseConnection(connection); + activateNodeContext.commandList = + list_concat(activateNodeContext.commandList, commandList); } } diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index f7b57d4b0..e36fc4ce7 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -52,8 +52,10 @@ activate_node_snapshot(PG_FUNCTION_ARGS) List *updateLocalGroupCommand = list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); List *dropMetadataCommandList = DropExistingMetadataCommandList(); - List *syncDistObjCommands = NIL; - SyncDistributedObjects(NIL, &syncDistObjCommands); + ActivateNodeContext context; + context.fetchCommands = true; + SyncDistributedObjects(context); + List *syncDistObjCommands = context.commandList; List *dropNodeSnapshotCommands = NodeMetadataDropCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands(); List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList(); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 990d9ecc8..6b4ede5ad 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -349,7 +349,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies); extern bool ShouldPropagate(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateAnyObject(List *addresses); -extern void ReplicateAllObjectsToNodes(List *connectionList, List **commandList); +extern void ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext); /* Remaining metadata utility functions */ extern Oid TableOwnerOid(Oid relationId); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 92fc2e922..d768202c5 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -57,6 +57,15 @@ typedef struct WorkerNode } WorkerNode; +typedef struct ActivateNodeContext +{ + List *workerNodeList; + List *connectionList; + + bool fetchCommands; + List *commandList; +} ActivateNodeContext; + /* Config variables managed via guc.c */ extern int MaxWorkerNodesTracked; extern char *WorkerListFileName; @@ -106,7 +115,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); -extern void SyncDistributedObjects(List *workerNodeList, List **commandList); +extern void SyncDistributedObjects(ActivateNodeContext activateNodeContext); extern List * PgDistTableMetadataSyncCommandList(void); /* Function declarations for worker node utilities */