diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index b3665fd18..736d9ae7b 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -368,12 +368,12 @@ GetCitusTableDDLCommandList(Oid relationId) /* - * ReplicateAllDependenciesToNode replicate all previously marked objects to a worker - * node. The function also sets clusterHasDistributedFunction if there are any - * distributed functions. + * ReplicateAllDependenciesToNodeCommandList returns commands to replicate all + * previously marked objects to a worker node. The function also sets + * clusterHasDistributedFunction if there are any distributed functions. */ -void -ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) +List * +ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort) { List *ddlCommands = NIL; @@ -412,22 +412,12 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) ddlCommands = list_concat(ddlCommands, GetDependencyCreateDDLCommands(dependency)); } - if (list_length(ddlCommands) <= 0) - { - /* no commands to replicate dependencies to the new worker */ - return; - } /* since we are executing ddl commands lets disable propagation, primarily for mx */ ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - /* send commands to new workers, the current user should a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, - nodePort, - CurrentUserName(), - ddlCommands); + return ddlCommands; } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index bf4efa264..1588b1036 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -107,14 +107,16 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SetUpObjectMetadata(WorkerNode *workerNode); -static void ClearDistributedObjectsFromNode(WorkerNode *workerNode); -static void ClearDistributedTablesFromNode(WorkerNode *workerNode); -static void UpdatePgDistLocalGroupOnNode(WorkerNode *workerNode); +static List * ResyncMetadataCommandList(); +static List * MetadataSetupCommandList(); +static List * ClearMetadataCommandList(); +static List * ClearShellTablesCommandList(); +static List * RecreateDistributedTablesWithDependenciesCommandList( + WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); -static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); +static List * MultipleDistributedTableIntegrationsCommandList(); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); -static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); +static List * PropagateNodeWideObjectsCommandList(); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); @@ -580,16 +582,15 @@ master_set_node_property(PG_FUNCTION_ARGS) /* - * SetUpMultipleDistributedTableIntegrations set up the multiple integrations - * including + * MultipleDistributedTableIntegrationsCommandList returns the command list to + * set up the multiple integrations including * * (i) Foreign keys * (ii) Partionining hierarchy * - * on the given worker node. */ -static void -SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) +static List * +MultipleDistributedTableIntegrationsCommandList() { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -650,21 +651,16 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList, ENABLE_DDL_PROPAGATION); - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - CurrentUserName(), - multipleTableIntegrationCommandList); + return multipleTableIntegrationCommandList; } /* - * SetUpObjectMetadata sets up the metadata depending on the distributed object - * on the given node. + * MetadataSetupCommandList get the command list to set up the metadata + * depending on the distributed object on the given node. */ -static void -SetUpObjectMetadata(WorkerNode *workerNode) +static List * +MetadataSetupCommandList() { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -721,71 +717,81 @@ SetUpObjectMetadata(WorkerNode *workerNode) metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, ENABLE_DDL_PROPAGATION); - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - CurrentUserName(), - metadataSnapshotCommandList); + return metadataSnapshotCommandList; } /* - * UpdatePgDistLocalGroupOnNode updates the pg_dist_local_group on the given node + * RecreateDistributedTablesWithDependenciesCommandList return command list to recreate + * distributed tables with command list. */ -static void -UpdatePgDistLocalGroupOnNode(WorkerNode *workerNode) +static List * +RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode) { - /* generate and add the local group id's update query */ - char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); + List *commandList = NIL; - List *localGroupIdUpdateCommandList = list_make1(localGroupIdUpdateCommand); + commandList = list_concat(commandList, DetachPartitionCommandList()); + commandList = list_concat(commandList, ClearShellTablesCommandList()); + commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); + commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand( + workerNode->groupId))); + commandList = list_concat(commandList, ReplicateAllDependenciesToNodeCommandList( + workerNode->workerName, workerNode->workerPort)); + commandList = list_concat(commandList, + MultipleDistributedTableIntegrationsCommandList()); - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - CurrentUserName(), - localGroupIdUpdateCommandList); + return commandList; } /* - * ClearDistributedTablesFromNode clear (shell) distributed tables from the given node. + * ClearShellTablesCommandList returns the command list to clear (shell) distributed + * tables from the given node. */ -static void -ClearDistributedTablesFromNode(WorkerNode *workerNode) +static List * +ClearShellTablesCommandList() { - List *clearDistributedTablesCommandList = NIL; + List *clearShellTablesCommandList = NIL; - List *detachPartitionCommandList = DetachPartitionCommandList(); + clearShellTablesCommandList = lappend(clearShellTablesCommandList, + REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND); - clearDistributedTablesCommandList = list_concat(clearDistributedTablesCommandList, - detachPartitionCommandList); + clearShellTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION), + clearShellTablesCommandList); + clearShellTablesCommandList = list_concat(clearShellTablesCommandList, + list_make1(ENABLE_DDL_PROPAGATION)); - clearDistributedTablesCommandList = lappend(clearDistributedTablesCommandList, - REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND); - - clearDistributedTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION), - clearDistributedTablesCommandList); - clearDistributedTablesCommandList = list_concat(clearDistributedTablesCommandList, - list_make1( - ENABLE_DDL_PROPAGATION)); - - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - CurrentUserName(), - clearDistributedTablesCommandList); + return clearShellTablesCommandList; } /* - * ClearDistributedObjectsFromNode clears all the distributed objects, metadata and partition hierarchy from the given node. + * ResyncMetadataCommandList returns the command list to resync all the + * distributed table related metadata. */ -static void -ClearDistributedObjectsFromNode(WorkerNode *workerNode) +static List * +ResyncMetadataCommandList() +{ + List *resyncMetadataCommandList = NIL; + + List *clearMetadataCommandList = ClearMetadataCommandList(); + resyncMetadataCommandList = list_concat(resyncMetadataCommandList, + clearMetadataCommandList); + + List *setupMetadataCommandList = MetadataSetupCommandList(); + resyncMetadataCommandList = list_concat(resyncMetadataCommandList, + setupMetadataCommandList); + + return resyncMetadataCommandList; +} + + +/* + * ClearMetadataCommandList returns the command list to clear all the + * distributed objects and metadata from the given node. + */ +static List * +ClearMetadataCommandList() { List *clearDistTableInfoCommandList = NIL; @@ -806,11 +812,7 @@ ClearDistributedObjectsFromNode(WorkerNode *workerNode) clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1( ENABLE_DDL_PROPAGATION)); - char *currentUser = CurrentUserName(); - SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - clearDistTableInfoCommandList); + return clearDistTableInfoCommandList; } @@ -835,12 +837,17 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) Assert(ShouldPropagate()); if (!NodeIsCoordinator(newWorkerNode)) { - ClearDistributedTablesFromNode(newWorkerNode); - PropagateNodeWideObjects(newWorkerNode); - UpdatePgDistLocalGroupOnNode(newWorkerNode); - ReplicateAllDependenciesToNode(newWorkerNode->workerName, - newWorkerNode->workerPort); - SetUpMultipleDistributedTableIntegrations(newWorkerNode); + List *commandList = RecreateDistributedTablesWithDependenciesCommandList( + newWorkerNode); + + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction( + newWorkerNode->workerName, + newWorkerNode-> + workerPort, + CurrentUserName(), + commandList); } if (ReplicateReferenceTablesOnActivate) @@ -853,12 +860,12 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) /* - * PropagateNodeWideObjects is called during node activation to propagate any object that - * should be propagated for every node. These are generally not linked to any distributed - * object but change system wide behaviour. + * PropagateNodeWideObjectsCommandList is called during node activation to + * propagate any object that should be propagated for every node. These are + * generally not linked to any distributed object but change system wide behaviour. */ -static void -PropagateNodeWideObjects(WorkerNode *newWorkerNode) +static List * +PropagateNodeWideObjectsCommandList() { /* collect all commands */ List *ddlCommands = NIL; @@ -878,14 +885,9 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) /* if there are command wrap them in enable_ddl_propagation off */ ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CurrentUserName(), - ddlCommands); } + + return ddlCommands; } @@ -1163,8 +1165,15 @@ ActivateNode(char *nodeName, int nodePort) if (!NodeIsCoordinator(workerNode)) { - ClearDistributedObjectsFromNode(workerNode); - SetUpObjectMetadata(workerNode); + List *metadataUpdateCommandList = ResyncMetadataCommandList(); + + /* send commands to the new worker, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction( + workerNode->workerName, + workerNode->workerPort, + CurrentUserName(), + metadataUpdateCommandList); } } diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 53eb658d9..651bee20f 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -249,7 +249,8 @@ extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); extern bool ShouldPropagateObject(const ObjectAddress *address); -extern void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort); +extern List * ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int + nodePort); /* Remaining metadata utility functions */ extern char * TableOwner(Oid relationId);