metadata_sync_imp
Onder Kalaci 2022-11-28 13:02:03 +01:00
parent be1c5016c5
commit 38ffdab244
7 changed files with 151 additions and 124 deletions

View File

@ -532,10 +532,11 @@ GetAllDependencyCreateDDLCommands(const List *dependencies)
* clusterHasDistributedFunction if there are any distributed functions.
*/
void
ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort,
ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
List **ddlCommands)
{
/* since we are executing ddl commands disable propagation first, primarily for mx */
if (ddlCommands != NULL)
*ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);
/*
@ -559,8 +560,8 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort,
*/
if (list_length(dependencies) > 100)
{
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName,
nodePort),
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", "lll",
5555),
errdetail("There are %d objects to replicate, depending on your "
"environment this might take a while",
list_length(dependencies))));
@ -579,10 +580,18 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort,
continue;
}
List *perObjCommands = GetDependencyCreateDDLCommands(dependency);
if (ddlCommands != NULL)
*ddlCommands = list_concat(*ddlCommands,
GetDependencyCreateDDLCommands(dependency));
perObjCommands);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), perObjCommands);
}
}
if (ddlCommands != NULL)
*ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION);
}

View File

@ -672,7 +672,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
List *detachPartitionCommandList = NIL;
DetachPartitionCommandList(&detachPartitionCommandList);
DetachPartitionCommandList(NIL, &detachPartitionCommandList);
dropMetadataCommandList = list_concat(dropMetadataCommandList,
detachPartitionCommandList);
@ -2667,40 +2667,65 @@ CreateTableMetadataOnWorkers(Oid relationId)
* empty list to not disable/enable DDL propagation for nothing.
*/
void
DetachPartitionCommandList(List **detachPartitionCommandList)
DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPartitionCommandList)
{
List *distributedTableList = CitusTableList();
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
bool foundAnyPartitionedTable = false;
/* we iterate over all distributed partitioned tables and DETACH their partitions */
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
Oid relationId = InvalidOid;
foreach_oid(relationId, citusTableIdList)
{
if (!PartitionedTable(cacheEntry->relationId))
if (!PartitionedTable(relationId))
{
continue;
}
List *partitionList = PartitionList(cacheEntry->relationId);
List *detachCommands =
GenerateDetachPartitionCommandRelationIdList(partitionList);
*detachPartitionCommandList = list_concat(*detachPartitionCommandList,
detachCommands);
List *partitionList = PartitionList(relationId);
Oid partitionRelOid = InvalidOid;
foreach_oid(partitionRelOid, partitionList)
{
foundAnyPartitionedTable = true;
Assert(PartitionTable(partitionRelOid));
char *detachCommand = GenerateDetachPartitionCommand(partitionRelOid);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(detachCommand));
}
if (detachPartitionCommandList != NULL)
{
*detachPartitionCommandList =
lappend(*detachPartitionCommandList, detachCommand);
}
else
pfree(detachCommand);
}
}
if (list_length(*detachPartitionCommandList) == 0)
if (!foundAnyPartitionedTable)
{
return;
}
*detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList);
if (detachPartitionCommandList != NULL)
{
*detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList);
/*
* We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to switch back to original state.
*/
*detachPartitionCommandList = lappend(*detachPartitionCommandList,
ENABLE_DDL_PROPAGATION);
/*
* We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to switch back to original state.
*/
*detachPartitionCommandList = lappend(*detachPartitionCommandList,
ENABLE_DDL_PROPAGATION);
}
}

View File

@ -102,11 +102,13 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsToNodeList(List *workerNodeList);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(List *nodeList);
static void InterTableRelationshipCommandList(List **ddlCommandList);
static void SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections);
static void InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
List **ddlCommandList);
static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList);
static void PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
List **nodeWideObjectCommandList);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown);
@ -653,35 +655,39 @@ master_set_node_property(PG_FUNCTION_ARGS)
* for each citus table.
*/
static void
InterTableRelationshipCommandList(List **multipleTableIntegrationCommandList)
InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
List **multipleTableIntegrationCommandList)
{
List *distributedTableList = CitusTableList();
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
List *propagatedTableList = NIL;
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
Oid relationId = InvalidOid;
foreach_oid(relationId, citusTableIdList)
{
/*
* Skip foreign key and partition creation when we shouldn't need to sync
* table metadata or the Citus table is owned by an extension.
*/
if (ShouldSyncTableMetadata(cacheEntry->relationId) &&
!IsTableOwnedByExtension(cacheEntry->relationId))
if (ShouldSyncTableMetadataViaCatalog(relationId) &&
!IsTableOwnedByExtension(relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
propagatedTableList = lappend_oid(propagatedTableList, relationId);
}
}
foreach_ptr(cacheEntry, propagatedTableList)
foreach_oid(relationId, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
List *commandListForRelation =
InterTableRelationshipOfRelationCommandList(relationId);
*multipleTableIntegrationCommandList = list_concat(
*multipleTableIntegrationCommandList,
commandListForRelation);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), commandListForRelation);
}
}
*multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION,
@ -696,7 +702,7 @@ InterTableRelationshipCommandList(List **multipleTableIntegrationCommandList)
* (except pg_dist_node) metadata. We call them as table metadata.
*/
void
PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList)
PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **metadataSnapshotCommandList)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
@ -723,12 +729,19 @@ PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList)
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_COLOCATION);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), *metadataSnapshotCommandList);
}
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
foreach_ptr(cacheEntry, propagatedTableList)
{
List *tableMetadataCreateCommandList =
CitusTableMetadataCreateCommandList(cacheEntry->relationId);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), tableMetadataCreateCommandList);
*metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
tableMetadataCreateCommandList);
}
@ -755,8 +768,10 @@ PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList)
* generally not linked to any distributed object but change system wide behaviour.
*/
static void
PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList)
PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
List **nodeWideObjectCommandList)
{
bool hasObjects = false;
if (EnableAlterRoleSetPropagation)
{
/*
@ -764,17 +779,23 @@ PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList)
* linked to any role that can be distributed we need to distribute them separately
*/
List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
*nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList,
alterRoleSetCommands);
}
if (list_length(*nodeWideObjectCommandList) > 0)
{
/* if there are commands, wrap them in enable_ddl_propagation off */
*nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION,
*nodeWideObjectCommandList);
*nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList,
ENABLE_DDL_PROPAGATION);
if (nodeWideObjectCommandList != NULL)
{
*nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList,
alterRoleSetCommands);
/* if there are commands, wrap them in enable_ddl_propagation off */
*nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION,
*nodeWideObjectCommandList);
*nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList,
ENABLE_DDL_PROPAGATION);
}
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), alterRoleSetCommands);
}
}
}
@ -794,37 +815,40 @@ PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList)
* We also update the local group id here, as handling sequence dependencies
* requires it.
*/
List *
SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList)
void
SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **commandList)
{
/*
* Propagate node wide objects. It includes only roles for now.
*/
List *nodeWideObjectCommandList = NIL;
PropagateNodeWideObjectsCommandList(&nodeWideObjectCommandList);
*commandList = list_concat(*commandList, nodeWideObjectCommandList);
PropagateNodeWideObjectsCommandList(nodeToSyncMetadataConnections, commandList);
/*
* Detach partitions, break dependencies between sequences and table then
* remove shell tables first.
*/
List *detachPartitionCommandList = NIL;
DetachPartitionCommandList(&detachPartitionCommandList);
*commandList = list_concat(*commandList, detachPartitionCommandList);
DetachPartitionCommandList(nodeToSyncMetadataConnections, commandList);
*commandList = lappend(*commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
*commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
//if (commandList != NULL)
{
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND));
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(REMOVE_ALL_SHELL_TABLES_COMMAND));
}
}
/*
* Replicate all objects of the pg_dist_object to the remote node.
*/
List *replicateAllObjectsToNodeCommandList = NIL;
ReplicateAllObjectsToNodeCommandList(workerNode->workerName, workerNode->workerPort,
ReplicateAllObjectsToNodeCommandList(nodeToSyncMetadataConnections,
&replicateAllObjectsToNodeCommandList);
if (commandList != NULL)
*commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList);
/*
@ -832,12 +856,10 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList)
* those tables.
*/
List *interTableRelationshipCommandList = NIL;
InterTableRelationshipCommandList(&interTableRelationshipCommandList);
InterTableRelationshipCommandList(nodeToSyncMetadataConnections, &interTableRelationshipCommandList);
if (commandList != NULL)
*commandList = list_concat(*commandList, interTableRelationshipCommandList);
return *commandList;
}
@ -851,28 +873,10 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList)
* since all the dependencies should be present in the coordinator already.
*/
static void
SyncDistributedObjectsToNodeList(List *workerNodeList)
SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections)
{
List *workerNodesToSync = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if (NodeIsCoordinator(workerNode))
{
/* coordinator has all the objects */
continue;
}
if (!NodeIsPrimary(workerNode))
{
/* secondary nodes get the objects from their primaries via replication */
continue;
}
workerNodesToSync = lappend(workerNodesToSync, workerNode);
}
if (workerNodesToSync == NIL)
if (nodeToSyncMetadataConnections == NIL)
{
return;
}
@ -881,16 +885,10 @@ SyncDistributedObjectsToNodeList(List *workerNodeList)
Assert(ShouldPropagate());
List *commandList = NIL;
SyncDistributedObjectsCommandList(workerNode, &commandList);
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
workerNodesToSync,
CurrentUserName(),
commandList);
SyncDistributedObjectsCommandList(nodeToSyncMetadataConnections, NULL);
}
@ -920,33 +918,13 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
*
*/
static void
SyncPgDistTableMetadataToNodeList(List *nodeList)
SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections)
{
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
List *nodesWithMetadata = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{
nodesWithMetadata = lappend(nodesWithMetadata, workerNode);
}
}
if (nodesWithMetadata == NIL)
{
return;
}
List *syncPgDistMetadataCommandList = NIL;
PgDistTableMetadataSyncCommandList(&syncPgDistMetadataCommandList);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
nodesWithMetadata,
CurrentUserName(),
syncPgDistMetadataCommandList);
PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections, &syncPgDistMetadataCommandList);
}
@ -1168,6 +1146,8 @@ ActivateNodeList(List *nodeList)
List *nodeToSyncMetadata = NIL;
List *nodeToSyncMetadataConnections = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
{
@ -1207,7 +1187,7 @@ ActivateNodeList(List *nodeList)
BoolGetDatum(true));
/* TODO: Once all tests are enabled for MX, we can remove the sync-by-default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode);
if (syncMetadata)
{
/*
@ -1224,6 +1204,17 @@ ActivateNodeList(List *nodeList)
UpdateLocalGroupIdOnNode(workerNode);
nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
MultiConnection *connection =
GetNodeConnection(OUTSIDE_TRANSACTION, node->workerName, node->workerPort);
ClaimConnectionExclusively(connection);
nodeToSyncMetadataConnections =
lappend(nodeToSyncMetadataConnections, connection);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(DISABLE_DDL_PROPAGATION));
}
}
@ -1232,7 +1223,7 @@ ActivateNodeList(List *nodeList)
* replicating reference tables to the remote node, as reference tables may
* need such objects.
*/
SyncDistributedObjectsToNodeList(nodeToSyncMetadata);
SyncDistributedObjectsToNodeList(nodeToSyncMetadataConnections);
/*
* Sync node metadata. We must sync node metadata before syncing table
@ -1249,7 +1240,7 @@ ActivateNodeList(List *nodeList)
* We must handle it as the last step because of limitations shared with
* above comments.
*/
SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata);
SyncPgDistTableMetadataToNodeList(nodeToSyncMetadataConnections);
foreach_ptr(node, nodeList)
{

View File

@ -53,7 +53,7 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *syncDistObjCommands = NIL;
SyncDistributedObjectsCommandList(dummyWorkerNode, &syncDistObjCommands);
SyncDistributedObjectsCommandList(NIL, &syncDistObjCommands);
List *dropSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands();

View File

@ -90,7 +90,8 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId);
extern List * InterTableRelationshipOfRelationCommandList(Oid relationId);
extern void DetachPartitionCommandList(List **detachPartitionCommandList);
extern void DetachPartitionCommandList(List *nodeToSyncMetadataConnections,
List **detachPartitionCommandList);
extern void SyncNodeMetadataToNodes(void);
extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner);
extern void SyncNodeMetadataToNodesMain(Datum main_arg);

View File

@ -343,7 +343,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies);
extern bool ShouldPropagate(void);
extern bool ShouldPropagateCreateInCoordinatedTransction(void);
extern bool ShouldPropagateAnyObject(List *addresses);
extern void ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort,
extern void ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
List **ddlCommands);
/* Remaining metadata utility functions */

View File

@ -105,9 +105,10 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
Datum value);
extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode,
extern void SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections,
List **commandList);
extern void PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList);
extern void PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
List **metadataSnapshotCommandList);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);