sync objects without coordinated tx

metadata_sync_final
Onder Kalaci 2022-12-02 08:03:27 +01:00
parent 512c72d2d0
commit ccf955fb52
4 changed files with 99 additions and 42 deletions

View File

@ -534,14 +534,15 @@ GetAllDependencyCreateDDLCommands(const List *dependencies)
void
ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext)
{
List *connectionList = activateNodeContext.connectionList;
List *connectionList = activateNodeContext.activatedNodeConnectionList;
/* since we are executing ddl commands disable propagation first, primarily for mx */
ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION);
if (activateNodeContext.fetchCommands)
{
/* caller requested the commands */
activateNodeContext.commandList = lappend(activateNodeContext.commandList, DISABLE_DDL_PROPAGATION);
activateNodeContext.commandList = lappend(activateNodeContext.commandList,
DISABLE_DDL_PROPAGATION);
}
/*
@ -617,7 +618,8 @@ ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext)
if (activateNodeContext.fetchCommands)
{
/* caller requested the commands */
activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION);
activateNodeContext.commandList = lappend(activateNodeContext.commandList,
ENABLE_DDL_PROPAGATION);
}
}

View File

@ -239,26 +239,29 @@ SyncNodeMetadataToNode(ActivateNodeContext activateContext)
WorkerNode *workerNode = NULL;
MultiConnection *connection = NULL;
forboth_ptr(workerNode, activateContext.workerNodeList,
connection, activateContext.connectionList)
forboth_ptr(workerNode, activateContext.activatedNodeList,
connection, activateContext.activatedNodeConnectionList)
{
if (NodeIsCoordinator(workerNode))
{
return;
}
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
BoolGetDatum(true));
char *metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true));
GetMetadataSyncCommandToSetNodeColumn(workerNode,
Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true));
BoolGetDatum(true));
metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(true));
GetMetadataSyncCommandToSetNodeColumn(workerNode,
Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
if (!NodeIsPrimary(workerNode))
@ -274,7 +277,8 @@ SyncNodeMetadataToNode(ActivateNodeContext activateContext)
metadataSyncCommand = NULL;
foreach_ptr(metadataSyncCommand, nodeMetadataCommandList)
{
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
ExecuteRemoteCommandInConnectionList(list_make1(connection),
metadataSyncCommand);
}
}
}
@ -598,6 +602,7 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
List *recreateMetadataSnapshotCommandList =
NodeMetadataReCreateCommandList(workerNode);
/*
* Send the snapshot recreation commands in a single remote transaction and
* if requested, error out in any kind of failure. Note that it is not
@ -627,7 +632,6 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
static List *
NodeMetadataReCreateCommandList(WorkerNode *workerNode)
{
/* generate and add the local group id's update query */
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
@ -644,9 +648,9 @@ NodeMetadataReCreateCommandList(WorkerNode *workerNode)
createMetadataCommandList);
return recreateMetadataSnapshotCommandList;
}
/*
* DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them
* to the worker given as parameter.

View File

@ -99,7 +99,8 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext);
static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext
activateNodeContext);
static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext);
static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext);
static void BlockDistributedQueriesOnMetadataNodes(void);
@ -107,7 +108,8 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT
static void PropagateNodeWideObjects(ActivateNodeContext activateNodeContext);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker);
static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext);
static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext
activateNodeContext);
static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAllWorkers(void);
@ -668,7 +670,7 @@ BuildInterTableRelationships(ActivateNodeContext activateNodeContext)
}
}
List *connectionList = activateNodeContext.connectionList;
List *connectionList = activateNodeContext.activatedNodeConnectionList;
ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION);
foreach_ptr(cacheEntry, propagatedTableList)
@ -774,24 +776,30 @@ PropagateNodeWideObjects(ActivateNodeContext activateNodeContext)
if (alterRoleSetCommands != NIL)
{
ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, DISABLE_DDL_PROPAGATION);
ExecuteRemoteCommandInConnectionList(
activateNodeContext.activatedNodeConnectionList, DISABLE_DDL_PROPAGATION);
char *command = NULL;
foreach_ptr(command, alterRoleSetCommands)
{
ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, command);
ExecuteRemoteCommandInConnectionList(
activateNodeContext.activatedNodeConnectionList, command);
}
ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, ENABLE_DDL_PROPAGATION);
ExecuteRemoteCommandInConnectionList(
activateNodeContext.activatedNodeConnectionList, ENABLE_DDL_PROPAGATION);
/* the caller is interested in collecting the commands */
if (activateNodeContext.fetchCommands)
{
activateNodeContext.commandList = list_concat(activateNodeContext.commandList, alterRoleSetCommands);
activateNodeContext.commandList = list_concat(
activateNodeContext.commandList, alterRoleSetCommands);
/* if there are command wrap them in enable_ddl_propagation off */
activateNodeContext.commandList = lcons(DISABLE_DDL_PROPAGATION, activateNodeContext.commandList);
activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION);
activateNodeContext.commandList = lcons(DISABLE_DDL_PROPAGATION,
activateNodeContext.commandList);
activateNodeContext.commandList = lappend(activateNodeContext.commandList,
ENABLE_DDL_PROPAGATION);
}
}
}
@ -848,7 +856,7 @@ SyncDistributedObjects(ActivateNodeContext activateNodeContext)
static void
SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext)
{
if (activateNodeContext.workerNodeList == NIL)
if (activateNodeContext.activatedNodeList == NIL)
{
return;
}
@ -876,7 +884,7 @@ SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateContext)
List *nodesWithMetadata = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, activateContext.workerNodeList)
foreach_ptr(workerNode, activateContext.activatedNodeList)
{
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{
@ -891,7 +899,7 @@ SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateContext)
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
List *connectionList = activateContext.connectionList;
List *connectionList = activateContext.activatedNodeConnectionList;
char *command = NULL;
foreach_ptr(command, syncPgDistMetadataCommandList)
@ -1117,13 +1125,13 @@ ActivateNodeList(List *nodeList)
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
ActivateNodeContext activateContext = {};
ActivateNodeContext activateContext = { };
activateContext.fetchCommands = false;
activateContext.workerNodeList = NIL;
activateContext.connectionList = NIL;
activateContext.commandList = NIL;
activateContext.activatedNodeList = NIL;
activateContext.activatedNodeConnectionList = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
@ -1175,8 +1183,8 @@ ActivateNodeList(List *nodeList)
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata)
{
activateContext.workerNodeList =
lappend(activateContext.workerNodeList, workerNode);
activateContext.activatedNodeList =
lappend(activateContext.activatedNodeList, workerNode);
int connectionFlags = FORCE_NEW_CONNECTION;
@ -1184,28 +1192,66 @@ ActivateNodeList(List *nodeList)
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
activateContext.connectionList =
lappend(activateContext.connectionList, connection);
activateContext.activatedNodeConnectionList =
lappend(activateContext.activatedNodeConnectionList, connection);
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
BoolGetDatum(true));
char *metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
GetMetadataSyncCommandToSetNodeColumn(workerNode,
Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection),
metadataSyncCommand);
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
char *localGroupCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
ExecuteRemoteCommandInConnectionList(list_make1(connection), localGroupCommand);
ExecuteRemoteCommandInConnectionList(list_make1(connection),
localGroupCommand);
}
}
List *existingMetadataNodeList =
TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock);
/*ErrorIfAnyMetadataNodeOutOfSync(existingMetadataNodeList); */
node = NULL;
foreach_ptr(node, existingMetadataNodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, node->workerName,
node->workerPort, NULL, NULL);
activateContext.existingMetadataNodeConnectionList =
lappend(activateContext.existingMetadataNodeConnectionList, connection);
}
node = NULL;
foreach_ptr(node, existingMetadataNodeList)
{
if (list_member_ptr(activateContext.activatedNodeList, node))
{
char *metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(node,
Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(
activateContext.existingMetadataNodeConnectionList, metadataSyncCommand);
}
}
/*
* Ideally, we'd want to drop and sync the metadata inside a
* single transaction. However, for some users, the metadata might
@ -1249,14 +1295,16 @@ ActivateNodeList(List *nodeList)
node = NULL;
MultiConnection *connection = NULL;
forboth_ptr(node, activateContext.workerNodeList, connection,activateContext.connectionList)
forboth_ptr(node, activateContext.activatedNodeList, connection,
activateContext.activatedNodeConnectionList)
{
/* finally, let all other active metadata nodes to learn about this change */
SetWorkerColumnLocalOnly(node, Anum_pg_dist_node_isactive, BoolGetDatum(true));
char *metadataSyncCommand =
GetMetadataSyncCommandToSetNodeColumn(node, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
ExecuteRemoteCommandInConnectionList(list_make1(connection), metadataSyncCommand);
ExecuteRemoteCommandInConnectionList(
activateContext.existingMetadataNodeConnectionList, metadataSyncCommand);
}
}
@ -1274,8 +1322,9 @@ DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext
List *commandList = DropExistingMetadataCommandList();
foreach_ptr(command, commandList)
{
ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList,
command);
ExecuteRemoteCommandInConnectionList(
activateNodeContext.activatedNodeConnectionList,
command);
}
if (activateNodeContext.fetchCommands)

View File

@ -59,8 +59,10 @@ typedef struct WorkerNode
typedef struct ActivateNodeContext
{
List *workerNodeList;
List *connectionList;
List *activatedNodeList;
List *activatedNodeConnectionList;
List *existingMetadataNodeConnectionList;
bool fetchCommands;
List *commandList;