diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 435501ac9..9db7f7198 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -534,14 +534,15 @@ GetAllDependencyCreateDDLCommands(const List *dependencies) void ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext) { - List *connectionList = activateNodeContext.connectionList; + List *connectionList = activateNodeContext.activatedNodeConnectionList; /* since we are executing ddl commands disable propagation first, primarily for mx */ ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - activateNodeContext.commandList = lappend(activateNodeContext.commandList, DISABLE_DDL_PROPAGATION); + activateNodeContext.commandList = lappend(activateNodeContext.commandList, + DISABLE_DDL_PROPAGATION); } /* @@ -617,7 +618,8 @@ ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext) if (activateNodeContext.fetchCommands) { /* caller requested the commands */ - activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION); + activateNodeContext.commandList = lappend(activateNodeContext.commandList, + ENABLE_DDL_PROPAGATION); } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index d972ccbda..f8137337e 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -239,26 +239,29 @@ SyncNodeMetadataToNode(ActivateNodeContext activateContext) WorkerNode *workerNode = NULL; MultiConnection *connection = NULL; - forboth_ptr(workerNode, activateContext.workerNodeList, - connection, activateContext.connectionList) + forboth_ptr(workerNode, activateContext.activatedNodeList, + connection, activateContext.activatedNodeConnectionList) { - if (NodeIsCoordinator(workerNode)) { return; } SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); + BoolGetDatum(true)); char *metadataSyncCommand = - GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); + GetMetadataSyncCommandToSetNodeColumn(workerNode, + Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata, - BoolGetDatum(true)); + BoolGetDatum(true)); metadataSyncCommand = - GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(true)); + GetMetadataSyncCommandToSetNodeColumn(workerNode, + Anum_pg_dist_node_hasmetadata, + BoolGetDatum(true)); ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); if (!NodeIsPrimary(workerNode)) @@ -274,7 +277,8 @@ SyncNodeMetadataToNode(ActivateNodeContext activateContext) metadataSyncCommand = NULL; foreach_ptr(metadataSyncCommand, nodeMetadataCommandList) { - ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + ExecuteRemoteCommandInConnectionList(list_make1(connection), + metadataSyncCommand); } } } @@ -598,6 +602,7 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) List *recreateMetadataSnapshotCommandList = NodeMetadataReCreateCommandList(workerNode); + /* * Send the snapshot recreation commands in a single remote transaction and * if requested, error out in any kind of failure. Note that it is not @@ -627,7 +632,6 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) static List * NodeMetadataReCreateCommandList(WorkerNode *workerNode) { - /* generate and add the local group id's update query */ char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); @@ -644,9 +648,9 @@ NodeMetadataReCreateCommandList(WorkerNode *workerNode) createMetadataCommandList); return recreateMetadataSnapshotCommandList; - } + /* * DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them * to the worker given as parameter. diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 1ff67ce61..57ba208a2 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -99,7 +99,8 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext); +static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext + activateNodeContext); static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext); static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext); static void BlockDistributedQueriesOnMetadataNodes(void); @@ -107,7 +108,8 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT static void PropagateNodeWideObjects(ActivateNodeContext activateNodeContext); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); -static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext); +static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext + activateNodeContext); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAllWorkers(void); @@ -668,7 +670,7 @@ BuildInterTableRelationships(ActivateNodeContext activateNodeContext) } } - List *connectionList = activateNodeContext.connectionList; + List *connectionList = activateNodeContext.activatedNodeConnectionList; ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); foreach_ptr(cacheEntry, propagatedTableList) @@ -774,24 +776,30 @@ PropagateNodeWideObjects(ActivateNodeContext activateNodeContext) if (alterRoleSetCommands != NIL) { - ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, DISABLE_DDL_PROPAGATION); + ExecuteRemoteCommandInConnectionList( + activateNodeContext.activatedNodeConnectionList, DISABLE_DDL_PROPAGATION); char *command = NULL; foreach_ptr(command, alterRoleSetCommands) { - ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, command); + ExecuteRemoteCommandInConnectionList( + activateNodeContext.activatedNodeConnectionList, command); } - ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, ENABLE_DDL_PROPAGATION); + ExecuteRemoteCommandInConnectionList( + activateNodeContext.activatedNodeConnectionList, ENABLE_DDL_PROPAGATION); /* the caller is interested in collecting the commands */ if (activateNodeContext.fetchCommands) { - activateNodeContext.commandList = list_concat(activateNodeContext.commandList, alterRoleSetCommands); + activateNodeContext.commandList = list_concat( + activateNodeContext.commandList, alterRoleSetCommands); /* if there are command wrap them in enable_ddl_propagation off */ - activateNodeContext.commandList = lcons(DISABLE_DDL_PROPAGATION, activateNodeContext.commandList); - activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION); + activateNodeContext.commandList = lcons(DISABLE_DDL_PROPAGATION, + activateNodeContext.commandList); + activateNodeContext.commandList = lappend(activateNodeContext.commandList, + ENABLE_DDL_PROPAGATION); } } } @@ -848,7 +856,7 @@ SyncDistributedObjects(ActivateNodeContext activateNodeContext) static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext) { - if (activateNodeContext.workerNodeList == NIL) + if (activateNodeContext.activatedNodeList == NIL) { return; } @@ -876,7 +884,7 @@ SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateContext) List *nodesWithMetadata = NIL; WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, activateContext.workerNodeList) + foreach_ptr(workerNode, activateContext.activatedNodeList) { if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) { @@ -891,7 +899,7 @@ SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateContext) List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); - List *connectionList = activateContext.connectionList; + List *connectionList = activateContext.activatedNodeConnectionList; char *command = NULL; foreach_ptr(command, syncPgDistMetadataCommandList) @@ -1117,13 +1125,13 @@ ActivateNodeList(List *nodeList) /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); - ActivateNodeContext activateContext = {}; + ActivateNodeContext activateContext = { }; activateContext.fetchCommands = false; - activateContext.workerNodeList = NIL; - activateContext.connectionList = NIL; activateContext.commandList = NIL; + activateContext.activatedNodeList = NIL; + activateContext.activatedNodeConnectionList = NIL; WorkerNode *node = NULL; foreach_ptr(node, nodeList) @@ -1175,8 +1183,8 @@ ActivateNodeList(List *nodeList) bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); if (syncMetadata) { - activateContext.workerNodeList = - lappend(activateContext.workerNodeList, workerNode); + activateContext.activatedNodeList = + lappend(activateContext.activatedNodeList, workerNode); int connectionFlags = FORCE_NEW_CONNECTION; @@ -1184,28 +1192,66 @@ ActivateNodeList(List *nodeList) MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, workerNode->workerPort, NULL, NULL); - activateContext.connectionList = - lappend(activateContext.connectionList, connection); + activateContext.activatedNodeConnectionList = + lappend(activateContext.activatedNodeConnectionList, connection); /* * We are going to sync the metadata anyway in this transaction, so do * not fail just because the current metadata is not synced. */ SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); + BoolGetDatum(true)); char *metadataSyncCommand = - GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); - ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + GetMetadataSyncCommandToSetNodeColumn(workerNode, + Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + ExecuteRemoteCommandInConnectionList(list_make1(connection), + metadataSyncCommand); /* * Update local group id first, as object dependency logic requires to have * updated local group id. */ char *localGroupCommand = LocalGroupIdUpdateCommand(workerNode->groupId); - ExecuteRemoteCommandInConnectionList(list_make1(connection), localGroupCommand); + ExecuteRemoteCommandInConnectionList(list_make1(connection), + localGroupCommand); } } + List *existingMetadataNodeList = + TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + RowShareLock); + /*ErrorIfAnyMetadataNodeOutOfSync(existingMetadataNodeList); */ + + node = NULL; + foreach_ptr(node, existingMetadataNodeList) + { + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert(superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, node->workerName, + node->workerPort, NULL, NULL); + + activateContext.existingMetadataNodeConnectionList = + lappend(activateContext.existingMetadataNodeConnectionList, connection); + } + + node = NULL; + foreach_ptr(node, existingMetadataNodeList) + { + if (list_member_ptr(activateContext.activatedNodeList, node)) + { + char *metadataSyncCommand = + GetMetadataSyncCommandToSetNodeColumn(node, + Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + ExecuteRemoteCommandInConnectionList( + activateContext.existingMetadataNodeConnectionList, metadataSyncCommand); + } + } + + /* * Ideally, we'd want to drop and sync the metadata inside a * single transaction. However, for some users, the metadata might @@ -1249,14 +1295,16 @@ ActivateNodeList(List *nodeList) node = NULL; MultiConnection *connection = NULL; - forboth_ptr(node, activateContext.workerNodeList, connection,activateContext.connectionList) + forboth_ptr(node, activateContext.activatedNodeList, connection, + activateContext.activatedNodeConnectionList) { /* finally, let all other active metadata nodes to learn about this change */ SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, BoolGetDatum(true)); char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(node, Anum_pg_dist_node_isactive, BoolGetDatum(true)); - ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + ExecuteRemoteCommandInConnectionList( + activateContext.existingMetadataNodeConnectionList, metadataSyncCommand); } } @@ -1274,8 +1322,9 @@ DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext List *commandList = DropExistingMetadataCommandList(); foreach_ptr(command, commandList) { - ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, - command); + ExecuteRemoteCommandInConnectionList( + activateNodeContext.activatedNodeConnectionList, + command); } if (activateNodeContext.fetchCommands) diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 907129463..f371c0c7c 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -59,8 +59,10 @@ typedef struct WorkerNode typedef struct ActivateNodeContext { - List *workerNodeList; - List *connectionList; + List *activatedNodeList; + List *activatedNodeConnectionList; + + List *existingMetadataNodeConnectionList; bool fetchCommands; List *commandList;