sync objects without coordinated tx

metadata_sync_final
Onder Kalaci 2022-12-01 15:41:45 +01:00
parent 6c3f58afaa
commit 77ef777d53
5 changed files with 105 additions and 59 deletions

View File

@ -531,11 +531,16 @@ GetAllDependencyCreateDDLCommands(const List *dependencies)
* previously marked objects to a worker node. The function also sets * previously marked objects to a worker node. The function also sets
* clusterHasDistributedFunction if there are any distributed functions. * clusterHasDistributedFunction if there are any distributed functions.
*/ */
List * void
ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort) ReplicateAllObjectsToNodes(List *connectionList, List **commandList)
{ {
/* since we are executing ddl commands disable propagation first, primarily for mx */ /* since we are executing ddl commands disable propagation first, primarily for mx */
List *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION);
if (commandList != NULL)
{
/* caller requested the commands */
*commandList = lappend(*commandList, DISABLE_DDL_PROPAGATION);
}
/* /*
* collect all dependencies in creation order and get their ddl commands * collect all dependencies in creation order and get their ddl commands
@ -556,10 +561,12 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
* 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too * 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too
* low we can adjust this based on experience. * low we can adjust this based on experience.
*/ */
if (list_length(dependencies) > 100) if (list_length(dependencies) > 100 && list_length(connectionList))
{ {
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName, MultiConnection *connection = (MultiConnection *) linitial(connectionList);
nodePort),
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d",
connection->hostname, connection->port),
errdetail("There are %d objects to replicate, depending on your " errdetail("There are %d objects to replicate, depending on your "
"environment this might take a while", "environment this might take a while",
list_length(dependencies)))); list_length(dependencies))));
@ -578,13 +585,26 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
continue; continue;
} }
ddlCommands = list_concat(ddlCommands, List *commandsForDep = GetDependencyCreateDDLCommands(dependency);
GetDependencyCreateDDLCommands(dependency)); char *command = NULL;
foreach_ptr(command, commandsForDep)
{
ExecuteRemoteCommandInConnectionList(connectionList, command);
if (commandList != NULL)
{
/* caller requested the commands */
*commandList = lappend(*commandList, command);
}
}
} }
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION);
if (commandList != NULL)
return ddlCommands; {
/* caller requested the commands */
*commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION);
}
} }

View File

@ -103,10 +103,10 @@ static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsToNodeList(List *workerNodeList); static void SyncDistributedObjectsToNodeList(List *workerNodeList);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(List *nodeList); static void SyncPgDistTableMetadataToNodeList(List *nodeList);
static List * InterTableRelationshipCommandList(); static void BuildInterTableRelationships(List *connectionList, List **commandList);
static void BlockDistributedQueriesOnMetadataNodes(void); static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static List * PropagateNodeWideObjectsCommandList(); static void PropagateNodeWideObjects(List *connectionList, List **commandList);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker); static bool NodeIsLocal(WorkerNode *worker);
static void DropExistingMetadataInOutsideTransaction(List *nodeList); static void DropExistingMetadataInOutsideTransaction(List *nodeList);
@ -653,12 +653,11 @@ master_set_node_property(PG_FUNCTION_ARGS)
* *
* for each citus table. * for each citus table.
*/ */
static List * static void
InterTableRelationshipCommandList() BuildInterTableRelationships(List *connectionList, List **commandList)
{ {
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL; List *propagatedTableList = NIL;
List *multipleTableIntegrationCommandList = NIL;
CitusTableCacheEntry *cacheEntry = NULL; CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList) foreach_ptr(cacheEntry, distributedTableList)
@ -674,6 +673,8 @@ InterTableRelationshipCommandList()
} }
} }
ExecuteRemoteCommandInConnectionList(connectionList, DISABLE_DDL_PROPAGATION);
foreach_ptr(cacheEntry, propagatedTableList) foreach_ptr(cacheEntry, propagatedTableList)
{ {
Oid relationId = cacheEntry->relationId; Oid relationId = cacheEntry->relationId;
@ -681,17 +682,27 @@ InterTableRelationshipCommandList()
List *commandListForRelation = List *commandListForRelation =
InterTableRelationshipOfRelationCommandList(relationId); InterTableRelationshipOfRelationCommandList(relationId);
multipleTableIntegrationCommandList = list_concat( char *command = NULL;
multipleTableIntegrationCommandList, foreach_ptr(command, commandListForRelation)
commandListForRelation); {
ExecuteRemoteCommandInConnectionList(connectionList, command);
if (commandList != NULL)
{
/* caller requested the commands */
*commandList = lappend(*commandList, command);
}
}
} }
multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, ExecuteRemoteCommandInConnectionList(connectionList, ENABLE_DDL_PROPAGATION);
multipleTableIntegrationCommandList);
multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList,
ENABLE_DDL_PROPAGATION);
return multipleTableIntegrationCommandList; if (commandList != NULL)
{
/* caller requested the commands */
*commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList);
*commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION);
}
} }
@ -745,16 +756,15 @@ PgDistTableMetadataSyncCommandList(void)
/* /*
* PropagateNodeWideObjectsCommandList is called during node activation to * PropagateNodeWideObjects is called during node activation to
* propagate any object that should be propagated for every node. These are * propagate any object that should be propagated for every node.
* generally not linked to any distributed object but change system wide behaviour. *
* These are generally not linked to any distributed object but
* change system wide behaviour.
*/ */
static List * static void
PropagateNodeWideObjectsCommandList() PropagateNodeWideObjects(List *connectionList, List **commandList)
{ {
/* collect all commands */
List *ddlCommands = NIL;
if (EnableAlterRoleSetPropagation) if (EnableAlterRoleSetPropagation)
{ {
/* /*
@ -762,17 +772,26 @@ PropagateNodeWideObjectsCommandList()
* linked to any role that can be distributed we need to distribute them seperately * linked to any role that can be distributed we need to distribute them seperately
*/ */
List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid); List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
ddlCommands = list_concat(ddlCommands, alterRoleSetCommands);
}
if (list_length(ddlCommands) > 0) if (alterRoleSetCommands != NIL)
{ {
/* if there are command wrap them in enable_ddl_propagation off */ char *command = NULL;
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); foreach_ptr(command, alterRoleSetCommands)
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); {
ExecuteRemoteCommandInConnectionList(connectionList, command);
} }
return ddlCommands; /* the caller is interested in collecting the commands */
if (commandList != NULL)
{
*commandList = list_concat(*commandList, alterRoleSetCommands);
/* if there are command wrap them in enable_ddl_propagation off */
*commandList = lcons(DISABLE_DDL_PROPAGATION, *commandList);
*commandList = lappend(*commandList, ENABLE_DDL_PROPAGATION);
}
}
}
} }
@ -791,29 +810,40 @@ PropagateNodeWideObjectsCommandList()
* We also update the local group id here, as handling sequence dependencies * We also update the local group id here, as handling sequence dependencies
* requires it. * requires it.
*/ */
List * void
SyncDistributedObjectsCommandList(WorkerNode *workerNode) SyncDistributedObjects(List *workerNodeList, List **commandList)
{ {
List *commandList = NIL; List *connectionList = NIL;
/* first, establish new connections */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
/* /*
* Propagate node wide objects. It includes only roles for now. * Propagate node wide objects. It includes only roles for now.
*/ */
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); PropagateNodeWideObjects(connectionList, commandList);
/* /*
* Replicate all objects of the pg_dist_object to the remote node. * Replicate all objects of the pg_dist_object to the remote node.
*/ */
commandList = list_concat(commandList, ReplicateAllObjectsToNodeCommandList( ReplicateAllObjectsToNodes(connectionList, commandList);
workerNode->workerName, workerNode->workerPort));
/* /*
* After creating each table, handle the inter table relationship between * After creating each table, handle the inter table relationship between
* those tables. * those tables.
*/ */
commandList = list_concat(commandList, InterTableRelationshipCommandList()); BuildInterTableRelationships(connectionList, commandList);
return commandList;
} }
@ -838,14 +868,9 @@ SyncDistributedObjectsToNodeList(List *workerNodeList)
Assert(ShouldPropagate()); Assert(ShouldPropagate());
List *commandList = SyncDistributedObjectsCommandList(linitial(workerNodeList));
/* send commands to new workers, the current user should be a superuser */ /* send commands to new workers, the current user should be a superuser */
Assert(superuser()); Assert(superuser());
SendMetadataCommandListToWorkerListInCoordinatedTransaction( SyncDistributedObjects(workerNodeList, NULL);
workerNodeList,
CurrentUserName(),
commandList);
} }

View File

@ -52,7 +52,8 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
List *updateLocalGroupCommand = List *updateLocalGroupCommand =
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *dropMetadataCommandList = DropExistingMetadataCommandList(); List *dropMetadataCommandList = DropExistingMetadataCommandList();
List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode); List *syncDistObjCommands = NIL;
SyncDistributedObjects(NIL, &syncDistObjCommands);
List *dropNodeSnapshotCommands = NodeMetadataDropCommands(); List *dropNodeSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList(); List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();

View File

@ -349,7 +349,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies);
extern bool ShouldPropagate(void); extern bool ShouldPropagate(void);
extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void);
extern bool ShouldPropagateAnyObject(List *addresses); extern bool ShouldPropagateAnyObject(List *addresses);
extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort); extern void ReplicateAllObjectsToNodes(List *connectionList, List **commandList);
/* Remaining metadata utility functions */ /* Remaining metadata utility functions */
extern Oid TableOwnerOid(Oid relationId); extern Oid TableOwnerOid(Oid relationId);

View File

@ -106,7 +106,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
Datum value); Datum value);
extern uint32 CountPrimariesWithMetadata(void); extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode); extern void SyncDistributedObjects(List *workerNodeList, List **commandList);
extern List * PgDistTableMetadataSyncCommandList(void); extern List * PgDistTableMetadataSyncCommandList(void);
/* Function declarations for worker node utilities */ /* Function declarations for worker node utilities */