First commit to change APIs

metadata_sync_imp
Onder Kalaci 2022-11-28 09:40:33 +01:00
parent 68de2ce601
commit db9cebf295
5 changed files with 58 additions and 41 deletions

View File

@ -663,12 +663,21 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
EnsureSequentialModeMetadataOperations(); EnsureSequentialModeMetadataOperations();
char *userName = CurrentUserName(); char *userName = CurrentUserName();
List *dropMetadataCommandList = NIL;
/* /*
* Detach partitions, break dependencies between sequences and table then * Detach partitions, break dependencies between sequences and table then
* remove shell tables first. * remove shell tables first.
*/ */
List *dropMetadataCommandList = DetachPartitionCommandList();
List *detachPartitionCommandList = NIL;
DetachPartitionCommandList(&detachPartitionCommandList);
dropMetadataCommandList = list_concat(dropMetadataCommandList,
detachPartitionCommandList);
dropMetadataCommandList = lappend(dropMetadataCommandList, dropMetadataCommandList = lappend(dropMetadataCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList, dropMetadataCommandList = lappend(dropMetadataCommandList,
@ -2657,10 +2666,9 @@ CreateTableMetadataOnWorkers(Oid relationId)
* an extra step, if there are no partitions to DETACH, this function simply returns * an extra step, if there are no partitions to DETACH, this function simply returns
* empty list to not disable/enable DDL propagation for nothing. * empty list to not disable/enable DDL propagation for nothing.
*/ */
List * void
DetachPartitionCommandList(void) DetachPartitionCommandList(List **detachPartitionCommandList)
{ {
List *detachPartitionCommandList = NIL;
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
/* we iterate over all distributed partitioned tables and DETACH their partitions */ /* we iterate over all distributed partitioned tables and DETACH their partitions */
@ -2675,26 +2683,24 @@ DetachPartitionCommandList(void)
List *partitionList = PartitionList(cacheEntry->relationId); List *partitionList = PartitionList(cacheEntry->relationId);
List *detachCommands = List *detachCommands =
GenerateDetachPartitionCommandRelationIdList(partitionList); GenerateDetachPartitionCommandRelationIdList(partitionList);
detachPartitionCommandList = list_concat(detachPartitionCommandList, *detachPartitionCommandList = list_concat(*detachPartitionCommandList,
detachCommands); detachCommands);
} }
if (list_length(detachPartitionCommandList) == 0) if (list_length(*detachPartitionCommandList) == 0)
{ {
return NIL; return;
} }
detachPartitionCommandList = *detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList); lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList);
/* /*
* We probably do not need this but as an extra precaution, we are enabling * We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to switch back to original state. * DDL propagation to switch back to original state.
*/ */
detachPartitionCommandList = lappend(detachPartitionCommandList, *detachPartitionCommandList = lappend(*detachPartitionCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return detachPartitionCommandList;
} }

View File

@ -106,7 +106,7 @@ static void SyncPgDistTableMetadataToNodeList(List *nodeList);
static List * InterTableRelationshipCommandList(); static List * InterTableRelationshipCommandList();
static void BlockDistributedQueriesOnMetadataNodes(void); static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static List * PropagateNodeWideObjectsCommandList(); static void PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker); static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown); static void SetLockTimeoutLocally(int32 lock_cooldown);
@ -759,12 +759,9 @@ PgDistTableMetadataSyncCommandList(void)
* propagate any object that should be propagated for every node. These are * propagate any object that should be propagated for every node. These are
* generally not linked to any distributed object but change system wide behaviour. * generally not linked to any distributed object but change system wide behaviour.
*/ */
static List * static void
PropagateNodeWideObjectsCommandList() PropagateNodeWideObjectsCommandList(List **nodeWideObjectCommandList)
{ {
/* collect all commands */
List *ddlCommands = NIL;
if (EnableAlterRoleSetPropagation) if (EnableAlterRoleSetPropagation)
{ {
/* /*
@ -772,17 +769,18 @@ PropagateNodeWideObjectsCommandList()
* linked to any role that can be distributed we need to distribute them seperately * linked to any role that can be distributed we need to distribute them seperately
*/ */
List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid); List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
ddlCommands = list_concat(ddlCommands, alterRoleSetCommands); *nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList,
alterRoleSetCommands);
} }
if (list_length(ddlCommands) > 0) if (list_length(*nodeWideObjectCommandList) > 0)
{ {
/* if there are command wrap them in enable_ddl_propagation off */ /* if there are command wrap them in enable_ddl_propagation off */
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); *nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION,
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); *nodeWideObjectCommandList);
*nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList,
ENABLE_DDL_PROPAGATION);
} }
return ddlCommands;
} }
@ -802,36 +800,43 @@ PropagateNodeWideObjectsCommandList()
* requires it. * requires it.
*/ */
List * List *
SyncDistributedObjectsCommandList(WorkerNode *workerNode) SyncDistributedObjectsCommandList(WorkerNode *workerNode, List **commandList)
{ {
List *commandList = NIL;
/* /*
* Propagate node wide objects. It includes only roles for now. * Propagate node wide objects. It includes only roles for now.
*/ */
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); List *nodeWideObjectCommandList = NIL;
PropagateNodeWideObjectsCommandList(&nodeWideObjectCommandList);
*commandList = list_concat(*commandList, nodeWideObjectCommandList);
/* /*
* Detach partitions, break dependencies between sequences and table then * Detach partitions, break dependencies between sequences and table then
* remove shell tables first. * remove shell tables first.
*/ */
commandList = list_concat(commandList, DetachPartitionCommandList()); List *detachPartitionCommandList = NIL;
commandList = lappend(commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
commandList = lappend(commandList, REMOVE_ALL_SHELL_TABLES_COMMAND); DetachPartitionCommandList(&detachPartitionCommandList);
*commandList = list_concat(*commandList, detachPartitionCommandList);
*commandList = lappend(*commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
*commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
/* /*
* Replicate all objects of the pg_dist_object to the remote node. * Replicate all objects of the pg_dist_object to the remote node.
*/ */
commandList = list_concat(commandList, ReplicateAllObjectsToNodeCommandList( *commandList = list_concat(*commandList, ReplicateAllObjectsToNodeCommandList(
workerNode->workerName, workerNode->workerPort)); workerNode->workerName, workerNode->workerPort));
/* /*
* After creating each table, handle the inter table relationship between * After creating each table, handle the inter table relationship between
* those tables. * those tables.
*/ */
commandList = list_concat(commandList, InterTableRelationshipCommandList()); *commandList = list_concat(*commandList, InterTableRelationshipCommandList());
return commandList; return *commandList;
} }
@ -875,7 +880,9 @@ SyncDistributedObjectsToNodeList(List *workerNodeList)
Assert(ShouldPropagate()); Assert(ShouldPropagate());
List *commandList = SyncDistributedObjectsCommandList(workerNode); List *commandList = NIL;
SyncDistributedObjectsCommandList(workerNode, &commandList);
/* send commands to new workers, the current user should be a superuser */ /* send commands to new workers, the current user should be a superuser */
Assert(superuser()); Assert(superuser());

View File

@ -51,7 +51,10 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
List *updateLocalGroupCommand = List *updateLocalGroupCommand =
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode);
List *syncDistObjCommands = NIL;
SyncDistributedObjectsCommandList(dummyWorkerNode, &syncDistObjCommands);
List *dropSnapshotCommands = NodeMetadataDropCommands(); List *dropSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList(); List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();

View File

@ -90,7 +90,7 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId); extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId); extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId);
extern List * InterTableRelationshipOfRelationCommandList(Oid relationId); extern List * InterTableRelationshipOfRelationCommandList(Oid relationId);
extern List * DetachPartitionCommandList(void); extern void DetachPartitionCommandList(List **detachPartitionCommandList);
extern void SyncNodeMetadataToNodes(void); extern void SyncNodeMetadataToNodes(void);
extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner); extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner);
extern void SyncNodeMetadataToNodesMain(Datum main_arg); extern void SyncNodeMetadataToNodesMain(Datum main_arg);

View File

@ -105,7 +105,8 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
Datum value); Datum value);
extern uint32 CountPrimariesWithMetadata(void); extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode); extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode,
List **commandList);
extern List * PgDistTableMetadataSyncCommandList(void); extern List * PgDistTableMetadataSyncCommandList(void);
/* Function declarations for worker node utilities */ /* Function declarations for worker node utilities */