sync objects without coordinated tx

metadata_sync_final
Onder Kalaci 2022-12-01 18:15:07 +01:00
parent a32aee8b28
commit daaef20d71
5 changed files with 82 additions and 112 deletions

View File

@ -532,14 +532,16 @@ GetAllDependencyCreateDDLCommands(const List *dependencies)
* clusterHasDistributedFunction if there are any distributed functions. * clusterHasDistributedFunction if there are any distributed functions.
*/ */
void void
ReplicateAllObjectsToNodes(List *connectionList, List **commandList) ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext)
{ {
List *connectionList = activateNodeContext.connectionList;
/* since we are executing ddl commands disable propagation first, primarily for mx */ /* since we are executing ddl commands disable propagation first, primarily for mx */
ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION);
if (commandList != NULL) if (activateNodeContext.fetchCommands)
{ {
/* caller requested the commands */ /* caller requested the commands */
*commandList = lappend(*commandList, DISABLE_DDL_PROPAGATION); activateNodeContext.commandList = lappend(activateNodeContext.commandList, DISABLE_DDL_PROPAGATION);
} }
/* /*
@ -602,19 +604,20 @@ ReplicateAllObjectsToNodes(List *connectionList, List **commandList)
{ {
ExecuteRemoteCommandInConnectionList(connectionList, command); ExecuteRemoteCommandInConnectionList(connectionList, command);
if (commandList != NULL) if (activateNodeContext.fetchCommands)
{ {
/* caller requested the commands */ /* caller requested the commands */
*commandList = lappend(*commandList, command); activateNodeContext.commandList =
lappend(activateNodeContext.commandList, command);
} }
} }
} }
ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION);
if (commandList != NULL) if (activateNodeContext.fetchCommands)
{ {
/* caller requested the commands */ /* caller requested the commands */
*commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION);
} }
} }

View File

@ -100,16 +100,16 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsInOutsideTransaction(List *workerNodeList); static void SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(List *nodeList); static void SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateNodeContext);
static void BuildInterTableRelationships(List *connectionList, List **commandList); static void BuildInterTableRelationships(ActivateNodeContext activateNodeContext);
static void BlockDistributedQueriesOnMetadataNodes(void); static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void PropagateNodeWideObjects(List *connectionList, List **commandList); static void PropagateNodeWideObjects(ActivateNodeContext activateNodeContext);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker); static bool NodeIsLocal(WorkerNode *worker);
static void DropExistingMetadataInOutsideTransaction(List *nodeList); static void DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext);
static void SetLockTimeoutLocally(int32 lock_cooldown); static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAllWorkers(void); static bool UnsetMetadataSyncedForAllWorkers(void);
@ -654,7 +654,7 @@ master_set_node_property(PG_FUNCTION_ARGS)
* for each citus table. * for each citus table.
*/ */
static void static void
BuildInterTableRelationships(List *connectionList, List **commandList) BuildInterTableRelationships(ActivateNodeContext activateNodeContext)
{ {
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL; List *propagatedTableList = NIL;
@ -673,6 +673,7 @@ BuildInterTableRelationships(List *connectionList, List **commandList)
} }
} }
List *connectionList = activateNodeContext.connectionList;
ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION);
foreach_ptr(cacheEntry, propagatedTableList) foreach_ptr(cacheEntry, propagatedTableList)
@ -687,21 +688,24 @@ BuildInterTableRelationships(List *connectionList, List **commandList)
{ {
ExecuteRemoteCommandInConnectionList(connectionList, command); ExecuteRemoteCommandInConnectionList(connectionList, command);
if (commandList != NULL) if (activateNodeContext.fetchCommands)
{ {
/* caller requested the commands */ /* caller requested the commands */
*commandList = lappend(*commandList, command); activateNodeContext.commandList =
lappend(activateNodeContext.commandList, command);
} }
} }
} }
ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION);
if (commandList != NULL) if (activateNodeContext.fetchCommands)
{ {
/* caller requested the commands */ /* caller requested the commands */
*commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList); activateNodeContext.commandList =
*commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); lcons(DISABLE_DDL_PROPAGATION, activateNodeContext.commandList);
activateNodeContext.commandList =
lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION);
} }
} }
@ -763,7 +767,7 @@ PgDistTableMetadataSyncCommandList(void)
* change system wide behaviour. * change system wide behaviour.
*/ */
static void static void
PropagateNodeWideObjects(List *connectionList, List **commandList) PropagateNodeWideObjects(ActivateNodeContext activateNodeContext)
{ {
if (EnableAlterRoleSetPropagation) if (EnableAlterRoleSetPropagation)
{ {
@ -775,24 +779,24 @@ PropagateNodeWideObjects(List *connectionList, List **commandList)
if (alterRoleSetCommands != NIL) if (alterRoleSetCommands != NIL)
{ {
ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, DISABLE_DDL_PROPAGATION);
char *command = NULL; char *command = NULL;
foreach_ptr(command, alterRoleSetCommands) foreach_ptr(command, alterRoleSetCommands)
{ {
ExecuteRemoteCommandInConnectionList(connectionList, command); ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, command);
} }
ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList, ENABLE_DDL_PROPAGATION);
/* the caller is interested in collecting the commands */ /* the caller is interested in collecting the commands */
if (commandList != NULL) if (activateNodeContext.fetchCommands)
{ {
*commandList = list_concat(*commandList, alterRoleSetCommands); activateNodeContext.commandList = list_concat(activateNodeContext.commandList, alterRoleSetCommands);
/* if there are command wrap them in enable_ddl_propagation off */ /* if there are command wrap them in enable_ddl_propagation off */
*commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList); activateNodeContext.commandList = lcons(DISABLE_DDL_PROPAGATION, activateNodeContext.commandList);
*commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION); activateNodeContext.commandList = lappend(activateNodeContext.commandList, ENABLE_DDL_PROPAGATION);
} }
} }
} }
@ -816,46 +820,23 @@ PropagateNodeWideObjects(List *connectionList, List **commandList)
* partitioning, so use the list cautiously. * partitioning, so use the list cautiously.
*/ */
void void
SyncDistributedObjects(List *workerNodeList, List **commandList) SyncDistributedObjects(ActivateNodeContext activateNodeContext)
{ {
List *connectionList = NIL;
/* first, establish new connections */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
/* /*
* Propagate node wide objects. It includes only roles for now. * Propagate node wide objects. It includes only roles for now.
*/ */
PropagateNodeWideObjects(connectionList, commandList); PropagateNodeWideObjects(activateNodeContext);
/* /*
* Replicate all objects of the pg_dist_object to the remote node. * Replicate all objects of the pg_dist_object to the remote node.
*/ */
ReplicateAllObjectsToNodes(connectionList, commandList); ReplicateAllObjectsToNodes(activateNodeContext);
/* /*
* After creating each table, handle the inter table relationship between * After creating each table, handle the inter table relationship between
* those tables. * those tables.
*/ */
BuildInterTableRelationships(connectionList, commandList); BuildInterTableRelationships(activateNodeContext);
/* finally, close the connections as we don't need them anymore */
MultiConnection *connection;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
} }
@ -870,9 +851,9 @@ SyncDistributedObjects(List *workerNodeList, List **commandList)
* since all the dependencies should be present in the coordinator already. * since all the dependencies should be present in the coordinator already.
*/ */
static void static void
SyncDistributedObjectsInOutsideTransaction(List *workerNodeList) SyncDistributedObjectsInOutsideTransaction(ActivateNodeContext activateNodeContext)
{ {
if (workerNodeList == NIL) if (activateNodeContext.workerNodeList == NIL)
{ {
return; return;
} }
@ -883,7 +864,7 @@ SyncDistributedObjectsInOutsideTransaction(List *workerNodeList)
/* send commands to new workers, the current user should be a superuser */ /* send commands to new workers, the current user should be a superuser */
Assert(superuser()); Assert(superuser());
SyncDistributedObjects(workerNodeList, NULL); SyncDistributedObjects(activateNodeContext);
} }
@ -913,14 +894,14 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
* *
*/ */
static void static void
SyncPgDistTableMetadataToNodeList(List *nodeList) SyncPgDistTableMetadataToNodeList(ActivateNodeContext activateContext)
{ {
/* send commands to new workers, the current user should be a superuser */ /* send commands to new workers, the current user should be a superuser */
Assert(superuser()); Assert(superuser());
List *nodesWithMetadata = NIL; List *nodesWithMetadata = NIL;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList) foreach_ptr(workerNode, activateContext.workerNodeList)
{ {
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{ {
@ -935,34 +916,13 @@ SyncPgDistTableMetadataToNodeList(List *nodeList)
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
List *connectionList = NIL; List *connectionList = activateContext.connectionList;
/* first, establish new connections */
workerNode = NULL;
foreach_ptr(workerNode, nodesWithMetadata)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
char *command = NULL; char *command = NULL;
foreach_ptr(command, syncPgDistMetadataCommandList) foreach_ptr(command, syncPgDistMetadataCommandList)
{ {
ExecuteRemoteCommandInConnectionList(connectionList, command); ExecuteRemoteCommandInConnectionList(connectionList, command);
} }
/* finally, close the connections as we don't need them anymore */
MultiConnection *connection;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
} }
@ -1182,8 +1142,10 @@ ActivateNodeList(List *nodeList)
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
ActivateNodeContext activateContext;
activateContext.fetchCommands = false;
List *nodeToSyncMetadata = NIL;
WorkerNode *node = NULL; WorkerNode *node = NULL;
foreach_ptr(node, nodeList) foreach_ptr(node, nodeList)
{ {
@ -1247,7 +1209,17 @@ ActivateNodeList(List *nodeList)
*/ */
UpdateLocalGroupIdOnNode(workerNode); UpdateLocalGroupIdOnNode(workerNode);
nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode); activateContext.workerNodeList =
lappend(activateContext.workerNodeList, workerNode);
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
activateContext.connectionList =
lappend(activateContext.connectionList, connection);
} }
} }
@ -1265,7 +1237,7 @@ ActivateNodeList(List *nodeList)
* It incurs some risk of failures during these steps causing * It incurs some risk of failures during these steps causing
* issues if any of the steps is not idempotent. * issues if any of the steps is not idempotent.
*/ */
DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata); DropExistingMetadataInOutsideTransaction(activateContext);
/* /*
@ -1276,14 +1248,14 @@ ActivateNodeList(List *nodeList)
* Given that object creation is idempotent, we can afford to run them * Given that object creation is idempotent, we can afford to run them
* outside the transaction. * outside the transaction.
*/ */
SyncDistributedObjectsInOutsideTransaction(nodeToSyncMetadata); SyncDistributedObjectsInOutsideTransaction(activateContext);
/* /*
* Sync node metadata. We must sync node metadata before syncing table * Sync node metadata. We must sync node metadata before syncing table
* related pg_dist_xxx metadata. Since table related metadata requires * related pg_dist_xxx metadata. Since table related metadata requires
* to have right pg_dist_node entries. * to have right pg_dist_node entries.
*/ */
foreach_ptr(node, nodeToSyncMetadata) foreach_ptr(node, activateContext.workerNodeList)
{ {
SyncNodeMetadataToNode(node->workerName, node->workerPort); SyncNodeMetadataToNode(node->workerName, node->workerPort);
} }
@ -1293,7 +1265,7 @@ ActivateNodeList(List *nodeList)
* We must handle it as the last step because of limitations shared with * We must handle it as the last step because of limitations shared with
* above comments. * above comments.
*/ */
SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata); SyncPgDistTableMetadataToNodeList(activateContext);
foreach_ptr(node, nodeList) foreach_ptr(node, nodeList)
{ {
@ -1312,36 +1284,20 @@ ActivateNodeList(List *nodeList)
* The function establishes one connection per node, and closes at the end. * The function establishes one connection per node, and closes at the end.
*/ */
static void static void
DropExistingMetadataInOutsideTransaction(List *nodeList) DropExistingMetadataInOutsideTransaction(ActivateNodeContext activateNodeContext)
{ {
List *connectionList = NIL;
/* first, establish new connections */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
char *command = NULL; char *command = NULL;
List *commandList = DropExistingMetadataCommandList(); List *commandList = DropExistingMetadataCommandList();
foreach_ptr(command, commandList) foreach_ptr(command, commandList)
{ {
ExecuteRemoteCommandInConnectionList(connectionList, command); ExecuteRemoteCommandInConnectionList(activateNodeContext.connectionList,
command);
} }
/* finally, close the connections as we don't need them anymore */ if (activateNodeContext.fetchCommands)
MultiConnection *connection;
foreach_ptr(connection, connectionList)
{ {
CloseConnection(connection); activateNodeContext.commandList =
list_concat(activateNodeContext.commandList, commandList);
} }
} }

View File

@ -52,8 +52,10 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
List *updateLocalGroupCommand = List *updateLocalGroupCommand =
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *dropMetadataCommandList = DropExistingMetadataCommandList(); List *dropMetadataCommandList = DropExistingMetadataCommandList();
List *syncDistObjCommands = NIL; ActivateNodeContext context;
SyncDistributedObjects(NIL, &syncDistObjCommands); context.fetchCommands = true;
SyncDistributedObjects(context);
List *syncDistObjCommands = context.commandList;
List *dropNodeSnapshotCommands = NodeMetadataDropCommands(); List *dropNodeSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList(); List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();

View File

@ -349,7 +349,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies);
extern bool ShouldPropagate(void); extern bool ShouldPropagate(void);
extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void);
extern bool ShouldPropagateAnyObject(List *addresses); extern bool ShouldPropagateAnyObject(List *addresses);
extern void ReplicateAllObjectsToNodes(List *connectionList, List **commandList); extern void ReplicateAllObjectsToNodes(ActivateNodeContext activateNodeContext);
/* Remaining metadata utility functions */ /* Remaining metadata utility functions */
extern Oid TableOwnerOid(Oid relationId); extern Oid TableOwnerOid(Oid relationId);

View File

@ -57,6 +57,15 @@ typedef struct WorkerNode
} WorkerNode; } WorkerNode;
/*
 * ActivateNodeContext bundles the state shared by the node activation
 * helpers: the worker nodes being activated, the connections opened to
 * them, and an optional record of the commands that were sent.
 *
 * NOTE(review): the helpers that accept this struct take it by value, so
 * an assignment to commandList inside a helper only changes that helper's
 * copy; a caller that reads commandList after the call may not see the
 * appended commands -- confirm, and consider passing a pointer instead.
 *
 * NOTE(review): callers declare the struct without an initializer and set
 * only fetchCommands before appending to the list fields -- confirm that
 * workerNodeList, connectionList and commandList start out as NIL.
 */
typedef struct ActivateNodeContext
{
/* worker nodes collected during activation that need metadata synced */
List *workerNodeList;
/* connections to those nodes; one is appended together with each worker node */
List *connectionList;
/* when true, helpers also collect the commands they execute into commandList */
bool fetchCommands;
/* commands collected by the helpers; only written when fetchCommands is true */
List *commandList;
} ActivateNodeContext;
/* Config variables managed via guc.c */ /* Config variables managed via guc.c */
extern int MaxWorkerNodesTracked; extern int MaxWorkerNodesTracked;
extern char *WorkerListFileName; extern char *WorkerListFileName;
@ -106,7 +115,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
Datum value); Datum value);
extern uint32 CountPrimariesWithMetadata(void); extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern void SyncDistributedObjects(List *workerNodeList, List **commandList); extern void SyncDistributedObjects(ActivateNodeContext activateNodeContext);
extern List * PgDistTableMetadataSyncCommandList(void); extern List * PgDistTableMetadataSyncCommandList(void);
/* Function declarations for worker node utilities */ /* Function declarations for worker node utilities */