From bc25ba51c33a3a56bc4f425b4ba0c27860a77916 Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Tue, 21 Mar 2023 00:48:38 +0300 Subject: [PATCH] =?UTF-8?q?PR=20#6728=20=C2=A0/=20commit=20-=205?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let `ActivateNode` use new metadata sync api. --- .../distributed/metadata/metadata_sync.c | 15 +- .../distributed/metadata/node_metadata.c | 487 +++++++++++++----- .../distributed/utils/reference_table_utils.c | 142 ++++- src/include/distributed/metadata_sync.h | 3 + .../distributed/reference_table_utils.h | 6 + src/include/distributed/worker_manager.h | 3 - .../expected/multi_cluster_management.out | 15 + .../regress/sql/multi_cluster_management.sql | 9 + 8 files changed, 530 insertions(+), 150 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 0b126f13f..aeac16b89 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -217,7 +217,20 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock); - ActivateNodeList(workerNodes); + /* + * create MetadataSyncContext which will be used throughout nodes' activation. + * It contains metadata sync nodes, their connections and also a MemoryContext + * for allocations. + */ + bool collectCommands = false; + MetadataSyncContext *context = CreateMetadataSyncContext(workerNodes, + collectCommands); + + ActivateNodeList(context); + + /* cleanup metadata memory context and connections */ + DestroyMetadataSyncContext(context); + TransactionModifiedNodeMetadata = true; PG_RETURN_BOOL(true); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 91ffca4fe..586006e16 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -122,6 +122,17 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); static int FindCoordinatorNodeId(void); static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId); +static void ErrorIfAnyNodeNotExist(List *nodeList); +static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); +static void SendDeletionCommandsForReplicatedTablePlacements( + MetadataSyncContext *context); +static void SyncNodeMetadata(MetadataSyncContext *context); +static void SetWorkerColumnViaMetadataContext(MetadataSyncContext *context, + WorkerNode *workerNode, + int columnIndex, Datum value); +static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context); +static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); +static void EnsureTransactionalMetadataSyncMode(void); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -222,6 +233,21 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS) } +/* + * EnsureTransactionalMetadataSyncMode ensures metadata sync mode is transactional. 
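+ * It errors out, hinting to switch back to transactional mode, when
+ * citus.metadata_sync_mode is set to 'nontransactional'.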
+ */ +static void +EnsureTransactionalMetadataSyncMode(void) +{ + if (MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) + { + ereport(ERROR, (errmsg("this operation cannot be completed in nontransactional " + "metadata sync mode"), + errhint("SET citus.metadata_sync_mode to 'transactional'"))); + } +} + + /* * citus_add_node function adds a new node to the cluster and returns its id. It also * replicates all reference tables to the new node. @@ -262,38 +288,20 @@ citus_add_node(PG_FUNCTION_ARGS) nodeMetadata.shouldHaveShards = false; } - int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, - &nodeAlreadyExists); - TransactionModifiedNodeMetadata = true; - /* - * After adding new node, if the node did not already exist, we will activate - * the node. This means we will replicate all reference tables to the new - * node. + * We do not allow addition of secondary nodes in nontransactional sync mode + * via citus_add_node. */ - if (!nodeAlreadyExists) + if (nodeMetadata.nodeRole == SecondaryNodeRoleId()) { - WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort); - - /* - * If the worker is not marked as a coordinator, check that - * the node is not trying to add itself - */ - if (workerNode != NULL && - workerNode->groupId != COORDINATOR_GROUP_ID && - workerNode->nodeRole != SecondaryNodeRoleId() && - IsWorkerTheCurrentNode(workerNode)) - { - ereport(ERROR, (errmsg("Node cannot add itself as a worker."), - errhint( - "Add the node as a coordinator by using: " - "SELECT citus_set_coordinator_host('%s', %d);", - nodeNameString, nodePort))); - } - - ActivateNode(nodeNameString, nodePort); + EnsureTransactionalMetadataSyncMode(); } + int nodeId = AddNodeMetadataViaMetadataContext(nodeNameString, nodePort, + &nodeMetadata, + &nodeAlreadyExists); + TransactionModifiedNodeMetadata = true; + PG_RETURN_INT32(nodeId); } @@ -972,13 +980,11 @@ citus_activate_node(PG_FUNCTION_ARGS) text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText), - nodePort); - ActivateNode(workerNode->workerName, workerNode->workerPort); + int32 nodeId = ActivateNode(text_to_cstring(nodeNameText), nodePort); TransactionModifiedNodeMetadata = true; - PG_RETURN_INT32(workerNode->nodeId); + PG_RETURN_INT32(nodeId); } @@ -1131,14 +1137,130 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) /* - * ActivateNodeList iterates over the nodeList and activates the nodes. - * Some part of the node activation is done parallel across the nodes, - * such as syncing the metadata. However, reference table replication is - * done one by one across nodes. + * MarkNodesNotSyncedInLoopBackConnection unsets metadatasynced flag in separate + * connection to localhost. + */ +static void +MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context) +{ + Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL); + Assert(!MetadataSyncCollectsCommands(context)); + + /* + * Set metadatasynced to false for all activated nodes to mark the nodes as not synced + * in case nontransactional metadata sync fails before we activate the nodes inside + * metadataSyncContext. + * We set metadatasynced to false at coordinator to mark the nodes as not synced. But we + * do not set isactive and hasmetadata flags to false as we still want to route queries + * to the nodes if their isactive flag is true and propagate DDL to the nodes if possible. 
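+ * The flag is flipped back to true, via SetNodeMetadata, at the end of
+ * ActivateNodeList once the sync has succeeded.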
+ * + * NOTES: + * 1) We use separate connection to localhost as we would rollback the local + * transaction in case of failure. + * 2) Operator should handle problems at workers if any. Wworkers probably fail + * due to improper metadata when a query hits. Or DDL might fail due to desynced + * nodes. (when hasmetadata = true, metadatasynced = false) + * In those cases, proper metadata sync for the workers should be done.) + */ + + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, + PostPortNumber); + + bool metadatasynced = false; + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, context->activatedWorkerNodeList) + { + char *metadatasyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId, + metadatasynced); + List *commandList = list_make1(metadatasyncCommand); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList); + } + + CloseConnection(connection); +} + + +/* + * SetNodeMetadata sets isactive, metadatasynced and hasmetadata flags locally + * and, if required, remotely. + */ +static void +SetNodeMetadata(MetadataSyncContext *context, bool localOnly) +{ + /* do not execute local transaction if we collect commands */ + if (!MetadataSyncCollectsCommands(context)) + { + WorkerNode *node = NULL; + foreach_ptr(node, context->activatedWorkerNodeList) + { + node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, + BoolGetDatum(true)); + node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_hasmetadata, + BoolGetDatum(true)); + } + } + + if (!localOnly) + { + WorkerNode *node = NULL; + foreach_ptr(node, context->activatedWorkerNodeList) + { + SetWorkerColumnViaMetadataContext(context, node, Anum_pg_dist_node_isactive, + BoolGetDatum(true)); + SetWorkerColumnViaMetadataContext(context, node, + Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + SetWorkerColumnViaMetadataContext(context, node, + Anum_pg_dist_node_hasmetadata, + BoolGetDatum(true)); + } + } +} + + +/* + * ActivateNodeList does some sanity checks and acquire Exclusive lock on pg_dist_node, + * and then iterates over the nodeList and activates the nodes. + * + * The function operates in 3 different modes according to transactionMode inside + * metadataSyncContext. + * + * 1. MetadataSyncCollectsCommands(context): + * Only collect commands instead of sending them to workers, + * 2. context.transactionMode == METADATA_SYNC_TRANSACTIONAL: + * Send all commands using coordinated transaction, + * 3. context.transactionMode == METADATA_SYNC_NON_TRANSACTIONAL: + * Send all commands using bare (no transaction block) connections. */ void -ActivateNodeList(List *nodeList) +ActivateNodeList(MetadataSyncContext *context) { + if (context->activatedWorkerNodeList == NIL) + { + /* + * In case user calls with only coordinator in nodelist, we can hit here. Just bail + * out as we already warned the user, at `SetMetadataSyncNodesFromNodeList`, that + * coordinator already has metadata. 
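+ * (Returning here keeps such a call a harmless no-op rather than an error.)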
+ */ + return; + } + + if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL && + IsMultiStatementTransaction()) + { + /* + * prevent inside transaction block as we use bare connections which can + * lead deadlock + */ + ereport(ERROR, (errmsg("do not sync metadata in transaction block " + "when the sync mode is nontransactional"), + errhint("resync after SET citus.metadata_sync_mode " + "TO 'transactional'"))); + } + /* * We currently require the object propagation to happen via superuser, * see #5139. While activating a node, we sync both metadata and object @@ -1152,101 +1274,88 @@ ActivateNodeList(List *nodeList) */ EnsureSuperUser(); - /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ + /* + * we need to unset metadatasynced flag to false at coordinator in separate + * transaction only at nontransactional sync mode and if we do not collect + * commands. + * + * We make sure we set the flag to false at the start of nontransactional + * metadata sync to mark those nodes are not synced in case of a failure in + * the middle of the sync. + */ + if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL && + !MetadataSyncCollectsCommands(context)) + { + MarkNodesNotSyncedInLoopBackConnection(context); + } + + /* + * Take an exclusive lock on pg_dist_node to serialize pg_dist_node + * changes. We should not acquire the lock before deactivating + * metadata nodes as it causes deadlock. + */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); - - List *nodeToSyncMetadata = NIL; - WorkerNode *node = NULL; - foreach_ptr(node, nodeList) - { - /* - * First, locally mark the node is active, if everything goes well, - * we are going to sync this information to all the metadata nodes. - */ - WorkerNode *workerNode = - FindWorkerNodeAnyCluster(node->workerName, node->workerPort); - if (workerNode == NULL) - { - ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName, - node->workerPort))); - } - - /* both nodes should be the same */ - Assert(workerNode->nodeId == node->nodeId); - - /* - * Delete existing reference and replicated table placements on the - * given groupId if the group has been disabled earlier (e.g., isActive - * set to false). - * - * Sync the metadata changes to all existing metadata nodes irrespective - * of the current nodes' metadata sync state. We expect all nodes up - * and running when another node is activated. - */ - if (!workerNode->isActive && NodeIsPrimary(workerNode)) - { - bool localOnly = false; - DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, - localOnly); - } - - workerNode = - SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, - BoolGetDatum(true)); - - /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ - bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); - if (syncMetadata) - { - /* - * We are going to sync the metadata anyway in this transaction, so do - * not fail just because the current metadata is not synced. - */ - SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); - - /* - * Update local group id first, as object dependency logic requires to have - * updated local group id. 
- */ - UpdateLocalGroupIdOnNode(workerNode); - - nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode); - } - } + /* + * Error if there is concurrent change to node table before acquiring + * the lock + */ + ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList); /* - * Sync distributed objects first. We must sync distributed objects before - * replicating reference tables to the remote node, as reference tables may - * need such objects. + * Delete existing reference and replicated table placements on the + * given groupId if the group has been disabled earlier (e.g., isActive + * set to false). + * todo: use metada context connections */ - SyncDistributedObjectsToNodeList(nodeToSyncMetadata); + SendDeletionCommandsForReplicatedTablePlacements(context); /* - * Sync node metadata. We must sync node metadata before syncing table - * related pg_dist_xxx metadata. Since table related metadata requires - * to have right pg_dist_node entries. + * SetNodeMetadata sets isactive, metadatasynced and hasmetadata flags + * locally for following reasons: + * + * 1) Set isactive to true locally so that we can find activated nodes amongst + * active workers, + * 2) Do not fail just because the current metadata is not synced. (see + * ErrorIfAnyMetadataNodeOutOfSync), + * 3) To propagate activated nodes nodemetadata correctly. + * + * We are going to sync the metadata anyway in this transaction, set + * isactive, metadatasynced, and hasmetadata to true locally. + * The changes would rollback in case of failure. */ - foreach_ptr(node, nodeToSyncMetadata) - { - SyncNodeMetadataToNode(node->workerName, node->workerPort); - } + bool localOnly = true; + SetNodeMetadata(context, localOnly); /* - * As the last step, sync the table related metadata to the remote node. - * We must handle it as the last step because of limitations shared with - * above comments. + * Update local group ids so that upcoming transactions can see its effect. + * Object dependency logic requires to have updated local group id. */ - SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata); + UpdateLocalGroupIdsViaMetadataContext(context); - foreach_ptr(node, nodeList) - { - bool isActive = true; + /* + * Sync node metadata so that placement insertion does not fail due to + * EnsureShardPlacementMetadataIsSane. + */ + SyncNodeMetadata(context); - /* finally, let all other active metadata nodes to learn about this change */ - SetNodeState(node->workerName, node->workerPort, isActive); - } + /* + * Sync all dependencies and distributed objects with their pg_dist_xx tables to + * metadata nodes inside metadataSyncContext. Depends on node metadata. + */ + SyncDistributedObjects(context); + + /* + * Let all nodes to be active and synced after all operations succeeded. + * we make sure that the metadata sync is idempotent and safe overall with multiple + * other transactions, if nontransactional mode is used. + * + * We already took Exclusive lock on node metadata, which prevents modification + * on node metadata on coordinator. The step will rollback, in case of a failure, + * to the state where metadatasynced=false. + */ + localOnly = false; + SetNodeMetadata(context, localOnly); } @@ -1260,8 +1369,25 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; - WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); - ActivateNodeList(list_make1(workerNode)); + /* + * We take exclusive lock on pg_dist_node inside ActivateNodeList. We + * also check the node still exists after acquiring the lock. 
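+ * (ErrorIfAnyNodeNotExist performs that existence check inside ActivateNodeList.)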
+ */ + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + + /* + * Create MetadataSyncContext which will be used throughout nodes' activation. + * It contains metadata sync nodes, their connections and also a MemoryContext + * for allocations. + */ + bool collectCommands = false; + MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), + collectCommands); + + ActivateNodeList(context); + + /* cleanup metadata memory context and connections */ + DestroyMetadataSyncContext(context); /* finally, let all other active metadata nodes to learn about this change */ WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); @@ -2181,6 +2307,21 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) } +/* + * SetWorkerColumnViaMetadataContext does the same as SetWorkerColumn but using metadata + * sync context. + */ +static void +SetWorkerColumnViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode, + int columnIndex, Datum value) +{ + char *metadataSyncCommand = + GetMetadataSyncCommandToSetNodeColumn(workerNode, columnIndex, value); + + SendOrCollectCommandListToMetadataNodes(context, list_make1(metadataSyncCommand)); +} + + /* * SetWorkerColumnOptional function sets the column with the specified index * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly. @@ -2868,3 +3009,109 @@ UnsetMetadataSyncedForAllWorkers(void) return updatedAtLeastOne; } + + +/* + * ErrorIfAnyNodeNotExist errors if any node in given list not found. + */ +static void +ErrorIfAnyNodeNotExist(List *nodeList) +{ + WorkerNode *node = NULL; + foreach_ptr(node, nodeList) + { + /* + * First, locally mark the node is active, if everything goes well, + * we are going to sync this information to all the metadata nodes. + */ + WorkerNode *workerNode = + FindWorkerNodeAnyCluster(node->workerName, node->workerPort); + if (workerNode == NULL) + { + ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName, + node->workerPort))); + } + } +} + + +/* + * UpdateLocalGroupIdsViaMetadataContext updates local group ids for given list + * of nodes with transactional or nontransactional mode according to transactionMode + * inside metadataSyncContext. + */ +static void +UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context) +{ + int nodeIdx = 0; + for (nodeIdx = 0; nodeIdx < list_length(context->activatedWorkerNodeList); nodeIdx++) + { + WorkerNode *node = list_nth(context->activatedWorkerNodeList, nodeIdx); + List *commandList = list_make1(LocalGroupIdUpdateCommand(node->groupId)); + + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + + SendOrCollectCommandListToSingleNode(context, commandList, nodeIdx); + } +} + + +/* + * SendDeletionCommandsForReplicatedTablePlacements sends commands to delete replicated + * placement for the metadata nodes with transactional or nontransactional mode according + * to transactionMode inside metadataSyncContext. 
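+ * Only node groups that were disabled earlier (i.e. isActive is false) are
+ * cleaned up.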
+ */ +static void +SendDeletionCommandsForReplicatedTablePlacements(MetadataSyncContext *context) +{ + WorkerNode *node = NULL; + foreach_ptr(node, context->activatedWorkerNodeList) + { + if (!node->isActive) + { + bool localOnly = false; + int32 groupId = node->groupId; + DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(context, + groupId, + localOnly); + } + } +} + + +/* + * SyncNodeMetadata syncs node metadata with transactional or nontransactional + * mode according to transactionMode inside metadataSyncContext. + */ +static void +SyncNodeMetadata(MetadataSyncContext *context) +{ + CheckCitusVersion(ERROR); + + /* + * Do not fail when we call this method from activate_node_snapshot + * from workers. + */ + if (!MetadataSyncCollectsCommands(context)) + { + EnsureCoordinator(); + } + + EnsureModificationsCanRun(); + EnsureSequentialModeMetadataOperations(); + + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + + /* generate the queries which drop the node metadata */ + List *dropMetadataCommandList = NodeMetadataDropCommands(); + + /* generate the queries which create the node metadata from scratch */ + List *createMetadataCommandList = NodeMetadataCreateCommands(); + + List *recreateNodeSnapshotCommandList = dropMetadataCommandList; + recreateNodeSnapshotCommandList = list_concat(recreateNodeSnapshotCommandList, + createMetadataCommandList); + + SendOrCollectCommandListToMetadataNodes(context, recreateNodeSnapshotCommandList); +} diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 0b085c67a..687ce02a7 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -503,12 +503,11 @@ GetReferenceTableColocationId() /* - * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over - * list of reference and replicated hash distributed tables and deletes - * all placements from pg_dist_placement table for given group. + * GetAllReplicatedTableList returns all tables which has replicated placements. + * i.e. (all reference tables) + (distributed tables with more than 1 placements) */ -void -DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) +List * +GetAllReplicatedTableList(void) { List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE); List *replicatedMetadataSyncedDistributedTableList = @@ -517,13 +516,25 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) List *replicatedTableList = list_concat(referenceTableList, replicatedMetadataSyncedDistributedTableList); - /* if there are no reference tables, we do not need to do anything */ + return replicatedTableList; +} + + +/* + * ReplicatedPlacementsForNodeGroup filters all replicated placements for given + * node group id. 
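+ * Returns a list of GroupShardPlacement entries, or NIL when the group has no
+ * replicated placements.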
+ */ +List * +ReplicatedPlacementsForNodeGroup(int32 groupId) +{ + List *replicatedTableList = GetAllReplicatedTableList(); + if (list_length(replicatedTableList) == 0) { - return; + return NIL; } - StringInfo deletePlacementCommand = makeStringInfo(); + List *replicatedPlacementsForNodeGroup = NIL; Oid replicatedTableId = InvalidOid; foreach_oid(replicatedTableId, replicatedTableList) { @@ -538,25 +549,104 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) continue; } - GroupShardPlacement *placement = NULL; - foreach_ptr(placement, placements) - { - LockShardDistributionMetadata(placement->shardId, ExclusiveLock); - - DeleteShardPlacementRow(placement->placementId); - - if (!localOnly) - { - resetStringInfo(deletePlacementCommand); - appendStringInfo(deletePlacementCommand, - "DELETE FROM pg_catalog.pg_dist_placement " - "WHERE placementid = " UINT64_FORMAT, - placement->placementId); - - SendCommandToWorkersWithMetadata(deletePlacementCommand->data); - } - } + replicatedPlacementsForNodeGroup = list_concat(replicatedPlacementsForNodeGroup, + placements); } + + return replicatedPlacementsForNodeGroup; +} + + +/* + * DeleteShardPlacementCommand returns a command for deleting given placement from + * metadata. + */ +char * +DeleteShardPlacementCommand(uint64 placementId) +{ + StringInfo deletePlacementCommand = makeStringInfo(); + appendStringInfo(deletePlacementCommand, + "DELETE FROM pg_catalog.pg_dist_placement " + "WHERE placementid = " UINT64_FORMAT, placementId); + return deletePlacementCommand->data; +} + + +/* + * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over + * list of reference and replicated hash distributed tables and deletes + * all placements from pg_dist_placement table for given group. + */ +void +DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) +{ + List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId); + + /* if there are no replicated tables for the group, we do not need to do anything */ + if (list_length(replicatedPlacementListForGroup) == 0) + { + return; + } + + GroupShardPlacement *placement = NULL; + foreach_ptr(placement, replicatedPlacementListForGroup) + { + LockShardDistributionMetadata(placement->shardId, ExclusiveLock); + + if (!localOnly) + { + char *deletePlacementCommand = + DeleteShardPlacementCommand(placement->placementId); + + SendCommandToWorkersWithMetadata(deletePlacementCommand); + } + + DeleteShardPlacementRow(placement->placementId); + } +} + + +/* + * DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext does the same as + * DeleteAllReplicatedTablePlacementsFromNodeGroup except it uses metadataSyncContext for + * connections. 
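+ * It also skips the local pg_dist_placement deletion when the context is only
+ * collecting commands.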
+ */ +void +DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( + MetadataSyncContext *context, int32 groupId, bool localOnly) +{ + List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId); + + /* if there are no replicated tables for the group, we do not need to do anything */ + if (list_length(replicatedPlacementListForGroup) == 0) + { + return; + } + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + GroupShardPlacement *placement = NULL; + foreach_ptr(placement, replicatedPlacementListForGroup) + { + LockShardDistributionMetadata(placement->shardId, ExclusiveLock); + + if (!localOnly) + { + char *deletePlacementCommand = + DeleteShardPlacementCommand(placement->placementId); + + SendOrCollectCommandListToMetadataNodes(context, + list_make1(deletePlacementCommand)); + } + + /* do not execute local transaction if we collect commands */ + if (!MetadataSyncCollectsCommands(context)) + { + DeleteShardPlacementRow(placement->placementId); + } + + ResetMetadataSyncMemoryContext(context); + } + MemoryContextSwitchTo(oldContext); } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 2c673b11a..0b32d34d8 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -153,6 +153,9 @@ extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands, int nodeIdx); +extern void ActivateNodeList(MetadataSyncContext *context); +extern int ActivateNode(char *nodeName, int nodePort); + extern char * WorkerDropAllShellTablesCommand(bool singleTransaction); extern void SyncDistributedObjects(MetadataSyncContext *context); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index ce2de9d9d..cf5a6fd02 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -17,14 +17,20 @@ #include "listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); +extern List * GetAllReplicatedTableList(void); +extern List * ReplicatedPlacementsForNodeGroup(int32 groupId); +extern char * DeleteShardPlacementCommand(uint64 placementId); extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly); +extern void DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( + MetadataSyncContext *context, int32 groupId, bool localOnly); extern int CompareOids(const void *leftElement, const void *rightElement); extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode); extern void ErrorIfNotAllNodesHaveReferenceTableReplicas(List *workerNodeList); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index bb7abf183..39fa3e612 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -62,9 +62,6 @@ extern int MaxWorkerNodesTracked; extern char *WorkerListFileName; extern char *CurrentCluster; -extern void ActivateNodeList(List *nodeList); -extern int ActivateNode(char *nodeName, int 
nodePort); - /* Function declarations for finding worker nodes to place shards on */ extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 56235cc91..4b93b62eb 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -1202,6 +1202,21 @@ SELECT start_metadata_sync_to_all_nodes(); t (1 row) +-- nontransactional sync mode tests +SET citus.metadata_sync_mode TO 'nontransactional'; +-- do not allow nontransactional sync inside transaction block +BEGIN; + SELECT start_metadata_sync_to_all_nodes(); +ERROR: do not sync metadata in transaction block when the sync mode is nontransactional +HINT: resync after SET citus.metadata_sync_mode TO 'transactional' +COMMIT; +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +RESET citus.metadata_sync_mode; -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; ?column? diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index f9aa81836..572b49136 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -506,5 +506,14 @@ BEGIN; COMMIT; SELECT start_metadata_sync_to_all_nodes(); +-- nontransactional sync mode tests +SET citus.metadata_sync_mode TO 'nontransactional'; +-- do not allow nontransactional sync inside transaction block +BEGIN; + SELECT start_metadata_sync_to_all_nodes(); +COMMIT; +SELECT start_metadata_sync_to_all_nodes(); +RESET citus.metadata_sync_mode; + -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
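
For reference, the calling pattern the reworked activation API expects, condensed from the two call sites in this patch (start_metadata_sync_to_all_nodes and the new ActivateNode); this is a sketch only, reusing the signatures shown above, with error handling omitted and declarations coming from the Citus headers already included by metadata_sync.c:

    /* nodes to activate; the same lock level start_metadata_sync_to_all_nodes uses */
    List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock);

    /*
     * The context carries the metadata sync nodes, their connections and a
     * MemoryContext; pass collectCommands = true to only collect the commands
     * instead of sending them to the workers.
     */
    bool collectCommands = false;
    MetadataSyncContext *context = CreateMetadataSyncContext(workerNodes,
                                                             collectCommands);

    /*
     * Activates the nodes either in a coordinated transaction or over bare
     * connections, depending on context->transactionMode.
     */
    ActivateNodeList(context);

    /* cleanup metadata memory context and connections */
    DestroyMetadataSyncContext(context);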