sync objects without coordinated tx

metadata_sync_final
Onder Kalaci 2022-12-01 23:48:02 +01:00
parent 5243c0b515
commit 512c72d2d0
4 changed files with 102 additions and 139 deletions

View File

@ -104,6 +104,7 @@ static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed, static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
bool citusTableWithNoDistKey); bool citusTableWithNoDistKey);
static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError); static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
static List * NodeMetadataReCreateCommandList(WorkerNode *workerNode);
static void DropMetadataSnapshotOnNode(WorkerNode *workerNode); static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
char *columnName); char *columnName);
@ -226,10 +227,8 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
* start_metadata_sync_to_node(). * start_metadata_sync_to_node().
*/ */
void void
SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort) SyncNodeMetadataToNode(ActivateNodeContext activateContext)
{ {
char *escapedNodeName = quote_literal_cstr(nodeNameString);
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureCoordinator(); EnsureCoordinator();
EnsureModificationsCanRun(); EnsureModificationsCanRun();
@ -238,45 +237,29 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); WorkerNode *workerNode = NULL;
if (workerNode == NULL) MultiConnection *connection = NULL;
forboth_ptr(workerNode, activateContext.workerNodeList,
connection, activateContext.connectionList)
{ {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("you cannot sync metadata to a non-existent node"),
errhint("First, add the node with SELECT citus_add_node"
"(%s,%d)", escapedNodeName, nodePort)));
}
if (!workerNode->isActive)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("you cannot sync metadata to an inactive node"),
errhint("First, activate the node with "
"SELECT citus_activate_node(%s,%d)",
escapedNodeName, nodePort)));
}
if (NodeIsCoordinator(workerNode)) if (NodeIsCoordinator(workerNode))
{ {
return; return;
} }
UseCoordinatedTransaction(); SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced,
/*
* One would normally expect to set hasmetadata first, and then metadata sync.
* However, at this point we do the order reverse.
* We first set metadatasynced, and then hasmetadata; since setting columns for
* nodes with metadatasynced==false could cause errors.
* (See ErrorIfAnyMetadataNodeOutOfSync)
* We can safely do that because we are in a coordinated transaction and the changes
* are only visible to our own transaction.
* If anything goes wrong, we are going to rollback all the changes.
*/
workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true)); BoolGetDatum(true));
workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum( char *metadataSyncCommand =
true)); GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true));
metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
if (!NodeIsPrimary(workerNode)) if (!NodeIsPrimary(workerNode))
{ {
@ -287,9 +270,13 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
return; return;
} }
/* fail if metadata synchronization doesn't succeed */ List *nodeMetadataCommandList = NodeMetadataReCreateCommandList(workerNode);
bool raiseInterrupts = true; metadataSyncCommand = NULL;
SyncNodeMetadataSnapshotToNode(workerNode, raiseInterrupts); foreach_ptr(metadataSyncCommand, nodeMetadataCommandList)
{
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
}
}
} }
@ -609,21 +596,8 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
{ {
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
/* generate and add the local group id's update query */ List *recreateMetadataSnapshotCommandList =
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); NodeMetadataReCreateCommandList(workerNode);
/* generate the queries which drop the node metadata */
List *dropMetadataCommandList = NodeMetadataDropCommands();
/* generate the queries which create the node metadata from scratch */
List *createMetadataCommandList = NodeMetadataCreateCommands();
List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
dropMetadataCommandList);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
createMetadataCommandList);
/* /*
* Send the snapshot recreation commands in a single remote transaction and * Send the snapshot recreation commands in a single remote transaction and
* if requested, error out in any kind of failure. Note that it is not * if requested, error out in any kind of failure. Note that it is not
@ -650,6 +624,29 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
} }
/*
 * NodeMetadataReCreateCommandList returns the ordered list of commands that
 * recreate the node metadata snapshot for the given worker node. The list
 * consists of, in this order:
 *   1. the command that updates the local group id to workerNode->groupId,
 *   2. the commands that drop the node metadata,
 *   3. the commands that create the node metadata from scratch.
 */
static List *
NodeMetadataReCreateCommandList(WorkerNode *workerNode)
{
	/* the local group id update is always the first command in the list */
	List *commandList =
		list_make1(LocalGroupIdUpdateCommand(workerNode->groupId));

	/* then append the queries which drop the node metadata */
	commandList = list_concat(commandList, NodeMetadataDropCommands());

	/* and lastly the queries which create the node metadata from scratch */
	commandList = list_concat(commandList, NodeMetadataCreateCommands());

	return commandList;
}
/* /*
* DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them * DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them
* to the worker given as parameter. * to the worker given as parameter.

View File

@ -92,7 +92,6 @@ static bool PlacementHasActivePlacementOnAnotherGroup(GroupShardPlacement
*sourcePlacement); *sourcePlacement);
static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
*nodeMetadata, bool *nodeAlreadyExists); *nodeMetadata, bool *nodeAlreadyExists);
static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
static int32 GetNextGroupId(void); static int32 GetNextGroupId(void);
static int GetNextNodeId(void); static int GetNextNodeId(void);
@ -101,7 +100,6 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext); static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext); static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext);
static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext); static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext);
static void BlockDistributedQueriesOnMetadataNodes(void); static void BlockDistributedQueriesOnMetadataNodes(void);
@ -113,9 +111,6 @@ static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext activat
static void SetLockTimeoutLocally(int32 lock_cooldown); static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAllWorkers(void); static bool UnsetMetadataSyncedForAllWorkers(void);
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex,
Datum value);
static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata); static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata);
static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced); static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
@ -868,26 +863,6 @@ SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeConte
} }
/*
 * UpdateLocalGroupIdOnNode sends the local group id update command to the
 * given worker node. It is a no-op unless the node is a primary node that
 * is not the coordinator.
 */
static void
UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
{
	/* nothing to do for non-primary nodes or for the coordinator itself */
	if (!NodeIsPrimary(workerNode) || NodeIsCoordinator(workerNode))
	{
		return;
	}

	char *localGroupIdCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
	List *localGroupIdCommandList = list_make1(localGroupIdCommand);

	/* send commands to new workers, the current user should be a superuser */
	Assert(superuser());

	List *targetNodeList = list_make1(workerNode);
	SendMetadataCommandListToWorkerListInCoordinatedTransaction(targetNodeList,
																CurrentUserName(),
																localGroupIdCommandList);
}
/* /*
* SyncPgDistTableMetadataToNodeList syncs the pg_dist_partition, pg_dist_shard * SyncPgDistTableMetadataToNodeList syncs the pg_dist_partition, pg_dist_shard
* pg_dist_placement and pg_dist_object metadata entries. * pg_dist_placement and pg_dist_object metadata entries.
@ -1142,9 +1117,13 @@ ActivateNodeList(List *nodeList)
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
ActivateNodeContext activateContext; ActivateNodeContext activateContext = {};
activateContext.fetchCommands = false; activateContext.fetchCommands = false;
activateContext.workerNodeList = NIL;
activateContext.connectionList = NIL;
activateContext.commandList = NIL;
WorkerNode *node = NULL; WorkerNode *node = NULL;
foreach_ptr(node, nodeList) foreach_ptr(node, nodeList)
@ -1196,19 +1175,6 @@ ActivateNodeList(List *nodeList)
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata) if (syncMetadata)
{ {
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
activateContext.workerNodeList = activateContext.workerNodeList =
lappend(activateContext.workerNodeList, workerNode); lappend(activateContext.workerNodeList, workerNode);
@ -1220,6 +1186,23 @@ ActivateNodeList(List *nodeList)
workerNode->workerPort, NULL, NULL); workerNode->workerPort, NULL, NULL);
activateContext.connectionList = activateContext.connectionList =
lappend(activateContext.connectionList, connection); lappend(activateContext.connectionList, connection);
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
char *metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
/*
* Update the local group id first, as the object dependency logic requires an
* up-to-date local group id.
*/
char *localGroupCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
ExecuteRemoteCommandInConnectionList(list_make1(connection), localGroupCommand);
} }
} }
@ -1255,10 +1238,7 @@ ActivateNodeList(List *nodeList)
* related pg_dist_xxx metadata. Since table related metadata requires * related pg_dist_xxx metadata. Since table related metadata requires
* to have right pg_dist_node entries. * to have right pg_dist_node entries.
*/ */
foreach_ptr(node, activateContext.workerNodeList) SyncNodeMetadataToNode(activateContext);
{
SyncNodeMetadataToNode(node->workerName, node->workerPort);
}
/* /*
* As the last step, sync the table related metadata to the remote node. * As the last step, sync the table related metadata to the remote node.
@ -1267,12 +1247,16 @@ ActivateNodeList(List *nodeList)
*/ */
SyncPgDistTableMetadataToNodeList(activateContext); SyncPgDistTableMetadataToNodeList(activateContext);
foreach_ptr(node, nodeList) node = NULL;
MultiConnection *connection = NULL;
forboth_ptr(node, activateContext.workerNodeList, connection,activateContext.connectionList)
{ {
bool isActive = true;
/* finally, let all other active metadata nodes to learn about this change */ /* finally, let all other active metadata nodes to learn about this change */
SetNodeState(node->workerName, node->workerPort, isActive); SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, BoolGetDatum(true));
char *metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(node, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
} }
} }
@ -1342,18 +1326,12 @@ DropExistingMetadataCommandList()
int int
ActivateNode(char *nodeName, int nodePort) ActivateNode(char *nodeName, int nodePort)
{ {
bool isActive = true;
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
ActivateNodeList(list_make1(workerNode)); ActivateNodeList(list_make1(workerNode));
/* finally, let all other active metadata nodes to learn about this change */
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
Assert(newWorkerNode->nodeId == workerNode->nodeId);
TransactionModifiedNodeMetadata = true; TransactionModifiedNodeMetadata = true;
return newWorkerNode->nodeId; return workerNode->nodeId;
} }
@ -2386,7 +2364,7 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
* GetMetadataSyncCommandToSetNodeColumn checks if the given workerNode and value is * GetMetadataSyncCommandToSetNodeColumn checks if the given workerNode and value is
* valid or not. Then it returns the necessary metadata sync command as a string. * valid or not. Then it returns the necessary metadata sync command as a string.
*/ */
static char * char *
GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum
value) value)
{ {
@ -2500,20 +2478,6 @@ SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards)
} }
/*
 * SetNodeState looks up the worker identified by nodeName and nodePort (in
 * any cluster) and sets its isactive column in pg_dist_node to isActive via
 * SetWorkerColumn, which is expected to also propagate the change to other
 * metadata nodes. Returns the worker node as returned by SetWorkerColumn,
 * i.e. the node after the modification.
 */
static WorkerNode *
SetNodeState(char *nodeName, int nodePort, bool isActive)
{
	WorkerNode *targetNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
	Datum isActiveDatum = BoolGetDatum(isActive);

	return SetWorkerColumn(targetNode, Anum_pg_dist_node_isactive, isActiveDatum);
}
/* /*
* GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the * GetNodeTuple function returns the heap tuple of given nodeName and nodePort. If the
* node is not found this function returns NULL. * node is not found this function returns NULL.

View File

@ -46,9 +46,8 @@ typedef struct SequenceInfo
bool isNextValDefault; bool isNextValDefault;
} SequenceInfo; } SequenceInfo;
/* Functions declarations for metadata syncing */ /* Functions declarations for metadata syncing */
extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort); extern void SyncNodeMetadataToNode(ActivateNodeContext activateContext);
extern void SyncCitusTableMetadata(Oid relationId); extern void SyncCitusTableMetadata(Oid relationId);
extern void EnsureSequentialModeMetadataOperations(void); extern void EnsureSequentialModeMetadataOperations(void);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);

View File

@ -113,6 +113,9 @@ extern WorkerNode * SetWorkerColumnOptional(WorkerNode *workerNode, int columnIn
value); value);
extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex,
Datum value); Datum value);
extern char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex,
Datum value);
extern uint32 CountPrimariesWithMetadata(void); extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern void SyncDistributedObjects(ActivateNodeContext activateNodeContext); extern void SyncDistributedObjects(ActivateNodeContext activateNodeContext);