From 512c72d2d0bc174f5d5d92518cbfb064ba07429a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 1 Dec 2022 23:48:02 +0100 Subject: [PATCH] sync objects without coordinated tx --- .../distributed/metadata/metadata_sync.c | 133 +++++++++--------- .../distributed/metadata/node_metadata.c | 102 +++++--------- src/include/distributed/metadata_sync.h | 3 +- src/include/distributed/worker_manager.h | 3 + 4 files changed, 102 insertions(+), 139 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 07ed45375..d972ccbda 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -104,6 +104,7 @@ static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void); static bool ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey); static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError); +static List * NodeMetadataReCreateCommandList(WorkerNode *workerNode); static void DropMetadataSnapshotOnNode(WorkerNode *workerNode); static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, char *columnName); @@ -226,10 +227,8 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) * start_metadata_sync_to_node(). */ void -SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort) +SyncNodeMetadataToNode(ActivateNodeContext activateContext) { - char *escapedNodeName = quote_literal_cstr(nodeNameString); - CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureModificationsCanRun(); @@ -238,58 +237,46 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort) LockRelationOid(DistNodeRelationId(), ExclusiveLock); - WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); - if (workerNode == NULL) + WorkerNode *workerNode = NULL; + MultiConnection *connection = NULL; + forboth_ptr(workerNode, activateContext.workerNodeList, + connection, activateContext.connectionList) { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("you cannot sync metadata to a non-existent node"), - errhint("First, add the node with SELECT citus_add_node" - "(%s,%d)", escapedNodeName, nodePort))); + + if (NodeIsCoordinator(workerNode)) + { + return; + } + + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + char *metadataSyncCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); + ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + + + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata, + BoolGetDatum(true)); + metadataSyncCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(true)); + ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + + if (!NodeIsPrimary(workerNode)) + { + /* + * If this is a secondary node we can't actually sync metadata to it; we assume + * the primary node is receiving metadata. + */ + return; + } + + List *nodeMetadataCommandList = NodeMetadataReCreateCommandList(workerNode); + metadataSyncCommand = NULL; + foreach_ptr(metadataSyncCommand, nodeMetadataCommandList) + { + ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + } } - - if (!workerNode->isActive) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("you cannot sync metadata to an inactive node"), - errhint("First, activate the node with " - "SELECT citus_activate_node(%s,%d)", - escapedNodeName, nodePort))); - } - - if (NodeIsCoordinator(workerNode)) - { - return; - } - - UseCoordinatedTransaction(); - - /* - * One would normally expect to set hasmetadata first, and then metadata sync. - * However, at this point we do the order reverse. - * We first set metadatasynced, and then hasmetadata; since setting columns for - * nodes with metadatasynced==false could cause errors. - * (See ErrorIfAnyMetadataNodeOutOfSync) - * We can safely do that because we are in a coordinated transaction and the changes - * are only visible to our own transaction. - * If anything goes wrong, we are going to rollback all the changes. - */ - workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); - workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum( - true)); - - if (!NodeIsPrimary(workerNode)) - { - /* - * If this is a secondary node we can't actually sync metadata to it; we assume - * the primary node is receiving metadata. - */ - return; - } - - /* fail if metadata synchronization doesn't succeed */ - bool raiseInterrupts = true; - SyncNodeMetadataSnapshotToNode(workerNode, raiseInterrupts); } @@ -609,21 +596,8 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) { char *currentUser = CurrentUserName(); - /* generate and add the local group id's update query */ - char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); - - /* generate the queries which drop the node metadata */ - List *dropMetadataCommandList = NodeMetadataDropCommands(); - - /* generate the queries which create the node metadata from scratch */ - List *createMetadataCommandList = NodeMetadataCreateCommands(); - - List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand); - recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, - dropMetadataCommandList); - recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, - createMetadataCommandList); - + List *recreateMetadataSnapshotCommandList = + NodeMetadataReCreateCommandList(workerNode); /* * Send the snapshot recreation commands in a single remote transaction and * if requested, error out in any kind of failure. Note that it is not @@ -650,6 +624,29 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) } +static List * +NodeMetadataReCreateCommandList(WorkerNode *workerNode) +{ + + /* generate and add the local group id's update query */ + char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); + + /* generate the queries which drop the node metadata */ + List *dropMetadataCommandList = NodeMetadataDropCommands(); + + /* generate the queries which create the node metadata from scratch */ + List *createMetadataCommandList = NodeMetadataCreateCommands(); + + List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand); + recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, + dropMetadataCommandList); + recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, + createMetadataCommandList); + + return recreateMetadataSnapshotCommandList; + +} + /* * DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them * to the worker given as parameter. diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 37ad41916..1ff67ce61 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -92,7 +92,6 @@ static bool PlacementHasActivePlacementOnAnotherGroup(GroupShardPlacement *sourcePlacement); static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, bool *nodeAlreadyExists); -static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static int32 GetNextGroupId(void); static int GetNextNodeId(void); @@ -101,7 +100,6 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext); -static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext); static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext); static void BlockDistributedQueriesOnMetadataNodes(void); @@ -113,9 +111,6 @@ static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext activat static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAllWorkers(void); -static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, - int columnIndex, - Datum value); static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata); static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced); static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, @@ -868,26 +863,6 @@ SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeConte } -/* - * UpdateLocalGroupIdOnNode updates local group id on node. - */ -static void -UpdateLocalGroupIdOnNode(WorkerNode *workerNode) -{ - if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) - { - List *commandList = list_make1(LocalGroupIdUpdateCommand(workerNode->groupId)); - - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - list_make1(workerNode), - CurrentUserName(), - commandList); - } -} - - /* * SyncPgDistTableMetadataToNodeList syncs the pg_dist_partition, pg_dist_shard * pg_dist_placement and pg_dist_object metadata entries. @@ -1142,9 +1117,13 @@ ActivateNodeList(List *nodeList) /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); - ActivateNodeContext activateContext; + ActivateNodeContext activateContext = {}; activateContext.fetchCommands = false; + activateContext.workerNodeList = NIL; + activateContext.connectionList = NIL; + activateContext.commandList = NIL; + WorkerNode *node = NULL; foreach_ptr(node, nodeList) @@ -1196,19 +1175,6 @@ ActivateNodeList(List *nodeList) bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); if (syncMetadata) { - /* - * We are going to sync the metadata anyway in this transaction, so do - * not fail just because the current metadata is not synced. - */ - SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); - - /* - * Update local group id first, as object dependency logic requires to have - * updated local group id. - */ - UpdateLocalGroupIdOnNode(workerNode); - activateContext.workerNodeList = lappend(activateContext.workerNodeList, workerNode); @@ -1220,6 +1186,23 @@ ActivateNodeList(List *nodeList) workerNode->workerPort, NULL, NULL); activateContext.connectionList = lappend(activateContext.connectionList, connection); + + /* + * We are going to sync the metadata anyway in this transaction, so do + * not fail just because the current metadata is not synced. + */ + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + char *metadataSyncCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); + ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); + + /* + * Update local group id first, as object dependency logic requires to have + * updated local group id. + */ + char *localGroupCommand = LocalGroupIdUpdateCommand(workerNode->groupId); + ExecuteRemoteCommandInConnectionList(list_make1(connection), localGroupCommand); } } @@ -1255,10 +1238,7 @@ ActivateNodeList(List *nodeList) * related pg_dist_xxx metadata. Since table related metadata requires * to have right pg_dist_node entries. */ - foreach_ptr(node, activateContext.workerNodeList) - { - SyncNodeMetadataToNode(node->workerName, node->workerPort); - } + SyncNodeMetadataToNode(activateContext); /* * As the last step, sync the table related metadata to the remote node. @@ -1267,12 +1247,16 @@ ActivateNodeList(List *nodeList) */ SyncPgDistTableMetadataToNodeList(activateContext); - foreach_ptr(node, nodeList) + node = NULL; + MultiConnection *connection = NULL; + forboth_ptr(node, activateContext.workerNodeList, connection,activateContext.connectionList) { - bool isActive = true; - /* finally, let all other active metadata nodes to learn about this change */ - SetNodeState(node->workerName, node->workerPort, isActive); + SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, BoolGetDatum(true)); + char *metadataSyncCommand = + GetMetadataSyncCommandToSetNodeColumn(node, Anum_pg_dist_node_isactive, + BoolGetDatum(true)); + ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand); } } @@ -1342,18 +1326,12 @@ DropExistingMetadataCommandList() int ActivateNode(char *nodeName, int nodePort) { - bool isActive = true; - WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); ActivateNodeList(list_make1(workerNode)); - /* finally, let all other active metadata nodes to learn about this change */ - WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); - Assert(newWorkerNode->nodeId == workerNode->nodeId); - TransactionModifiedNodeMetadata = true; - return newWorkerNode->nodeId; + return workerNode->nodeId; } @@ -2386,7 +2364,7 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value) * GetMetadataSyncCommandToSetNodeColumn checks if the given workerNode and value is * valid or not. Then it returns the necessary metadata sync command as a string. */ -static char * +char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum value) { @@ -2500,20 +2478,6 @@ SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards) } -/* - * SetNodeState function sets the isactive column of the specified worker in - * pg_dist_node to isActive. Also propagates this to other metadata nodes. - * It returns the new worker node after the modification. - */ -static WorkerNode * -SetNodeState(char *nodeName, int nodePort, bool isActive) -{ - WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum( - isActive)); -} - - /* * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the * node is not found this function returns NULL. diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 08cc585ab..b381912d4 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -46,9 +46,8 @@ typedef struct SequenceInfo bool isNextValDefault; } SequenceInfo; - /* Functions declarations for metadata syncing */ -extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort); +extern void SyncNodeMetadataToNode(ActivateNodeContext activateContext); extern void SyncCitusTableMetadata(Oid relationId); extern void EnsureSequentialModeMetadataOperations(void); extern bool ClusterHasKnownMetadataWorkers(void); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index d768202c5..907129463 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -113,6 +113,9 @@ extern WorkerNode * SetWorkerColumnOptional(WorkerNode *workerNode, int columnIn value); extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value); +extern char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, + int columnIndex, + Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern void SyncDistributedObjects(ActivateNodeContext activateNodeContext);