Partition drop first

tx_metadata_sync
Onder Kalaci 2022-11-29 13:55:33 +01:00
parent 2907a1f93c
commit b102c6ce45
1 changed files with 70 additions and 16 deletions

View File

@ -715,17 +715,6 @@ PgDistTableMetadataSyncCommandList(void)
}
}
/* remove all dist table and object related metadata first */
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_PARTITIONS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_PLACEMENTS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_COLOCATION);
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
foreach_ptr(cacheEntry, propagatedTableList)
{
@ -867,14 +856,36 @@ SyncDistributedObjectsToNodeList(List *workerNodeList)
Assert(ShouldPropagate());
List *commandList = SyncDistributedObjectsCommandList(workerNode);
List *connectionList = NIL;
workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert (superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
workerNodesToSync,
CurrentUserName(),
commandList);
List *commandList = SyncDistributedObjectsCommandList(workerNode);
char *commandStr;
foreach_ptr(commandStr, commandList)
ExecuteRemoteCommandInConnectionList(connectionList, commandStr);
MultiConnection *connection;
foreach_ptr(connection, connectionList)
CloseConnection(connection);
}
@ -1194,6 +1205,8 @@ ActivateNodeList(List *nodeList)
*/
DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata);
SyncDistributedObjectsToNodeList(nodeToSyncMetadata);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeToSyncMetadata)
{
@ -1286,12 +1299,53 @@ DropExistingMetadataInOutsideTransaction(List *nodeList)
ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_ALL_SHELL_TABLES_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PARTITIONS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_SHARDS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PLACEMENTS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_DISTRIBUTED_OBJECTS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_COLOCATION);
MultiConnection *connection;
foreach_ptr(connection, connectionList)
CloseConnection(connection);
}
static void
CreateObjcetsInOutsideTransaction(List *nodeList)
{
List *connectionList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert (superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
ExecuteRemoteCommandInConnectionList(connectionList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_ALL_SHELL_TABLES_COMMAND);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PARTITIONS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_SHARDS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PLACEMENTS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_DISTRIBUTED_OBJECTS);
ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_COLOCATION);
MultiConnection *connection;
foreach_ptr(connection, connectionList)
CloseConnection(connection);
}
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables and setting isactive column of the