metadata_sync_imp
Onder Kalaci 2022-11-28 16:17:53 +01:00
parent 38ffdab244
commit 93a446254c
5 changed files with 97 additions and 41 deletions

View File

@ -537,7 +537,9 @@ ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
{
/* since we are executing ddl commands disable propagation first, primarily for mx */
if (ddlCommands != NULL)
*ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);
{
*ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);
}
/*
* collect all dependencies in creation order and get their ddl commands
@ -581,18 +583,29 @@ ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
}
List *perObjCommands = GetDependencyCreateDDLCommands(dependency);
if (ddlCommands != NULL)
*ddlCommands = list_concat(*ddlCommands,
perObjCommands);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), perObjCommands);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
perObjCommands);
}
if (ddlCommands != NULL)
{
*ddlCommands = list_concat(*ddlCommands,
perObjCommands);
}
else
{
list_free_deep(perObjCommands);
}
}
if (ddlCommands != NULL)
*ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION);
{
*ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION);
}
}

View File

@ -2667,7 +2667,8 @@ CreateTableMetadataOnWorkers(Oid relationId)
* empty list to not disable/enable DDL propagation for nothing.
*/
void
DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPartitionCommandList)
DetachPartitionCommandList(List *nodeToSyncMetadataConnections,
List **detachPartitionCommandList)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
@ -2692,10 +2693,12 @@ DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPar
Assert(PartitionTable(partitionRelOid));
char *detachCommand = GenerateDetachPartitionCommand(partitionRelOid);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(detachCommand));
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
list_make1(
detachCommand));
}
if (detachPartitionCommandList != NULL)
@ -2704,8 +2707,9 @@ DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPar
lappend(*detachPartitionCommandList, detachCommand);
}
else
{
pfree(detachCommand);
}
}
}

View File

@ -680,20 +680,34 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
List *commandListForRelation =
InterTableRelationshipOfRelationCommandList(relationId);
*multipleTableIntegrationCommandList = list_concat(
*multipleTableIntegrationCommandList,
commandListForRelation);
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), commandListForRelation);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
commandListForRelation);
}
if (multipleTableIntegrationCommandList != NULL)
{
*multipleTableIntegrationCommandList = list_concat(
*multipleTableIntegrationCommandList,
commandListForRelation);
}
else
{
list_free_deep(commandListForRelation);
}
}
*multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION,
*multipleTableIntegrationCommandList);
*multipleTableIntegrationCommandList = lappend(*multipleTableIntegrationCommandList,
ENABLE_DDL_PROPAGATION);
if (multipleTableIntegrationCommandList != NULL)
{
*multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION,
*multipleTableIntegrationCommandList);
*multipleTableIntegrationCommandList = lappend(
*multipleTableIntegrationCommandList,
ENABLE_DDL_PROPAGATION);
}
}
@ -702,7 +716,8 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
* (except pg_dist_node) metadata. We call them as table metadata.
*/
void
PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **metadataSnapshotCommandList)
PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
List **metadataSnapshotCommandList)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
@ -731,7 +746,10 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **m
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), *metadataSnapshotCommandList);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
*
metadataSnapshotCommandList);
}
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
@ -740,7 +758,9 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **m
List *tableMetadataCreateCommandList =
CitusTableMetadataCreateCommandList(cacheEntry->relationId);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), tableMetadataCreateCommandList);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
tableMetadataCreateCommandList);
*metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
tableMetadataCreateCommandList);
@ -771,7 +791,6 @@ static void
PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
List **nodeWideObjectCommandList)
{
bool hasObjects = false;
if (EnableAlterRoleSetPropagation)
{
/*
@ -794,7 +813,9 @@ PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), alterRoleSetCommands);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
alterRoleSetCommands);
}
}
}
@ -831,15 +852,23 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co
DetachPartitionCommandList(nodeToSyncMetadataConnections, commandList);
//if (commandList != NULL)
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
list_make1(
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND));
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
list_make1(
REMOVE_ALL_SHELL_TABLES_COMMAND));
}
if (list_length(nodeToSyncMetadataConnections) != 0)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND));
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(REMOVE_ALL_SHELL_TABLES_COMMAND));
}
if (commandList != NULL)
{
*commandList = lappend(*commandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
*commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
}
/*
@ -849,17 +878,22 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co
ReplicateAllObjectsToNodeCommandList(nodeToSyncMetadataConnections,
&replicateAllObjectsToNodeCommandList);
if (commandList != NULL)
*commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList);
{
*commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList);
}
/*
* After creating each table, handle the inter table relationship between
* those tables.
*/
List *interTableRelationshipCommandList = NIL;
InterTableRelationshipCommandList(nodeToSyncMetadataConnections, &interTableRelationshipCommandList);
InterTableRelationshipCommandList(nodeToSyncMetadataConnections,
&interTableRelationshipCommandList);
if (commandList != NULL)
*commandList = list_concat(*commandList, interTableRelationshipCommandList);
{
*commandList = list_concat(*commandList, interTableRelationshipCommandList);
}
}
@ -875,7 +909,6 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co
static void
SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections)
{
if (nodeToSyncMetadataConnections == NIL)
{
return;
@ -924,7 +957,8 @@ SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections)
Assert(superuser());
List *syncPgDistMetadataCommandList = NIL;
PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections, &syncPgDistMetadataCommandList);
PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections,
&syncPgDistMetadataCommandList);
}
@ -1187,7 +1221,8 @@ ActivateNodeList(List *nodeList)
BoolGetDatum(true));
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode);
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode) &&
!NodeIsCoordinator(workerNode);
if (syncMetadata)
{
/*
@ -1207,14 +1242,18 @@ ActivateNodeList(List *nodeList)
MultiConnection *connection =
GetNodeConnection(OUTSIDE_TRANSACTION, node->workerName, node->workerPort);
GetNodeConnection(OUTSIDE_TRANSACTION, node->workerName,
node->workerPort);
ClaimConnectionExclusively(connection);
nodeToSyncMetadataConnections =
lappend(nodeToSyncMetadataConnections, connection);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(DISABLE_DDL_PROPAGATION));
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
list_make1(
DISABLE_DDL_PROPAGATION));
}
}

View File

@ -59,7 +59,7 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = NIL;
PgDistTableMetadataSyncCommandList(&pgDistTableMetadataSyncCommands);
PgDistTableMetadataSyncCommandList(NIL, &pgDistTableMetadataSyncCommands);
List *activateNodeCommandList = NIL;
int activateNodeCommandIndex = 0;

View File

@ -106,7 +106,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern void SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections,
List **commandList);
List **commandList);
extern void PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
List **metadataSnapshotCommandList);