From da3a0f4cdd0eda46175a9c7cee4e35f454894ef7 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Thu, 20 Jan 2022 17:03:50 +0300 Subject: [PATCH] Combine metadata logic --- .../distributed/commands/dependencies.c | 2 +- .../distributed/metadata/node_metadata.c | 202 +++++++----------- src/backend/distributed/test/metadata_sync.c | 12 +- src/include/distributed/metadata_utility.h | 3 +- src/include/distributed/worker_manager.h | 2 +- 5 files changed, 83 insertions(+), 138 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 4bcaa4bcd..5573f301f 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -372,7 +372,7 @@ GetCitusTableDDLCommandList(Oid relationId) * clusterHasDistributedFunction if there are any distributed functions. */ List * -ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort) +ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) { /* since we are executing ddl commands disable propagation first, primarily for mx */ List *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index fa693b2bc..219a2aeb0 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -107,10 +107,8 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static List * MetadataSetupCommandList(); -static List * ClearMetadataCommandList(); -static List * ClearShellTablesCommandList(); -static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); +static List * PgDistMetadataSyncCommandList(); +static void SyncTableMetadataToNode(WorkerNode *workerNode); static List * InterTableRelationshipCommandList(); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static List * PropagateNodeWideObjectsCommandList(); @@ -630,11 +628,11 @@ InterTableRelationshipCommandList() /* - * MetadataSetupCommandList get the command list to set up the metadata - * depending on the distributed object on the given node. + * PgDistMetadataSyncCommandList returns the command list to sync the pg_dist_* + * metadata. */ static List * -MetadataSetupCommandList() +PgDistMetadataSyncCommandList() { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -695,27 +693,6 @@ MetadataSetupCommandList() } -/* - * ClearShellTablesCommandList returns the command list to clear (shell) distributed - * tables from the given node. - */ -static List * -ClearShellTablesCommandList() -{ - List *clearShellTablesCommandList = NIL; - - clearShellTablesCommandList = lappend(clearShellTablesCommandList, - REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND); - - clearShellTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION), - clearShellTablesCommandList); - clearShellTablesCommandList = list_concat(clearShellTablesCommandList, - list_make1(ENABLE_DDL_PROPAGATION)); - - return clearShellTablesCommandList; -} - - /* * PropagateNodeWideObjectsCommandList is called during node activation to * propagate any object that should be propagated for every node. These are @@ -749,94 +726,68 @@ PropagateNodeWideObjectsCommandList() /* - * RecreateDistributedTablesWithDependenciesCommandList return command list to recreate - * distributed tables with command list. + * SyncTableMetadataCommandList returns commands to sync table metadata to the + * given worker node. To be idempotent, it first drops the ones required to be + * dropped. + * + * Table metadata includes: + * + * - All dependencies (e.g., types, schemas, sequences) + * - All shell distributed tables + * - pg_dist_partition, pg_dist_shard, pg_dist_placement, pg_dist_object + * */ List * -RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode) +SyncTableMetadataCommandList(WorkerNode *workerNode) { List *commandList = NIL; + /* + * Remove shell tables and pg_dist_* metadata. + */ commandList = list_concat(commandList, DetachPartitionCommandList()); - commandList = list_concat(commandList, ClearShellTablesCommandList()); + commandList = lappend(commandList, REMOVE_ALL_CITUS_TABLES_COMMAND); + commandList = lappend(commandList, DELETE_ALL_DISTRIBUTED_OBJECTS); + + /* + * Propagate node wide objects. It includes only roles for now. + */ commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); - commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand( - workerNode->groupId))); - commandList = list_concat(commandList, - ReplicateAllDependenciesToNodeCommandList( - workerNode->workerName, - workerNode->workerPort)); - commandList = list_concat(commandList, - InterTableRelationshipCommandList()); + + /* + * Replicate all objects of the pg_dist_object to the remote node. We need to + * update local group id first, as sequence replication logic depends on it. + */ + commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand(workerNode->groupId))); + commandList = list_concat(commandList, ReplicateAllObjectsToNodeCommandList(workerNode->workerName, workerNode->workerPort)); + + /* + * After creating each table, handle the inter table relationship between + * those tables. + */ + commandList = list_concat(commandList, InterTableRelationshipCommandList()); + + /* + * Finally create pg_dist_* entries + */ + List *syncPgDistMetadataCommandList = PgDistMetadataSyncCommandList(); + commandList = list_concat(commandList, syncPgDistMetadataCommandList); return commandList; } /* - * ClearMetadataCommandList returns the command list to clear all the - * distributed objects and metadata from the given node. - */ -static List * -ClearMetadataCommandList() -{ - List *clearDistTableInfoCommandList = NIL; - - clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, - DELETE_ALL_PARTITIONS); - - clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, - DELETE_ALL_SHARDS); - - clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, - DELETE_ALL_PLACEMENTS); - - clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, - DELETE_ALL_DISTRIBUTED_OBJECTS); - - clearDistTableInfoCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION), - clearDistTableInfoCommandList); - clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1( - ENABLE_DDL_PROPAGATION)); - - return clearDistTableInfoCommandList; -} - - -/* - * ResyncMetadataCommandList returns the command list to resync all the - * distributed table related metadata. - */ -List * -ResyncMetadataCommandList(void) -{ - List *resyncMetadataCommandList = NIL; - - List *clearMetadataCommandList = ClearMetadataCommandList(); - resyncMetadataCommandList = list_concat(resyncMetadataCommandList, - clearMetadataCommandList); - - List *setupMetadataCommandList = MetadataSetupCommandList(); - resyncMetadataCommandList = list_concat(resyncMetadataCommandList, - setupMetadataCommandList); - - return resyncMetadataCommandList; -} - - -/* - * SetUpDistributedTableWithDependencies sets up up the following on a node if it's - * a primary node that currently stores data: + * SyncTableMetadataToNode sync the table metadata to the node. Metadata includes * - All dependencies (e.g., types, schemas, sequences) * - All shell distributed table - * - Reference tables, because they are needed to handle queries efficiently. - * - Distributed functions + * - pg_dist_partition, pg_dist_shard, pg_dist_placement, pg_dist_object * * Note that we do not create the distributed dependencies on the coordinator * since all the dependencies should be present in the coordinator already. */ static void -SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) +SyncTableMetadataToNode(WorkerNode *newWorkerNode) { if (NodeIsPrimary(newWorkerNode)) { @@ -845,8 +796,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) Assert(ShouldPropagate()); if (!NodeIsCoordinator(newWorkerNode)) { - List *commandList = RecreateDistributedTablesWithDependenciesCommandList( - newWorkerNode); + List *commandList = SyncTableMetadataCommandList(newWorkerNode); /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); @@ -856,12 +806,6 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) CurrentUserName(), commandList); } - - if (ReplicateReferenceTablesOnActivate) - { - ReplicateAllReferenceTablesToNode(newWorkerNode->workerName, - newWorkerNode->workerPort); - } } } @@ -1080,12 +1024,6 @@ ActivateNode(char *nodeName, int nodePort) */ EnsureSuperUser(); - if (!EnableDependencyCreation) - { - ereport(ERROR, (errmsg("citus.enable_object_propagation must be on to " - "add an active node"))); - } - /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -1122,26 +1060,38 @@ ActivateNode(char *nodeName, int nodePort) /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ bool syncMetadata = EnableMetadataSyncByDefault && NodeIsPrimary(workerNode); - if (syncMetadata) + if (syncMetadata && EnableDependencyCreation) { + /* + * We are going to sync the metadata anyway in this transaction, so do + * not fail just because the current metadata is not synced. + */ + SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + + /* + * Sync table metadata first. Please check the comment on SyncTableMetadataToNode + * for the definition of table metadata. + */ + SyncTableMetadataToNode(workerNode); + + /* + * We need to replicate reference tables before syncing node metadata, otherwise + * reference table replication logic would try to get lock on the new node before + * having the shard placement on it + */ + if (ReplicateReferenceTablesOnActivate) + { + ReplicateAllReferenceTablesToNode(workerNode->workerName, + workerNode->workerPort); + } + + /* + * Sync node metadata (pg_dist_node) finally. + */ SyncNodeMetadataToNode(nodeName, nodePort); } - SetUpDistributedTableWithDependencies(workerNode); - - if (syncMetadata && !NodeIsCoordinator(workerNode)) - { - List *metadataUpdateCommandList = ResyncMetadataCommandList(); - - /* send commands to the new worker, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction( - workerNode->workerName, - workerNode->workerPort, - CurrentUserName(), - metadataUpdateCommandList); - } - /* finally, let all other active metadata nodes to learn about this change */ WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); Assert(newWorkerNode->nodeId == workerNode->nodeId); diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 026dd1664..e339b8102 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -48,22 +48,18 @@ activate_node_snapshot(PG_FUNCTION_ARGS) */ WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode(); - List *recreateTablesCommands = RecreateDistributedTablesWithDependenciesCommandList( - dummyWorkerNode); + List *syncTableMetadataCommandList = SyncTableMetadataCommandList(dummyWorkerNode); List *dropSnapshotCommands = NodeMetadataDropCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands(); - List *metadataUpdateCommandList = ResyncMetadataCommandList(); + List *activateNodeCommandList = NIL; int activateNodeCommandIndex = 0; Oid ddlCommandTypeId = TEXTOID; activateNodeCommandList = list_concat(activateNodeCommandList, - recreateTablesCommands); + syncTableMetadataCommandList); activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands); - activateNodeCommandList = list_concat(activateNodeCommandList, - createSnapshotCommands); - activateNodeCommandList = list_concat(activateNodeCommandList, - metadataUpdateCommandList); + activateNodeCommandList = list_concat(activateNodeCommandList, createSnapshotCommands); int activateNodeCommandCount = list_length(activateNodeCommandList); Datum *activateNodeCommandDatumArray = palloc0(activateNodeCommandCount * diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 77f089a7a..44b4b05a8 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -249,8 +249,7 @@ extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); extern bool ShouldPropagateObject(const ObjectAddress *address); -extern List * ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int - nodePort); +extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort); /* Remaining metadata utility functions */ extern char * TableOwner(Oid relationId); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 6cd566f55..f61886344 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -105,7 +105,7 @@ extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern List * RecreateDistributedTablesWithDependenciesCommandList( WorkerNode *workerNode); -extern List * ResyncMetadataCommandList(void); +extern List * SyncTableMetadataCommandList(WorkerNode *workerNode); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);