Forth commit to change APIs

metadata_sync_imp
Onder Kalaci 2022-11-28 10:01:19 +01:00
parent 793f729808
commit be1c5016c5
3 changed files with 29 additions and 27 deletions

View File

@ -695,12 +695,11 @@ InterTableRelationshipCommandList(List **multipleTableIntegrationCommandList)
* PgDistTableMetadataSyncCommandList returns the command list to sync the pg_dist_* * PgDistTableMetadataSyncCommandList returns the command list to sync the pg_dist_*
* (except pg_dist_node) metadata. We call them as table metadata. * (except pg_dist_node) metadata. We call them as table metadata.
*/ */
List * void
PgDistTableMetadataSyncCommandList(void) PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList)
{ {
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL; List *propagatedTableList = NIL;
List *metadataSnapshotCommandList = NIL;
/* create the list of tables whose metadata will be created */ /* create the list of tables whose metadata will be created */
CitusTableCacheEntry *cacheEntry = NULL; CitusTableCacheEntry *cacheEntry = NULL;
@ -713,15 +712,16 @@ PgDistTableMetadataSyncCommandList(void)
} }
/* remove all dist table and object related metadata first */ /* remove all dist table and object related metadata first */
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_PARTITIONS); DELETE_ALL_PARTITIONS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS); *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS);
DELETE_ALL_PLACEMENTS); *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_PLACEMENTS);
DELETE_ALL_DISTRIBUTED_OBJECTS); *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS);
DELETE_ALL_COLOCATION); *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_COLOCATION);
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
foreach_ptr(cacheEntry, propagatedTableList) foreach_ptr(cacheEntry, propagatedTableList)
@ -729,25 +729,23 @@ PgDistTableMetadataSyncCommandList(void)
List *tableMetadataCreateCommandList = List *tableMetadataCreateCommandList =
CitusTableMetadataCreateCommandList(cacheEntry->relationId); CitusTableMetadataCreateCommandList(cacheEntry->relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
tableMetadataCreateCommandList); tableMetadataCreateCommandList);
} }
/* commands to insert pg_dist_colocation entries */ /* commands to insert pg_dist_colocation entries */
List *colocationGroupSyncCommandList = ColocationGroupCreateCommandList(); List *colocationGroupSyncCommandList = ColocationGroupCreateCommandList();
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
colocationGroupSyncCommandList); colocationGroupSyncCommandList);
List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList(); List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList();
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
distributedObjectSyncCommandList); distributedObjectSyncCommandList);
metadataSnapshotCommandList = lcons(DISABLE_DDL_PROPAGATION, *metadataSnapshotCommandList = lcons(DISABLE_DDL_PROPAGATION,
metadataSnapshotCommandList); *metadataSnapshotCommandList);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return metadataSnapshotCommandList;
} }
@ -942,7 +940,9 @@ SyncPgDistTableMetadataToNodeList(List *nodeList)
return; return;
} }
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); List *syncPgDistMetadataCommandList = NIL;
PgDistTableMetadataSyncCommandList(&syncPgDistMetadataCommandList);
SendMetadataCommandListToWorkerListInCoordinatedTransaction( SendMetadataCommandListToWorkerListInCoordinatedTransaction(
nodesWithMetadata, nodesWithMetadata,
CurrentUserName(), CurrentUserName(),

View File

@ -57,7 +57,9 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
List *dropSnapshotCommands = NodeMetadataDropCommands(); List *dropSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();
List *pgDistTableMetadataSyncCommands = NIL;
PgDistTableMetadataSyncCommandList(&pgDistTableMetadataSyncCommands);
List *activateNodeCommandList = NIL; List *activateNodeCommandList = NIL;
int activateNodeCommandIndex = 0; int activateNodeCommandIndex = 0;

View File

@ -107,7 +107,7 @@ extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode, extern List * SyncDistributedObjectsCommandList(WorkerNode *workerNode,
List **commandList); List **commandList);
extern List * PgDistTableMetadataSyncCommandList(void); extern void PgDistTableMetadataSyncCommandList(List **metadataSnapshotCommandList);
/* Function declarations for worker node utilities */ /* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);