From 35dbdae5a40fa127cded891ac4bce0436cd68b2e Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Tue, 28 Mar 2023 21:34:22 +0300 Subject: [PATCH] =?UTF-8?q?PR=20#6728=20=C2=A0/=20commit=20-=2011?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let AddNodeMetadata to use metadatasync api during node addition. --- .../distributed/metadata/metadata_sync.c | 81 +++- .../distributed/metadata/node_metadata.c | 413 ++++++++++++------ src/backend/distributed/test/metadata_sync.c | 10 +- .../transaction/worker_transaction.c | 2 + src/include/distributed/metadata_sync.h | 11 +- src/include/distributed/worker_manager.h | 1 + .../expected/multi_cluster_management.out | 24 +- .../regress/expected/multi_metadata_sync.out | 1 + .../expected/multi_metadata_sync_0.out | 1 + .../upgrade_citus_finish_citus_upgrade.out | 9 + .../regress/sql/multi_cluster_management.sql | 10 +- .../upgrade_citus_finish_citus_upgrade.sql | 5 + 12 files changed, 420 insertions(+), 148 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 1b2a8b84f..4260a4e80 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -194,10 +194,25 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureCoordinator(); char *nodeNameString = text_to_cstring(nodeName); + WorkerNode *workerNode = ModifiableWorkerNode(nodeNameString, nodePort); - ActivateNode(nodeNameString, nodePort); + /* + * Create MetadataSyncContext which is used throughout nodes' activation. + * It contains activated nodes, bare connections if the mode is nontransactional, + * and a memory context for allocation. + */ + bool collectCommands = false; + bool nodesAddedInSameTransaction = false; + MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), + collectCommands, + nodesAddedInSameTransaction); + + ActivateNodeList(context); TransactionModifiedNodeMetadata = true; + /* cleanup metadata memory context and connections */ + DestroyMetadataSyncContext(context); + PG_RETURN_VOID(); } @@ -215,24 +230,25 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) EnsureSuperUser(); EnsureCoordinator(); - List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock); + List *nodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock); /* - * create MetadataSyncContext which will be used throughout nodes' activation. - * It contains metadata sync nodes, their connections and also a MemoryContext - * for allocations. + * Create MetadataSyncContext which is used throughout nodes' activation. + * It contains activated nodes, bare connections if the mode is nontransactional, + * and a memory context for allocation. */ bool collectCommands = false; - MetadataSyncContext *context = CreateMetadataSyncContext(workerNodes, - collectCommands); + bool nodesAddedInSameTransaction = false; + MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, + collectCommands, + nodesAddedInSameTransaction); ActivateNodeList(context); + TransactionModifiedNodeMetadata = true; /* cleanup metadata memory context and connections */ DestroyMetadataSyncContext(context); - TransactionModifiedNodeMetadata = true; - PG_RETURN_BOOL(true); } @@ -840,6 +856,35 @@ NodeListInsertCommand(List *workerNodeList) } +/* + * NodeListIdempotentInsertCommand generates an idempotent multi-row INSERT command that + * can be executed to insert the nodes that are in workerNodeList to pg_dist_node table. 
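
Both UDFs above now share one lifecycle: look up and lock the node, build a MetadataSyncContext, activate, destroy. A minimal sketch of that pattern, assuming the headers and helpers introduced by this patch (error handling elided; the function name is illustrative):

#include "postgres.h"

#include "nodes/pg_list.h"
#include "distributed/metadata_sync.h"
#include "distributed/worker_manager.h"

/* sketch: activate one preexisting node through the metadata sync API */
static void
ActivateSingleNodeSketch(char *nodeName, int32 nodePort)
{
	/* takes the locks needed to modify the node; errors if it is missing */
	WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);

	bool collectCommands = false;             /* send commands, do not collect */
	bool nodesAddedInSameTransaction = false; /* node predates this transaction */
	MetadataSyncContext *context =
		CreateMetadataSyncContext(list_make1(workerNode),
								  collectCommands,
								  nodesAddedInSameTransaction);

	ActivateNodeList(context);

	/* always release the context's memory and bare connections */
	DestroyMetadataSyncContext(context);
}
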
+ * It would insert new nodes or replace current nodes with new nodes if nodename-nodeport + * pairs already exist. + */ +char * +NodeListIdempotentInsertCommand(List *workerNodeList) +{ + StringInfo nodeInsertIdempotentCommand = makeStringInfo(); + char *nodeInsertStr = NodeListInsertCommand(workerNodeList); + appendStringInfoString(nodeInsertIdempotentCommand, nodeInsertStr); + char *onConflictStr = " ON CONFLICT ON CONSTRAINT pg_dist_node_nodename_nodeport_key " + "DO UPDATE SET nodeid = EXCLUDED.nodeid, " + "groupid = EXCLUDED.groupid, " + "nodename = EXCLUDED.nodename, " + "nodeport = EXCLUDED.nodeport, " + "noderack = EXCLUDED.noderack, " + "hasmetadata = EXCLUDED.hasmetadata, " + "isactive = EXCLUDED.isactive, " + "noderole = EXCLUDED.noderole, " + "nodecluster = EXCLUDED.nodecluster ," + "metadatasynced = EXCLUDED.metadatasynced, " + "shouldhaveshards = EXCLUDED.shouldhaveshards"; + appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr); + return nodeInsertIdempotentCommand->data; +} + + /* * MarkObjectsDistributedCreateCommand generates a command that can be executed to * insert or update the provided objects into pg_dist_object on a worker node. @@ -3904,12 +3949,18 @@ ColocationGroupDeleteCommand(uint32 colocationId) void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, List *nodeList) { + /* sync is disabled, then no nodes to sync */ + if (!EnableMetadataSync) + { + return; + } + List *activatedWorkerNodeList = NIL; WorkerNode *node = NULL; foreach_ptr(node, nodeList) { - if (EnableMetadataSync && NodeIsPrimary(node)) + if (NodeIsPrimary(node)) { /* warn if we have coordinator in nodelist */ if (NodeIsCoordinator(node)) @@ -3963,10 +4014,14 @@ EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context) * and a MemoryContext to be used throughout the metadata sync. * * If we collect commands, connections will not be established as caller's intent - * is to collcet sync commands. + * is to collect sync commands. + * + * If the nodes are newly added before activation, we would not try to unset + * metadatasynced in separate transaction during nontransactional metadatasync. */ MetadataSyncContext * -CreateMetadataSyncContext(List *nodeList, bool collectCommands) +CreateMetadataSyncContext(List *nodeList, bool collectCommands, + bool nodesAddedInSameTransaction) { /* should be alive during local transaction during the sync */ MemoryContext context = AllocSetContextCreate(TopTransactionContext, @@ -3980,6 +4035,7 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands) metadataSyncContext->transactionMode = MetadataSyncTransMode; metadataSyncContext->collectCommands = collectCommands; metadataSyncContext->collectedCommands = NIL; + metadataSyncContext->nodesAddedInSameTransaction = nodesAddedInSameTransaction; /* filter the nodes that needs to be activated from given node list */ SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList); @@ -4010,6 +4066,7 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands) void DestroyMetadataSyncContext(MetadataSyncContext *context) { + /* todo: make sure context is always cleanup by using resource release callback?? 
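
State that must outlive individual commands (the activated node list, bare connections, collected commands) belongs to the context's own memory context, which CreateMetadataSyncContext above hangs off TopTransactionContext. A small sketch of that ownership rule; the helper is illustrative, not part of the patch:

#include "postgres.h"

#include "distributed/metadata_sync.h"

/* sketch: allocate sync-lifetime state under the context's memory context */
static void *
AllocInSyncContextSketch(MetadataSyncContext *metadataSyncContext, Size size)
{
	MemoryContext oldContext = MemoryContextSwitchTo(metadataSyncContext->context);
	void *allocation = palloc0(size);
	MemoryContextSwitchTo(oldContext);

	/* freed in bulk when DestroyMetadataSyncContext deletes the context */
	return allocation;
}
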
*/ /* close connections */ MultiConnection *connection = NULL; foreach_ptr(connection, context->activatedWorkerBareConnections) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index da3e02853..f113a7a13 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -91,9 +91,11 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode); static bool PlacementHasActivePlacementOnAnotherGroup(GroupShardPlacement *sourcePlacement); -static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata - *nodeMetadata, bool *nodeAlreadyExists); -static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); +static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, + bool *nodeAlreadyExists, bool localOnly); +static int AddNodeMetadataViaMetadataContext(char *nodeName, int32 nodePort, + NodeMetadata *nodeMetadata, + bool *nodeAlreadyExists); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static HeapTuple GetNodeByNodeId(int32 nodeId); static int32 GetNextGroupId(void); @@ -104,7 +106,6 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada static void DeleteNodeRow(char *nodename, int32 nodeport); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); -static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); @@ -124,11 +125,11 @@ static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); static void SendDeletionCommandsForReplicatedTablePlacements( MetadataSyncContext *context); static void SyncNodeMetadata(MetadataSyncContext *context); -static void SetWorkerColumnViaMetadataContext(MetadataSyncContext *context, - WorkerNode *workerNode, - int columnIndex, Datum value); -static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t - parentSessionPid); +static void SetNodeStateViaMetadataContext(MetadataSyncContext *context, + WorkerNode *workerNode, + Datum value); +static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, + pid_t parentSessionPid); static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); @@ -199,16 +200,26 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS) Name nodeClusterName = PG_GETARG_NAME(3); nodeMetadata.nodeCluster = NameStr(*nodeClusterName); + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. 
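
This guard recurs in every node DDL entry point the patch touches. The helper's body is not shown in these hunks; a plausible sketch of what it checks, stated as an assumption rather than the actual implementation:

#include "postgres.h"

#include "distributed/metadata_sync.h"

/* assumed shape of EnsureTransactionalMetadataSyncMode */
static void
EnsureTransactionalMetadataSyncModeSketch(void)
{
	if (MetadataSyncTransMode != METADATA_SYNC_TRANSACTIONAL)
	{
		ereport(ERROR, (errmsg("the operation is not allowed in "
							   "nontransactional metadata sync mode"),
						errhint("SET citus.metadata_sync_mode "
								"TO 'transactional'")));
	}
}
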
+ */ + if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) + { + EnsureTransactionalMetadataSyncMode(); + } + bool isCoordinatorInMetadata = false; WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata); if (!isCoordinatorInMetadata) { bool nodeAlreadyExists = false; + bool localOnly = false; /* add the coordinator to pg_dist_node if it was not already added */ AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, - &nodeAlreadyExists); + &nodeAlreadyExists, localOnly); /* we just checked */ Assert(!nodeAlreadyExists); @@ -257,6 +268,9 @@ citus_add_node(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); @@ -289,14 +303,27 @@ citus_add_node(PG_FUNCTION_ARGS) } /* - * We do not allow addition of secondary nodes in nontransactional sync mode - * via citus_add_node. + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. */ if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) { EnsureTransactionalMetadataSyncMode(); } + if (MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL && + IsMultiStatementTransaction()) + { + /* + * prevent inside transaction block as we use bare connections which can + * lead deadlock + */ + ereport(ERROR, (errmsg("do not add node in transaction block " + "when the sync mode is nontransactional"), + errhint("add the node after SET citus.metadata_sync_mode " + "TO 'transactional'"))); + } + int nodeId = AddNodeMetadataViaMetadataContext(nodeNameString, nodePort, &nodeMetadata, &nodeAlreadyExists); @@ -342,8 +369,18 @@ citus_add_inactive_node(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node"))); } + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. + */ + if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) + { + EnsureTransactionalMetadataSyncMode(); + } + + bool localOnly = false; int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, - &nodeAlreadyExists); + &nodeAlreadyExists, localOnly); TransactionModifiedNodeMetadata = true; PG_RETURN_INT32(nodeId); @@ -386,8 +423,15 @@ citus_add_secondary_node(PG_FUNCTION_ARGS) nodeMetadata.nodeRole = SecondaryNodeRoleId(); nodeMetadata.isActive = true; + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. + */ + EnsureTransactionalMetadataSyncMode(); + + bool localOnly = false; int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, - &nodeAlreadyExists); + &nodeAlreadyExists, localOnly); TransactionModifiedNodeMetadata = true; PG_RETURN_INT32(nodeId); @@ -465,6 +509,15 @@ citus_disable_node(PG_FUNCTION_ARGS) ErrorIfCoordinatorMetadataSetFalse(workerNode, BoolGetDatum(isActive), "isactive"); + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. + */ + if (NodeIsSecondary(workerNode)) + { + EnsureTransactionalMetadataSyncMode(); + } + WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode(); bool disablingFirstNode = (firstWorkerNode && firstWorkerNode->nodeId == workerNode->nodeId); @@ -623,6 +676,15 @@ citus_set_node_property(PG_FUNCTION_ARGS) WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText), nodePort); + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. 
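
The transaction-block check in citus_add_node above exists because nontransactional sync talks to workers over bare connections: those are independent backends, and they can block on pg_dist_node locks the outer transaction already holds, which deadlocks. The check, factored into a standalone sketch (logic grounded in the hunk above; the function name is illustrative and IsMultiStatementTransaction is the usual Citus helper):

#include "postgres.h"

#include "distributed/metadata_sync.h"
#include "distributed/transaction_management.h"

/* sketch: reject nontransactional node addition inside a transaction block */
static void
ErrorIfUnsafeTransactionBlockSketch(void)
{
	if (MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL &&
		IsMultiStatementTransaction())
	{
		ereport(ERROR, (errmsg("do not add node in transaction block "
							   "when the sync mode is nontransactional"),
						errhint("add the node after SET citus.metadata_sync_mode "
								"TO 'transactional'")));
	}
}
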
+ */ + if (NodeIsSecondary(workerNode)) + { + EnsureTransactionalMetadataSyncMode(); + } + if (strcmp(text_to_cstring(propertyText), "shouldhaveshards") == 0) { SetShouldHaveShards(workerNode, value); @@ -654,7 +716,7 @@ master_set_node_property(PG_FUNCTION_ARGS) * ModifiableWorkerNode gets the requested WorkerNode and also gets locks * required for modifying it. This fails if the node does not exist. */ -static WorkerNode * +WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort) { CheckCitusVersion(ERROR); @@ -683,11 +745,36 @@ citus_activate_node(PG_FUNCTION_ARGS) text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - int32 nodeId = ActivateNode(text_to_cstring(nodeNameText), nodePort); + char *nodeNameString = text_to_cstring(nodeNameText); + WorkerNode *workerNode = ModifiableWorkerNode(nodeNameString, nodePort); + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. + */ + if (NodeIsSecondary(workerNode)) + { + EnsureTransactionalMetadataSyncMode(); + } + + /* + * Create MetadataSyncContext which is used throughout nodes' activation. + * It contains activated nodes, bare connections if the mode is nontransactional, + * and a memory context for allocation. + */ + bool collectCommands = false; + bool nodesAddedInSameTransaction = false; + MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), + collectCommands, + nodesAddedInSameTransaction); + + ActivateNodeList(context); TransactionModifiedNodeMetadata = true; - PG_RETURN_INT32(nodeId); + /* cleanup metadata memory context and connections */ + DestroyMetadataSyncContext(context); + + PG_RETURN_INT32(workerNode->nodeId); } @@ -844,8 +931,8 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) * connection to localhost by calling the udf `citus_internal_mark_node_not_synced`. */ static void -MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t - parentSessionPid) +MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, + pid_t parentSessionPid) { Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL); Assert(!MetadataSyncCollectsCommands(context)); @@ -867,6 +954,22 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t * In those cases, proper metadata sync for the workers should be done.) */ + /* + * Because we try to unset metadatasynced flag with a separate transaction, + * we could not find the new node if the node is added in the current local + * transaction. But, hopefully, we do not need to unset metadatasynced for + * the new node as local transaction would rollback in case of a failure. + */ + if (context->nodesAddedInSameTransaction) + { + return; + } + + if (context->activatedWorkerNodeList == NIL) + { + return; + } + int connectionFlag = FORCE_NEW_CONNECTION; MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, PostPortNumber); @@ -880,7 +983,7 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t * connection to localhost. To achieve this, we check if the caller session's * pid holds the Exclusive lock on pg_dist_node. After ensuring that (we are * called from parent session which holds the Exclusive lock), we can safely - * update node metadata by acquiring lower level of lock. + * update node metadata by acquiring the relaxed lock. 
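
The loopback trick above is worth a compact illustration: the flag flip must survive even if the current transaction aborts, so it runs over a fresh connection to the local server and commits there independently. A sketch under the patch's assumptions (header paths taken from the Citus tree; CITUS_INTERNAL_MARK_NODE_NOT_SYNCED is assumed to be a format string accepting the parent pid and node id):

#include "postgres.h"

#include "miscadmin.h"
#include "lib/stringinfo.h"
#include "postmaster/postmaster.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"

/* sketch: unset metadatasynced for one node via a loopback connection */
static void
MarkOneNodeNotSyncedSketch(WorkerNode *workerNode)
{
	int connectionFlag = FORCE_NEW_CONNECTION;
	MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
													PostPortNumber);

	StringInfo command = makeStringInfo();
	appendStringInfo(command, CITUS_INTERNAL_MARK_NODE_NOT_SYNCED,
					 MyProcPid, workerNode->nodeId);

	/* commits in the loopback backend, independent of our transaction */
	ExecuteCriticalRemoteCommand(connection, command->data);
	CloseConnection(connection);
}
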
*/ StringInfo metadatasyncCommand = makeStringInfo(); appendStringInfo(metadatasyncCommand, CITUS_INTERNAL_MARK_NODE_NOT_SYNCED, @@ -903,6 +1006,8 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) /* do not execute local transaction if we collect commands */ if (!MetadataSyncCollectsCommands(context)) { + List *updatedActivatedNodeList = NIL; + WorkerNode *node = NULL; foreach_ptr(node, context->activatedWorkerNodeList) { @@ -912,22 +1017,20 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) BoolGetDatum(true)); node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_hasmetadata, BoolGetDatum(true)); + + updatedActivatedNodeList = lappend(updatedActivatedNodeList, node); } + + /* reset activated nodes inside metadataSyncContext afer local update */ + SetMetadataSyncNodesFromNodeList(context, updatedActivatedNodeList); } - if (!localOnly) + if (!localOnly && EnableMetadataSync) { WorkerNode *node = NULL; foreach_ptr(node, context->activatedWorkerNodeList) { - SetWorkerColumnViaMetadataContext(context, node, Anum_pg_dist_node_isactive, - BoolGetDatum(true)); - SetWorkerColumnViaMetadataContext(context, node, - Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); - SetWorkerColumnViaMetadataContext(context, node, - Anum_pg_dist_node_hasmetadata, - BoolGetDatum(true)); + SetNodeStateViaMetadataContext(context, node, BoolGetDatum(true)); } } } @@ -935,7 +1038,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) /* * ActivateNodeList does some sanity checks and acquire Exclusive lock on pg_dist_node, - * and then iterates over the nodeList and activates the nodes. + * and then activates the nodes inside given metadataSyncContext. * * The function operates in 3 different modes according to transactionMode inside * metadataSyncContext. @@ -950,16 +1053,6 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) void ActivateNodeList(MetadataSyncContext *context) { - if (context->activatedWorkerNodeList == NIL) - { - /* - * In case user calls with only coordinator in nodelist, we can hit here. Just bail - * out as we already warned the user, at `SetMetadataSyncNodesFromNodeList`, that - * coordinator already has metadata. - */ - return; - } - if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL && IsMultiStatementTransaction()) { @@ -1017,7 +1110,6 @@ ActivateNodeList(MetadataSyncContext *context) * Delete existing reference and replicated table placements on the * given groupId if the group has been disabled earlier (e.g., isActive * set to false). - * todo: use metada context connections */ SendDeletionCommandsForReplicatedTablePlacements(context); @@ -1070,44 +1162,6 @@ ActivateNodeList(MetadataSyncContext *context) } -/* - * ActivateNode activates the node with nodeName and nodePort. Currently, activation - * includes only replicating the reference tables and setting isactive column of the - * given node. - */ -int -ActivateNode(char *nodeName, int nodePort) -{ - bool isActive = true; - - /* - * We take exclusive lock on pg_dist_node inside ActivateNodeList. We - * also check the node still exists after acquiring the lock. - */ - WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - - /* - * Create MetadataSyncContext which will be used throughout nodes' activation. - * It contains metadata sync nodes, their connections and also a MemoryContext - * for allocations. 
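
The three modes named in the ActivateNodeList comment above determine everything the context does with a command list. A descriptive sketch of that dispatch, using only fields and helpers that appear in this patch:

#include "postgres.h"

#include "distributed/metadata_sync.h"

/* sketch: how a metadata sync context decides to handle commands */
static const char *
SyncModeDescriptionSketch(MetadataSyncContext *context)
{
	if (MetadataSyncCollectsCommands(context))
	{
		return "commands are collected into collectedCommands; nothing is sent";
	}
	else if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
	{
		return "commands run inside a single coordinated transaction";
	}

	Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL);
	return "idempotent commands are sent over bare connections, not atomically";
}
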
- */ - bool collectCommands = false; - MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), - collectCommands); - - ActivateNodeList(context); - - /* cleanup metadata memory context and connections */ - DestroyMetadataSyncContext(context); - - /* finally, let all other active metadata nodes to learn about this change */ - WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); - Assert(newWorkerNode->nodeId == workerNode->nodeId); - - return newWorkerNode->nodeId; -} - - /* * citus_update_node moves the requested node to a different nodename and nodeport. It * locks to ensure no queries are running concurrently; and is intended for customers who @@ -1165,6 +1219,14 @@ citus_update_node(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. + */ + if (NodeIsSecondary(workerNode)) + { + EnsureTransactionalMetadataSyncMode(); + } /* * If the node is a primary node we block reads and writes. @@ -1525,10 +1587,10 @@ EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid) } char *checkIfParentLockCommand = "SELECT pid FROM pg_locks WHERE " - "database = %d AND relation = %d AND " + "pid = %d AND database = %d AND relation = %d AND " "mode = 'ExclusiveLock' AND granted = TRUE"; appendStringInfo(checkIfParentLockCommandStr, checkIfParentLockCommand, - MyDatabaseId, DistNodeRelationId()); + parentSessionPid, MyDatabaseId, DistNodeRelationId()); bool readOnly = true; int spiQueryResult = SPI_execute(checkIfParentLockCommandStr->data, readOnly, 0); @@ -1551,9 +1613,9 @@ EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid) /* - * citus_internal_mark_node_not_synced unsets metadatasynced flag in separate - * connection to localhost. Should only be called by - * `MarkNodesNotSyncedInLoopBackConnection`. See it for details. + * citus_internal_mark_node_not_synced unsets metadatasynced flag in separate connection + * to localhost. Should only be called by `MarkNodesNotSyncedInLoopBackConnection`. + * See it for details. */ Datum citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS) @@ -1565,20 +1627,17 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS) pid_t parentSessionPid = PG_GETARG_INT32(0); + /* fetch node by id */ + int nodeId = PG_GETARG_INT32(1); + HeapTuple heapTuple = GetNodeByNodeId(nodeId); + /* ensure that parent session holds Exclusive lock to pg_dist_node */ EnsureParentSessionHasExclusiveLockOnPgDistNode(parentSessionPid); /* - * We made sure parent session holds the ExclusiveLock, so we can update - * pg_dist_node safely with low level lock here. + * We made sure parent session holds the ExclusiveLock, so we can unset + * metadatasynced for the node safely with the relaxed lock here. */ - int nodeId = PG_GETARG_INT32(1); - HeapTuple heapTuple = GetNodeByNodeId(nodeId); - if (heapTuple == NULL) - { - ereport(ERROR, (errmsg("could not find valid entry for node id %d", nodeId))); - } - Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); @@ -1806,6 +1865,16 @@ static void RemoveNodeFromCluster(char *nodeName, int32 nodePort) { WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); + + /* + * We do not allow metadata operations on secondary nodes in nontransactional + * sync mode. 
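
The pg_locks probe above was also tightened: without the pid filter, any backend holding an ExclusiveLock on pg_dist_node would have satisfied the check. A sketch of the corrected probe, assuming the caller already ran SPI_connect() and that DistNodeRelationId comes from metadata_cache.h:

#include "postgres.h"

#include "miscadmin.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "distributed/metadata_cache.h"

/* sketch: does the parent session hold ExclusiveLock on pg_dist_node? */
static bool
ParentHoldsPgDistNodeLockSketch(pid_t parentSessionPid)
{
	StringInfo query = makeStringInfo();
	appendStringInfo(query,
					 "SELECT pid FROM pg_locks WHERE "
					 "pid = %d AND database = %d AND relation = %d AND "
					 "mode = 'ExclusiveLock' AND granted = TRUE",
					 parentSessionPid, MyDatabaseId, DistNodeRelationId());

	bool readOnly = true;
	int spiResult = SPI_execute(query->data, readOnly, 0);
	if (spiResult != SPI_OK_SELECT)
	{
		ereport(ERROR, (errmsg("could not check pg_locks")));
	}

	/* a granted row pinned to that pid means the parent holds the lock */
	return SPI_processed > 0;
}
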
+ */ + if (NodeIsSecondary(workerNode)) + { + EnsureTransactionalMetadataSyncMode(); + } + if (NodeIsPrimary(workerNode)) { ErrorIfNodeContainsNonRemovablePlacements(workerNode); @@ -1934,12 +2003,11 @@ CountPrimariesWithMetadata(void) * If not, the following procedure is followed while adding a node: If the groupId is not * explicitly given by the user, the function picks the group that the new node should * be in with respect to GroupSize. Then, the new node is inserted into the local - * pg_dist_node as well as the nodes with hasmetadata=true. + * pg_dist_node as well as the nodes with hasmetadata=true if localOnly is false. */ static int -AddNodeMetadata(char *nodeName, int32 nodePort, - NodeMetadata *nodeMetadata, - bool *nodeAlreadyExists) +AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, + bool *nodeAlreadyExists, bool localOnly) { EnsureCoordinator(); @@ -2068,7 +2136,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - if (EnableMetadataSync) + if (EnableMetadataSync && !localOnly) { /* send the delete command to all primary nodes with metadata */ char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); @@ -2089,6 +2157,96 @@ AddNodeMetadata(char *nodeName, int32 nodePort, } +/* + * AddNodeMetadataViaMetadataContext does the same thing as AddNodeMetadata but + * make use of metadata sync context to send commands to workers to support both + * transactional and nontransactional sync modes. + */ +static int +AddNodeMetadataViaMetadataContext(char *nodeName, int32 nodePort, + NodeMetadata *nodeMetadata, bool *nodeAlreadyExists) +{ + bool localOnly = true; + int nodeId = AddNodeMetadata(nodeName, nodePort, nodeMetadata, nodeAlreadyExists, + localOnly); + + /* do nothing as the node already exists */ + if (*nodeAlreadyExists) + { + return nodeId; + } + + /* + * Create metadata sync context that is used throughout node addition + * and activation if necessary. + */ + WorkerNode *node = ModifiableWorkerNode(nodeName, nodePort); + + /* we should always set active flag to true if we call citus_add_node */ + node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, DatumGetBool(true)); + + /* + * After adding new node, if the node did not already exist, we will activate + * the node. 
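
The flow being built here is two-phase: AddNodeMetadata writes only the local pg_dist_node row (localOnly = true), and propagation to the workers is delegated to the sync context, so the transactional and nontransactional modes share one code path. Reduced to a sketch that conceptually lives next to AddNodeMetadata in node_metadata.c (NodeMetadata is that file's local struct):

#include "postgres.h"

/*
 * sketch: phase one writes pg_dist_node locally; phase two is handled
 * by the metadata sync context created further down in this function
 */
static int
AddNodeLocallyThenPropagateSketch(char *nodeName, int32 nodePort,
								  NodeMetadata *nodeMetadata)
{
	bool nodeAlreadyExists = false;
	bool localOnly = true;

	int nodeId = AddNodeMetadata(nodeName, nodePort, nodeMetadata,
								 &nodeAlreadyExists, localOnly);
	if (nodeAlreadyExists)
	{
		/* nothing new to propagate */
		return nodeId;
	}

	/* ... then CreateMetadataSyncContext() + ActivateNodeList() as above ... */
	return nodeId;
}
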
+ * If the worker is not marked as a coordinator, check that + * the node is not trying to add itself + */ + if (node != NULL && + node->groupId != COORDINATOR_GROUP_ID && + node->nodeRole != SecondaryNodeRoleId() && + IsWorkerTheCurrentNode(node)) + { + ereport(ERROR, (errmsg("Node cannot add itself as a worker."), + errhint( + "Add the node as a coordinator by using: " + "SELECT citus_set_coordinator_host('%s', %d);", + node->workerName, node->workerPort))); + } + + List *nodeList = list_make1(node); + bool collectCommands = false; + bool nodesAddedInSameTransaction = true; + MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, collectCommands, + nodesAddedInSameTransaction); + + if (EnableMetadataSync) + { + /* send the delete command to all primary nodes with metadata */ + char *nodeDeleteCommand = NodeDeleteCommand(node->nodeId); + SendOrCollectCommandListToMetadataNodes(context, list_make1(nodeDeleteCommand)); + + /* finally prepare the insert command and send it to all primary nodes */ + uint32 primariesWithMetadata = CountPrimariesWithMetadata(); + if (primariesWithMetadata != 0) + { + char *nodeInsertCommand = NULL; + if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) + { + nodeInsertCommand = NodeListInsertCommand(nodeList); + } + else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) + { + /* + * We need to ensure node insertion is idempotent in nontransactional + * sync mode. + */ + nodeInsertCommand = NodeListIdempotentInsertCommand(nodeList); + } + Assert(nodeInsertCommand != NULL); + SendOrCollectCommandListToMetadataNodes(context, + list_make1(nodeInsertCommand)); + } + } + + ActivateNodeList(context); + + /* cleanup metadata memory context and connections */ + DestroyMetadataSyncContext(context); + + return nodeId; +} + + /* * SetWorkerColumn function sets the column with the specified index * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly. @@ -2114,17 +2272,26 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) /* - * SetWorkerColumnViaMetadataContext does the same as SetWorkerColumn but using metadata - * sync context. + * SetNodeStateViaMetadataContext sets or unsets isactive, metadatasynced, and hasmetadata + * flags via metadataSyncContext. */ static void -SetWorkerColumnViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode, - int columnIndex, Datum value) +SetNodeStateViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode, + Datum value) { - char *metadataSyncCommand = - GetMetadataSyncCommandToSetNodeColumn(workerNode, columnIndex, value); + char *isActiveCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_isactive, + value); + char *metadatasyncedCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, + Anum_pg_dist_node_metadatasynced, value); + char *hasmetadataCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_hasmetadata, + value); + List *commandList = list_make3(isActiveCommand, metadatasyncedCommand, + hasmetadataCommand); - SendOrCollectCommandListToMetadataNodes(context, list_make1(metadataSyncCommand)); + SendOrCollectCommandListToMetadataNodes(context, commandList); } @@ -2340,20 +2507,6 @@ SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards) } -/* - * SetNodeState function sets the isactive column of the specified worker in - * pg_dist_node to isActive. Also propagates this to other metadata nodes. - * It returns the new worker node after the modification. 
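
The branch above that picks between the two insert commands is the heart of the idempotency story: a nontransactional sync may replay a command after a partial failure, and replaying a plain INSERT would violate the nodename/nodeport unique constraint. That choice, isolated as a sketch:

#include "postgres.h"

#include "nodes/pg_list.h"
#include "distributed/metadata_sync.h"

/* sketch: choose a replay-safe insert command in nontransactional mode */
static char *
ChooseNodeInsertCommandSketch(MetadataSyncContext *context, List *nodeList)
{
	if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
	{
		/* the whole sync rolls back on failure; plain INSERT is enough */
		return NodeListInsertCommand(nodeList);
	}

	/* replays must upsert via ON CONFLICT instead of erroring out */
	return NodeListIdempotentInsertCommand(nodeList);
}
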
- */ -static WorkerNode * -SetNodeState(char *nodeName, int nodePort, bool isActive) -{ - WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum( - isActive)); -} - - /* * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the * node is not found this function returns NULL. @@ -2413,6 +2566,10 @@ GetNodeByNodeId(int32 nodeId) { nodeTuple = heap_copytuple(heapTuple); } + else + { + ereport(ERROR, (errmsg("could not find valid entry for node id %d", nodeId))); + } systable_endscan(scanDescriptor); table_close(pgDistNode, NoLock); @@ -2561,9 +2718,11 @@ InsertPlaceholderCoordinatorRecord(void) nodeMetadata.nodeCluster = "default"; bool nodeAlreadyExists = false; + bool localOnly = false; /* as long as there is a single node, localhost should be ok */ - AddNodeMetadata(LocalHostName, PostPortNumber, &nodeMetadata, &nodeAlreadyExists); + AddNodeMetadata(LocalHostName, PostPortNumber, &nodeMetadata, &nodeAlreadyExists, + localOnly); } @@ -2880,8 +3039,9 @@ ErrorIfAnyNodeNotExist(List *nodeList) static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context) { + int activatedPrimaryCount = list_length(context->activatedWorkerNodeList); int nodeIdx = 0; - for (nodeIdx = 0; nodeIdx < list_length(context->activatedWorkerNodeList); nodeIdx++) + for (nodeIdx = 0; nodeIdx < activatedPrimaryCount; nodeIdx++) { WorkerNode *node = list_nth(context->activatedWorkerNodeList, nodeIdx); List *commandList = list_make1(LocalGroupIdUpdateCommand(node->groupId)); @@ -2926,6 +3086,11 @@ SyncNodeMetadata(MetadataSyncContext *context) { CheckCitusVersion(ERROR); + if (!EnableMetadataSync) + { + return; + } + /* * Do not fail when we call this method from activate_node_snapshot * from workers. @@ -2950,5 +3115,9 @@ SyncNodeMetadata(MetadataSyncContext *context) recreateNodeSnapshotCommandList = list_concat(recreateNodeSnapshotCommandList, createMetadataCommandList); - SendOrCollectCommandListToMetadataNodes(context, recreateNodeSnapshotCommandList); + /* + * We should have already added node metadata to metadata workers. Sync node + * metadata just for activated workers. + */ + SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList); } diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 4889ed3b6..a2174f7c4 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -50,13 +50,15 @@ activate_node_snapshot(PG_FUNCTION_ARGS) WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode(); /* - * Create MetadataSyncContext which will be used throughout nodes' activation. - * As we set collectCommands to true, it will not create connections to workers. - * Instead it will collect and return sync commands to be sent to workers. + * Create MetadataSyncContext which is used throughout nodes' activation. + * As we set collectCommands to true, it would not create connections to workers. + * Instead it would collect and return sync commands to be sent to workers. 
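
activate_node_snapshot above is the collectCommands = true consumer: no connections are opened and the would-be sync commands land in collectedCommands for inspection. A usage sketch; note the commands live in the context's memory context, so consume them before destroying it:

#include "postgres.h"

#include "nodes/pg_list.h"
#include "distributed/listutils.h"
#include "distributed/metadata_sync.h"
#include "distributed/worker_manager.h"

/* sketch: dry-run an activation and log the commands it would send */
static void
LogActivationCommandsSketch(WorkerNode *workerNode)
{
	bool collectCommands = true;
	bool nodesAddedInSameTransaction = false;
	MetadataSyncContext *context =
		CreateMetadataSyncContext(list_make1(workerNode),
								  collectCommands,
								  nodesAddedInSameTransaction);

	ActivateNodeList(context);

	char *command = NULL;
	foreach_ptr(command, context->collectedCommands)
	{
		elog(DEBUG1, "would send: %s", command);
	}

	/* safe to destroy only after the collected commands are consumed */
	DestroyMetadataSyncContext(context);
}
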
*/ bool collectCommands = true; + bool nodesAddedInSameTransaction = false; MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(dummyWorkerNode), - collectCommands); + collectCommands, + nodesAddedInSameTransaction); ActivateNodeList(context); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index ffaab9e5a..b4a497647 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -438,6 +438,8 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList return; } + ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); + UseCoordinatedTransaction(); List *connectionList = NIL; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 90253b13e..1795152b2 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -36,11 +36,12 @@ extern int MetadataSyncTransMode; typedef struct MetadataSyncContext { List *activatedWorkerNodeList; /* activated worker nodes */ - List *activatedWorkerBareConnections; /* bare connections to activated worker nodes */ + List *activatedWorkerBareConnections; /* bare connections to activated nodes */ MemoryContext context; /* memory context for all allocations */ MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */ - bool collectCommands; /* flag to collect commands instead of sending and resetting */ + bool collectCommands; /* if we collect commands instead of sending and resetting */ List *collectedCommands; /* collected commands. (NIL if collectCommands == false) */ + bool nodesAddedInSameTransaction; /* if the nodes are added just before activation */ } MetadataSyncContext; typedef enum @@ -96,6 +97,7 @@ extern char * DistributionDeleteCommand(const char *schemaName, extern char * DistributionDeleteMetadataCommand(Oid relationId); extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); +char * NodeListIdempotentInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); @@ -136,7 +138,9 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, Oid distributionColumnCollation); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); -extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool testMode); +extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, + bool collectCommands, + bool nodesAddedInSameTransaction); extern void DestroyMetadataSyncContext(MetadataSyncContext *context); extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context); extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, @@ -151,7 +155,6 @@ extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands, int nodeIdx); extern void ActivateNodeList(MetadataSyncContext *context); -extern int ActivateNode(char *nodeName, int nodePort); extern char * WorkerDropAllShellTablesCommand(bool singleTransaction); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index bfc18c919..5ad7f4962 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ 
-84,6 +84,7 @@ extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk); +extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); extern void EnsureCoordinatorIsInMetadata(void); diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 4b93b62eb..0bbbc6899 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -2,19 +2,22 @@ SET citus.next_shard_id TO 1220000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1; -- Tests functions related to cluster membership --- add the nodes to the cluster +-- add the first node to the cluster in transactional mode SELECT 1 FROM master_add_node('localhost', :worker_1_port); ?column? --------------------------------------------------------------------- 1 (1 row) +-- add the second node in nontransactional mode +SET citus.metadata_sync_mode TO 'nontransactional'; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) +RESET citus.metadata_sync_mode; -- I am coordinator SELECT citus_is_coordinator(); citus_is_coordinator @@ -374,7 +377,7 @@ SELECT master_get_active_worker_nodes(); SELECT * FROM master_add_node('localhost', :worker_2_port); master_add_node --------------------------------------------------------------------- - 7 + 6 (1 row) ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART WITH 7; @@ -445,7 +448,7 @@ SELECT run_command_on_workers('UPDATE pg_dist_placement SET shardstate=1 WHERE g -- when there is no primary we should get a pretty error UPDATE pg_dist_node SET noderole = 'secondary' WHERE nodeport=:worker_2_port; SELECT * FROM cluster_management_test; -ERROR: node group 6 does not have a primary node +ERROR: node group 5 does not have a primary node -- when there is no node at all in the group we should get a different error DELETE FROM pg_dist_node WHERE nodeport=:worker_2_port; SELECT run_command_on_workers('DELETE FROM pg_dist_node WHERE nodeport=' || :'worker_2_port'); @@ -455,13 +458,12 @@ SELECT run_command_on_workers('DELETE FROM pg_dist_node WHERE nodeport=' || :'wo (1 row) SELECT * FROM cluster_management_test; -ERROR: there is a shard placement in node group 6 but there are no nodes in that group +ERROR: there is a shard placement in node group 5 but there are no nodes in that group -- clean-up SELECT * INTO old_placements FROM pg_dist_placement WHERE groupid = :worker_2_group; DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group; SELECT master_add_node('localhost', :worker_2_port) AS new_node \gset WARNING: could not find any shard placements for shardId 1220001 -WARNING: could not find any shard placements for shardId 1220001 WARNING: could not find any shard placements for shardId 1220003 WARNING: could not find any shard placements for shardId 1220005 WARNING: could not find any shard placements for shardId 1220007 @@ -1216,6 +1218,18 @@ SELECT start_metadata_sync_to_all_nodes(); t (1 row) +-- do not allow nontransactional node addition inside 
transaction block +BEGIN; + SELECT citus_remove_node('localhost', :worker_1_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + + SELECT citus_add_node('localhost', :worker_1_port); +ERROR: do not add node in transaction block when the sync mode is nontransactional +HINT: add the node after SET citus.metadata_sync_mode TO 'transactional' +COMMIT; RESET citus.metadata_sync_mode; -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 6ed3b5b74..f371e11e7 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -1802,6 +1802,7 @@ ALTER TABLE dist_table_1 ADD COLUMN b int; ERROR: localhost:xxxxx is a metadata node, but is out of sync HINT: If the node is up, wait until metadata gets synced to it and try again. SELECT master_add_node('localhost', :master_port, groupid => 0); +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ERROR: localhost:xxxxx is a metadata node, but is out of sync HINT: If the node is up, wait until metadata gets synced to it and try again. SELECT citus_disable_node_and_wait('localhost', :worker_1_port); diff --git a/src/test/regress/expected/multi_metadata_sync_0.out b/src/test/regress/expected/multi_metadata_sync_0.out index 463fabafe..5d5aa56dd 100644 --- a/src/test/regress/expected/multi_metadata_sync_0.out +++ b/src/test/regress/expected/multi_metadata_sync_0.out @@ -1802,6 +1802,7 @@ ALTER TABLE dist_table_1 ADD COLUMN b int; ERROR: localhost:xxxxx is a metadata node, but is out of sync HINT: If the node is up, wait until metadata gets synced to it and try again. SELECT master_add_node('localhost', :master_port, groupid => 0); +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ERROR: localhost:xxxxx is a metadata node, but is out of sync HINT: If the node is up, wait until metadata gets synced to it and try again. 
SELECT citus_disable_node_and_wait('localhost', :worker_1_port); diff --git a/src/test/regress/expected/upgrade_citus_finish_citus_upgrade.out b/src/test/regress/expected/upgrade_citus_finish_citus_upgrade.out index 8c46aae43..bb80d9103 100644 --- a/src/test/regress/expected/upgrade_citus_finish_citus_upgrade.out +++ b/src/test/regress/expected/upgrade_citus_finish_citus_upgrade.out @@ -21,3 +21,12 @@ FROM pg_dist_node_metadata, pg_extension WHERE extname = 'citus'; -- still, do not NOTICE the version as it changes per release SET client_min_messages TO WARNING; CALL citus_finish_citus_upgrade(); +-- we should be able to sync metadata in nontransactional way as well +SET citus.metadata_sync_mode TO 'nontransactional'; +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +RESET citus.metadata_sync_mode; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 572b49136..d0bb8b16d 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -4,9 +4,12 @@ ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1; -- Tests functions related to cluster membership --- add the nodes to the cluster +-- add the first node to the cluster in transactional mode SELECT 1 FROM master_add_node('localhost', :worker_1_port); +-- add the second node in nontransactional mode +SET citus.metadata_sync_mode TO 'nontransactional'; SELECT 1 FROM master_add_node('localhost', :worker_2_port); +RESET citus.metadata_sync_mode; -- I am coordinator SELECT citus_is_coordinator(); @@ -513,6 +516,11 @@ BEGIN; SELECT start_metadata_sync_to_all_nodes(); COMMIT; SELECT start_metadata_sync_to_all_nodes(); +-- do not allow nontransactional node addition inside transaction block +BEGIN; + SELECT citus_remove_node('localhost', :worker_1_port); + SELECT citus_add_node('localhost', :worker_1_port); +COMMIT; RESET citus.metadata_sync_mode; -- verify that at the end of this file, all primary nodes have metadata synced diff --git a/src/test/regress/sql/upgrade_citus_finish_citus_upgrade.sql b/src/test/regress/sql/upgrade_citus_finish_citus_upgrade.sql index bc2c40b0c..a326fb0a4 100644 --- a/src/test/regress/sql/upgrade_citus_finish_citus_upgrade.sql +++ b/src/test/regress/sql/upgrade_citus_finish_citus_upgrade.sql @@ -17,3 +17,8 @@ FROM pg_dist_node_metadata, pg_extension WHERE extname = 'citus'; -- still, do not NOTICE the version as it changes per release SET client_min_messages TO WARNING; CALL citus_finish_citus_upgrade(); + +-- we should be able to sync metadata in nontransactional way as well +SET citus.metadata_sync_mode TO 'nontransactional'; +SELECT start_metadata_sync_to_all_nodes(); +RESET citus.metadata_sync_mode;