PR #6728 / commit 5

Let `ActivateNode` use new metadata sync api.
pull/6728/head
aykutbozkurt 2023-03-21 00:48:38 +03:00
parent 29ef9117e6
commit bc25ba51c3
8 changed files with 530 additions and 150 deletions
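
In short, callers no longer pass a bare node list to ActivateNodeList; they wrap the nodes in a MetadataSyncContext first. A rough sketch of the caller pattern this commit introduces (the wrapper name below is hypothetical; the three calls appear verbatim in the hunks that follow):

/* hypothetical wrapper, only to illustrate the new call sequence */
static void
ActivateWorkerNodesSketch(List *workerNodes)
{
	/* collectCommands = false: actually send the commands instead of collecting them */
	bool collectCommands = false;
	MetadataSyncContext *context = CreateMetadataSyncContext(workerNodes,
															 collectCommands);

	/* sanity checks, pg_dist_node lock, node metadata and object sync */
	ActivateNodeList(context);

	/* cleanup metadata memory context and connections */
	DestroyMetadataSyncContext(context);
}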

View File

@@ -217,7 +217,20 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
 	List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock);

-	ActivateNodeList(workerNodes);
+	/*
+	 * create MetadataSyncContext which will be used throughout nodes' activation.
+	 * It contains metadata sync nodes, their connections and also a MemoryContext
+	 * for allocations.
+	 */
+	bool collectCommands = false;
+	MetadataSyncContext *context = CreateMetadataSyncContext(workerNodes,
+															 collectCommands);
+
+	ActivateNodeList(context);
+
+	/* cleanup metadata memory context and connections */
+	DestroyMetadataSyncContext(context);
+
 	TransactionModifiedNodeMetadata = true;

 	PG_RETURN_BOOL(true);

View File

@@ -122,6 +122,17 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val
 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
 static int FindCoordinatorNodeId(void);
 static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
+static void ErrorIfAnyNodeNotExist(List *nodeList);
+static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context);
+static void SendDeletionCommandsForReplicatedTablePlacements(
+	MetadataSyncContext *context);
+static void SyncNodeMetadata(MetadataSyncContext *context);
+static void SetWorkerColumnViaMetadataContext(MetadataSyncContext *context,
+											  WorkerNode *workerNode,
+											  int columnIndex, Datum value);
+static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context);
+static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
+static void EnsureTransactionalMetadataSyncMode(void);

 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@@ -222,6 +233,21 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
 }


+/*
+ * EnsureTransactionalMetadataSyncMode ensures metadata sync mode is transactional.
+ */
+static void
+EnsureTransactionalMetadataSyncMode(void)
+{
+	if (MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL)
+	{
+		ereport(ERROR, (errmsg("this operation cannot be completed in nontransactional "
+							   "metadata sync mode"),
+						errhint("SET citus.metadata_sync_mode to 'transactional'")));
+	}
+}
+
+
 /*
  * citus_add_node function adds a new node to the cluster and returns its id. It also
  * replicates all reference tables to the new node.
@@ -262,38 +288,20 @@ citus_add_node(PG_FUNCTION_ARGS)
 		nodeMetadata.shouldHaveShards = false;
 	}

-	int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
-								 &nodeAlreadyExists);
+	/*
+	 * We do not allow addition of secondary nodes in nontransactional sync mode
+	 * via citus_add_node.
+	 */
+	if (nodeMetadata.nodeRole == SecondaryNodeRoleId())
+	{
+		EnsureTransactionalMetadataSyncMode();
+	}
+
+	int nodeId = AddNodeMetadataViaMetadataContext(nodeNameString, nodePort,
+												   &nodeMetadata,
+												   &nodeAlreadyExists);
 	TransactionModifiedNodeMetadata = true;

-	/*
-	 * After adding new node, if the node did not already exist, we will activate
-	 * the node. This means we will replicate all reference tables to the new
-	 * node.
-	 */
-	if (!nodeAlreadyExists)
-	{
-		WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort);
-
-		/*
-		 * If the worker is not marked as a coordinator, check that
-		 * the node is not trying to add itself
-		 */
-		if (workerNode != NULL &&
-			workerNode->groupId != COORDINATOR_GROUP_ID &&
-			workerNode->nodeRole != SecondaryNodeRoleId() &&
-			IsWorkerTheCurrentNode(workerNode))
-		{
-			ereport(ERROR, (errmsg("Node cannot add itself as a worker."),
-							errhint(
-								"Add the node as a coordinator by using: "
-								"SELECT citus_set_coordinator_host('%s', %d);",
-								nodeNameString, nodePort)));
-		}
-
-		ActivateNode(nodeNameString, nodePort);
-	}
-
 	PG_RETURN_INT32(nodeId);
 }
@@ -972,13 +980,11 @@ citus_activate_node(PG_FUNCTION_ARGS)
 	text *nodeNameText = PG_GETARG_TEXT_P(0);
 	int32 nodePort = PG_GETARG_INT32(1);

-	WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
-												  nodePort);
-	ActivateNode(workerNode->workerName, workerNode->workerPort);
+	int32 nodeId = ActivateNode(text_to_cstring(nodeNameText), nodePort);

 	TransactionModifiedNodeMetadata = true;

-	PG_RETURN_INT32(workerNode->nodeId);
+	PG_RETURN_INT32(nodeId);
 }
@@ -1131,14 +1137,130 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)

 /*
- * ActivateNodeList iterates over the nodeList and activates the nodes.
- * Some part of the node activation is done parallel across the nodes,
- * such as syncing the metadata. However, reference table replication is
- * done one by one across nodes.
+ * MarkNodesNotSyncedInLoopBackConnection unsets the metadatasynced flag in a
+ * separate connection to localhost.
+ */
+static void
+MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context)
+{
+	Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL);
+	Assert(!MetadataSyncCollectsCommands(context));
+
+	/*
+	 * Set metadatasynced to false for all activated nodes, to mark the nodes as
+	 * not synced in case nontransactional metadata sync fails before we activate
+	 * the nodes inside metadataSyncContext.
+	 * We set metadatasynced to false at the coordinator to mark the nodes as not
+	 * synced, but we do not set the isactive and hasmetadata flags to false, as we
+	 * still want to route queries to the nodes if their isactive flag is true and
+	 * propagate DDL to the nodes if possible.
+	 *
+	 * NOTES:
+	 * 1) We use a separate connection to localhost, as we would roll back the
+	 *    local transaction in case of failure.
+	 * 2) The operator should handle problems at the workers, if any. Workers will
+	 *    probably fail due to improper metadata when a query hits them, or DDL
+	 *    might fail due to desynced nodes (when hasmetadata = true and
+	 *    metadatasynced = false). In those cases, proper metadata sync for the
+	 *    workers should be done.
+	 */
+	int connectionFlag = FORCE_NEW_CONNECTION;
+	MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
+													PostPortNumber);
+
+	bool metadatasynced = false;
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, context->activatedWorkerNodeList)
+	{
+		char *metadatasyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId,
+																 metadatasynced);
+		List *commandList = list_make1(metadatasyncCommand);
+		SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList);
+	}
+
+	CloseConnection(connection);
+}
+
+
+/*
+ * SetNodeMetadata sets the isactive, metadatasynced and hasmetadata flags locally
+ * and, if required, remotely.
+ */
+static void
+SetNodeMetadata(MetadataSyncContext *context, bool localOnly)
+{
+	/* do not execute a local transaction if we only collect commands */
+	if (!MetadataSyncCollectsCommands(context))
+	{
+		WorkerNode *node = NULL;
+		foreach_ptr(node, context->activatedWorkerNodeList)
+		{
+			node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive,
+											BoolGetDatum(true));
+			node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_metadatasynced,
+											BoolGetDatum(true));
+			node = SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_hasmetadata,
+											BoolGetDatum(true));
+		}
+	}
+
+	if (!localOnly)
+	{
+		WorkerNode *node = NULL;
+		foreach_ptr(node, context->activatedWorkerNodeList)
+		{
+			SetWorkerColumnViaMetadataContext(context, node, Anum_pg_dist_node_isactive,
+											  BoolGetDatum(true));
+			SetWorkerColumnViaMetadataContext(context, node,
+											  Anum_pg_dist_node_metadatasynced,
+											  BoolGetDatum(true));
+			SetWorkerColumnViaMetadataContext(context, node,
+											  Anum_pg_dist_node_hasmetadata,
+											  BoolGetDatum(true));
+		}
+	}
+}
+
+
+/*
+ * ActivateNodeList does some sanity checks, acquires an Exclusive lock on
+ * pg_dist_node, and then iterates over the nodeList and activates the nodes.
+ *
+ * The function operates in 3 different modes according to transactionMode inside
+ * metadataSyncContext:
+ *
+ * 1. MetadataSyncCollectsCommands(context):
+ *    Only collect commands instead of sending them to workers,
+ * 2. context.transactionMode == METADATA_SYNC_TRANSACTIONAL:
+ *    Send all commands using a coordinated transaction,
+ * 3. context.transactionMode == METADATA_SYNC_NON_TRANSACTIONAL:
+ *    Send all commands using bare (no transaction block) connections.
  */
 void
-ActivateNodeList(List *nodeList)
+ActivateNodeList(MetadataSyncContext *context)
 {
+	if (context->activatedWorkerNodeList == NIL)
+	{
+		/*
+		 * We can hit here in case the user calls with only the coordinator in the
+		 * node list. Just bail out, as we already warned the user at
+		 * `SetMetadataSyncNodesFromNodeList` that the coordinator already has
+		 * metadata.
+		 */
+		return;
+	}
+
+	if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL &&
+		IsMultiStatementTransaction())
+	{
+		/*
+		 * Prevent use inside a transaction block, as we use bare connections
+		 * which can lead to deadlock.
+		 */
+		ereport(ERROR, (errmsg("do not sync metadata in transaction block "
+							   "when the sync mode is nontransactional"),
+						errhint("resync after SET citus.metadata_sync_mode "
+								"TO 'transactional'")));
+	}
+
 	/*
 	 * We currently require the object propagation to happen via superuser,
 	 * see #5139. While activating a node, we sync both metadata and object
@@ -1152,101 +1274,88 @@ ActivateNodeList(List *nodeList)
 	 */
 	EnsureSuperUser();

-	/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
-	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
-
-	List *nodeToSyncMetadata = NIL;
-	WorkerNode *node = NULL;
-	foreach_ptr(node, nodeList)
-	{
-		/*
-		 * First, locally mark the node is active, if everything goes well,
-		 * we are going to sync this information to all the metadata nodes.
-		 */
-		WorkerNode *workerNode =
-			FindWorkerNodeAnyCluster(node->workerName, node->workerPort);
-		if (workerNode == NULL)
-		{
-			ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName,
-								   node->workerPort)));
-		}
-
-		/* both nodes should be the same */
-		Assert(workerNode->nodeId == node->nodeId);
-
-		/*
-		 * Delete existing reference and replicated table placements on the
-		 * given groupId if the group has been disabled earlier (e.g., isActive
-		 * set to false).
-		 *
-		 * Sync the metadata changes to all existing metadata nodes irrespective
-		 * of the current nodes' metadata sync state. We expect all nodes up
-		 * and running when another node is activated.
-		 */
-		if (!workerNode->isActive && NodeIsPrimary(workerNode))
-		{
-			bool localOnly = false;
-			DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
-															localOnly);
-		}
-
-		workerNode =
-			SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
-									 BoolGetDatum(true));
-
-		/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
-		bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
-		if (syncMetadata)
-		{
-			/*
-			 * We are going to sync the metadata anyway in this transaction, so do
-			 * not fail just because the current metadata is not synced.
-			 */
-			SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
-							BoolGetDatum(true));
-
-			/*
-			 * Update local group id first, as object dependency logic requires to have
-			 * updated local group id.
-			 */
-			UpdateLocalGroupIdOnNode(workerNode);
-
-			nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
-		}
-	}
-
-	/*
-	 * Sync distributed objects first. We must sync distributed objects before
-	 * replicating reference tables to the remote node, as reference tables may
-	 * need such objects.
-	 */
-	SyncDistributedObjectsToNodeList(nodeToSyncMetadata);
-
-	/*
-	 * Sync node metadata. We must sync node metadata before syncing table
-	 * related pg_dist_xxx metadata. Since table related metadata requires
-	 * to have right pg_dist_node entries.
-	 */
-	foreach_ptr(node, nodeToSyncMetadata)
-	{
-		SyncNodeMetadataToNode(node->workerName, node->workerPort);
-	}
-
-	/*
-	 * As the last step, sync the table related metadata to the remote node.
-	 * We must handle it as the last step because of limitations shared with
-	 * above comments.
-	 */
-	SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata);
-
-	foreach_ptr(node, nodeList)
-	{
-		bool isActive = true;
-
-		/* finally, let all other active metadata nodes to learn about this change */
-		SetNodeState(node->workerName, node->workerPort, isActive);
-	}
+	/*
+	 * We need to unset the metadatasynced flag at the coordinator in a separate
+	 * transaction, only in nontransactional sync mode and only if we do not
+	 * collect commands.
+	 *
+	 * We make sure we set the flag to false at the start of nontransactional
+	 * metadata sync to mark those nodes as not synced in case of a failure in
+	 * the middle of the sync.
+	 */
+	if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL &&
+		!MetadataSyncCollectsCommands(context))
+	{
+		MarkNodesNotSyncedInLoopBackConnection(context);
+	}
+
+	/*
+	 * Take an exclusive lock on pg_dist_node to serialize pg_dist_node
+	 * changes. We should not acquire the lock before deactivating the
+	 * metadata nodes, as that causes a deadlock.
+	 */
+	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
+
+	/*
+	 * Error out if there was a concurrent change to the node table before
+	 * acquiring the lock.
+	 */
+	ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList);
+
+	/*
+	 * Delete existing reference and replicated table placements on the
+	 * given groupId if the group has been disabled earlier (e.g., isActive
+	 * set to false).
+	 * todo: use metadata context connections
+	 */
+	SendDeletionCommandsForReplicatedTablePlacements(context);
+
+	/*
+	 * SetNodeMetadata sets the isactive, metadatasynced and hasmetadata flags
+	 * locally for the following reasons:
+	 *
+	 * 1) Set isactive to true locally so that we can find activated nodes amongst
+	 *    active workers,
+	 * 2) Do not fail just because the current metadata is not synced (see
+	 *    ErrorIfAnyMetadataNodeOutOfSync),
+	 * 3) To propagate the activated nodes' node metadata correctly.
+	 *
+	 * We are going to sync the metadata anyway in this transaction, so set
+	 * isactive, metadatasynced, and hasmetadata to true locally.
+	 * The changes would roll back in case of failure.
+	 */
+	bool localOnly = true;
+	SetNodeMetadata(context, localOnly);
+
+	/*
+	 * Update local group ids so that upcoming transactions can see their effect.
+	 * Object dependency logic requires having an updated local group id.
+	 */
+	UpdateLocalGroupIdsViaMetadataContext(context);
+
+	/*
+	 * Sync node metadata so that placement insertion does not fail due to
+	 * EnsureShardPlacementMetadataIsSane.
+	 */
+	SyncNodeMetadata(context);
+
+	/*
+	 * Sync all dependencies and distributed objects with their pg_dist_xx tables to
+	 * the metadata nodes inside metadataSyncContext. Depends on node metadata.
+	 */
+	SyncDistributedObjects(context);
+
+	/*
+	 * Let all nodes be active and synced after all operations succeeded.
+	 * We make sure that the metadata sync is idempotent and safe overall with
+	 * multiple other transactions, if nontransactional mode is used.
+	 *
+	 * We already took an Exclusive lock on node metadata, which prevents
+	 * modification of node metadata at the coordinator. In case of a failure,
+	 * this step will roll back to the state where metadatasynced = false.
+	 */
+	localOnly = false;
+	SetNodeMetadata(context, localOnly);
 }
@@ -1260,8 +1369,25 @@ ActivateNode(char *nodeName, int nodePort)
 {
 	bool isActive = true;

-	WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
-	ActivateNodeList(list_make1(workerNode));
+	/*
+	 * We take an exclusive lock on pg_dist_node inside ActivateNodeList. We
+	 * also check that the node still exists after acquiring the lock.
+	 */
+	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
+
+	/*
+	 * Create MetadataSyncContext which will be used throughout nodes' activation.
+	 * It contains metadata sync nodes, their connections and also a MemoryContext
+	 * for allocations.
+	 */
+	bool collectCommands = false;
+	MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode),
+															 collectCommands);
+
+	ActivateNodeList(context);
+
+	/* cleanup metadata memory context and connections */
+	DestroyMetadataSyncContext(context);

 	/* finally, let all other active metadata nodes to learn about this change */
 	WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
@@ -2181,6 +2307,21 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
 }


+/*
+ * SetWorkerColumnViaMetadataContext does the same as SetWorkerColumn but using the
+ * metadata sync context.
+ */
+static void
+SetWorkerColumnViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode,
+								  int columnIndex, Datum value)
+{
+	char *metadataSyncCommand =
+		GetMetadataSyncCommandToSetNodeColumn(workerNode, columnIndex, value);
+
+	SendOrCollectCommandListToMetadataNodes(context, list_make1(metadataSyncCommand));
+}
+
+
 /*
  * SetWorkerColumnOptional function sets the column with the specified index
  * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
@@ -2868,3 +3009,109 @@ UnsetMetadataSyncedForAllWorkers(void)

 	return updatedAtLeastOne;
 }
+
+
+/*
+ * ErrorIfAnyNodeNotExist errors out if any node in the given list is not found.
+ */
+static void
+ErrorIfAnyNodeNotExist(List *nodeList)
+{
+	WorkerNode *node = NULL;
+	foreach_ptr(node, nodeList)
+	{
+		/* error out if the node no longer exists in pg_dist_node */
+		WorkerNode *workerNode =
+			FindWorkerNodeAnyCluster(node->workerName, node->workerPort);
+		if (workerNode == NULL)
+		{
+			ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName,
+								   node->workerPort)));
+		}
+	}
+}
+
+
+/*
+ * UpdateLocalGroupIdsViaMetadataContext updates the local group ids for the given
+ * list of nodes, in transactional or nontransactional mode according to
+ * transactionMode inside metadataSyncContext.
+ */
+static void
+UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context)
+{
+	int nodeIdx = 0;
+	for (nodeIdx = 0; nodeIdx < list_length(context->activatedWorkerNodeList); nodeIdx++)
+	{
+		WorkerNode *node = list_nth(context->activatedWorkerNodeList, nodeIdx);
+		List *commandList = list_make1(LocalGroupIdUpdateCommand(node->groupId));
+
+		/* send commands to new workers, the current user should be a superuser */
+		Assert(superuser());
+
+		SendOrCollectCommandListToSingleNode(context, commandList, nodeIdx);
+	}
+}
+
+
+/*
+ * SendDeletionCommandsForReplicatedTablePlacements sends commands to delete
+ * replicated placements to the metadata nodes, in transactional or nontransactional
+ * mode according to transactionMode inside metadataSyncContext.
+ */
+static void
+SendDeletionCommandsForReplicatedTablePlacements(MetadataSyncContext *context)
+{
+	WorkerNode *node = NULL;
+	foreach_ptr(node, context->activatedWorkerNodeList)
+	{
+		if (!node->isActive)
+		{
+			bool localOnly = false;
+			int32 groupId = node->groupId;
+			DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(context,
+																			   groupId,
+																			   localOnly);
+		}
+	}
+}
+
+
+/*
+ * SyncNodeMetadata syncs node metadata, in transactional or nontransactional
+ * mode according to transactionMode inside metadataSyncContext.
+ */
+static void
+SyncNodeMetadata(MetadataSyncContext *context)
+{
+	CheckCitusVersion(ERROR);
+
+	/*
+	 * Do not fail when we call this method from activate_node_snapshot
+	 * from workers.
+	 */
+	if (!MetadataSyncCollectsCommands(context))
+	{
+		EnsureCoordinator();
+	}
+
+	EnsureModificationsCanRun();
+	EnsureSequentialModeMetadataOperations();
+
+	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
+
+	/* generate the queries which drop the node metadata */
+	List *dropMetadataCommandList = NodeMetadataDropCommands();
+
+	/* generate the queries which create the node metadata from scratch */
+	List *createMetadataCommandList = NodeMetadataCreateCommands();
+
+	List *recreateNodeSnapshotCommandList = dropMetadataCommandList;
+	recreateNodeSnapshotCommandList = list_concat(recreateNodeSnapshotCommandList,
+												  createMetadataCommandList);
+
+	SendOrCollectCommandListToMetadataNodes(context, recreateNodeSnapshotCommandList);
+}

View File

@@ -503,12 +503,11 @@ GetReferenceTableColocationId()

 /*
- * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over
- * list of reference and replicated hash distributed tables and deletes
- * all placements from pg_dist_placement table for given group.
+ * GetAllReplicatedTableList returns all tables which have replicated placements,
+ * i.e. (all reference tables) + (distributed tables with more than 1 placement).
  */
-void
-DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
+List *
+GetAllReplicatedTableList(void)
 {
 	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
 	List *replicatedMetadataSyncedDistributedTableList =

@@ -517,13 +516,25 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
 	List *replicatedTableList =
 		list_concat(referenceTableList, replicatedMetadataSyncedDistributedTableList);

-	/* if there are no reference tables, we do not need to do anything */
+	return replicatedTableList;
+}
+
+
+/*
+ * ReplicatedPlacementsForNodeGroup filters all replicated placements for the given
+ * node group id.
+ */
+List *
+ReplicatedPlacementsForNodeGroup(int32 groupId)
+{
+	List *replicatedTableList = GetAllReplicatedTableList();
+
 	if (list_length(replicatedTableList) == 0)
 	{
-		return;
+		return NIL;
 	}

-	StringInfo deletePlacementCommand = makeStringInfo();
+	List *replicatedPlacementsForNodeGroup = NIL;
 	Oid replicatedTableId = InvalidOid;
 	foreach_oid(replicatedTableId, replicatedTableList)
 	{

@@ -538,25 +549,104 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
 			continue;
 		}

-		GroupShardPlacement *placement = NULL;
-		foreach_ptr(placement, placements)
-		{
-			LockShardDistributionMetadata(placement->shardId, ExclusiveLock);
-
-			if (!localOnly)
-			{
-				resetStringInfo(deletePlacementCommand);
-				appendStringInfo(deletePlacementCommand,
-								 "DELETE FROM pg_catalog.pg_dist_placement "
-								 "WHERE placementid = " UINT64_FORMAT,
-								 placement->placementId);
-
-				SendCommandToWorkersWithMetadata(deletePlacementCommand->data);
-			}
-
-			DeleteShardPlacementRow(placement->placementId);
-		}
-	}
+		replicatedPlacementsForNodeGroup = list_concat(replicatedPlacementsForNodeGroup,
+													   placements);
+	}
+
+	return replicatedPlacementsForNodeGroup;
+}
+
+
+/*
+ * DeleteShardPlacementCommand returns a command for deleting the given placement
+ * from metadata.
+ */
+char *
+DeleteShardPlacementCommand(uint64 placementId)
+{
+	StringInfo deletePlacementCommand = makeStringInfo();
+	appendStringInfo(deletePlacementCommand,
+					 "DELETE FROM pg_catalog.pg_dist_placement "
+					 "WHERE placementid = " UINT64_FORMAT, placementId);
+
+	return deletePlacementCommand->data;
+}
+
+
+/*
+ * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over the
+ * list of reference and replicated hash distributed tables and deletes
+ * all placements from the pg_dist_placement table for the given group.
+ */
+void
+DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
+{
+	List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId);
+
+	/* if there are no replicated tables for the group, we do not need to do anything */
+	if (list_length(replicatedPlacementListForGroup) == 0)
+	{
+		return;
+	}
+
+	GroupShardPlacement *placement = NULL;
+	foreach_ptr(placement, replicatedPlacementListForGroup)
+	{
+		LockShardDistributionMetadata(placement->shardId, ExclusiveLock);
+
+		if (!localOnly)
+		{
+			char *deletePlacementCommand =
+				DeleteShardPlacementCommand(placement->placementId);
+			SendCommandToWorkersWithMetadata(deletePlacementCommand);
+		}
+
+		DeleteShardPlacementRow(placement->placementId);
+	}
+}
+
+
+/*
+ * DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext does the same as
+ * DeleteAllReplicatedTablePlacementsFromNodeGroup except it uses metadataSyncContext
+ * for connections.
+ */
+void
+DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(
+	MetadataSyncContext *context, int32 groupId, bool localOnly)
+{
+	List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId);
+
+	/* if there are no replicated tables for the group, we do not need to do anything */
+	if (list_length(replicatedPlacementListForGroup) == 0)
+	{
+		return;
+	}
+
+	MemoryContext oldContext = MemoryContextSwitchTo(context->context);
+	GroupShardPlacement *placement = NULL;
+	foreach_ptr(placement, replicatedPlacementListForGroup)
+	{
+		LockShardDistributionMetadata(placement->shardId, ExclusiveLock);
+
+		if (!localOnly)
+		{
+			char *deletePlacementCommand =
+				DeleteShardPlacementCommand(placement->placementId);
+			SendOrCollectCommandListToMetadataNodes(context,
+													list_make1(deletePlacementCommand));
+		}
+
+		/* do not execute a local transaction if we collect commands */
+		if (!MetadataSyncCollectsCommands(context))
+		{
+			DeleteShardPlacementRow(placement->placementId);
+		}
+
+		ResetMetadataSyncMemoryContext(context);
+	}
+
+	MemoryContextSwitchTo(oldContext);
 }

View File

@@ -153,6 +153,9 @@ extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context
 extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
 												 List *commands, int nodeIdx);
+extern void ActivateNodeList(MetadataSyncContext *context);
+extern int ActivateNode(char *nodeName, int nodePort);
+
 extern char * WorkerDropAllShellTablesCommand(bool singleTransaction);
 extern void SyncDistributedObjects(MetadataSyncContext *context);

View File

@@ -17,14 +17,20 @@
 #include "listutils.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_sync.h"

 extern void EnsureReferenceTablesExistOnAllNodes(void);
 extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
 extern bool HasNodesWithMissingReferenceTables(List **referenceTableList);
 extern uint32 CreateReferenceTableColocationId(void);
 extern uint32 GetReferenceTableColocationId(void);
+extern List * GetAllReplicatedTableList(void);
+extern List * ReplicatedPlacementsForNodeGroup(int32 groupId);
+extern char * DeleteShardPlacementCommand(uint64 placementId);
 extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
 															bool localOnly);
+extern void DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(
+	MetadataSyncContext *context, int32 groupId, bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
 extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);
 extern void ErrorIfNotAllNodesHaveReferenceTableReplicas(List *workerNodeList);

View File

@@ -62,9 +62,6 @@ extern int MaxWorkerNodesTracked;
 extern char *WorkerListFileName;
 extern char *CurrentCluster;

-extern void ActivateNodeList(List *nodeList);
-extern int ActivateNode(char *nodeName, int nodePort);
-
 /* Function declarations for finding worker nodes to place shards on */
 extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);
 extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,

View File

@@ -1202,6 +1202,21 @@ SELECT start_metadata_sync_to_all_nodes();
  t
 (1 row)

+-- nontransactional sync mode tests
+SET citus.metadata_sync_mode TO 'nontransactional';
+-- do not allow nontransactional sync inside transaction block
+BEGIN;
+SELECT start_metadata_sync_to_all_nodes();
+ERROR:  do not sync metadata in transaction block when the sync mode is nontransactional
+HINT:  resync after SET citus.metadata_sync_mode TO 'transactional'
+COMMIT;
+SELECT start_metadata_sync_to_all_nodes();
+ start_metadata_sync_to_all_nodes
+---------------------------------------------------------------------
+ t
+(1 row)
+
+RESET citus.metadata_sync_mode;
 -- verify that at the end of this file, all primary nodes have metadata synced
 SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
  ?column?

View File

@@ -506,5 +506,14 @@ BEGIN;
 COMMIT;
 SELECT start_metadata_sync_to_all_nodes();

+-- nontransactional sync mode tests
+SET citus.metadata_sync_mode TO 'nontransactional';
+-- do not allow nontransactional sync inside transaction block
+BEGIN;
+SELECT start_metadata_sync_to_all_nodes();
+COMMIT;
+SELECT start_metadata_sync_to_all_nodes();
+RESET citus.metadata_sync_mode;
+
 -- verify that at the end of this file, all primary nodes have metadata synced
 SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';