From b102c6ce45b5b9911dad3aaf6e7bc7090a52d31b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 29 Nov 2022 13:55:33 +0100 Subject: [PATCH] Partition drop first --- .../distributed/metadata/node_metadata.c | 86 +++++++++++++++---- 1 file changed, 70 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 28e6ff7d9..59cc85cbb 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -715,17 +715,6 @@ PgDistTableMetadataSyncCommandList(void) } } - /* remove all dist table and object related metadata first */ - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_PARTITIONS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_PLACEMENTS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_DISTRIBUTED_OBJECTS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_COLOCATION); - /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ foreach_ptr(cacheEntry, propagatedTableList) { @@ -867,14 +856,36 @@ SyncDistributedObjectsToNodeList(List *workerNodeList) Assert(ShouldPropagate()); - List *commandList = SyncDistributedObjectsCommandList(workerNode); + + List *connectionList = NIL; + + workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert (superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort, NULL, NULL); + + connectionList = lappend(connectionList, connection); + } + /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - workerNodesToSync, - CurrentUserName(), - commandList); + + List *commandList = SyncDistributedObjectsCommandList(workerNode); + + char *commandStr; + foreach_ptr(commandStr, commandList) + ExecuteRemoteCommandInConnectionList(connectionList, commandStr); + + MultiConnection *connection; + foreach_ptr(connection, connectionList) + CloseConnection(connection); + } @@ -1194,6 +1205,8 @@ ActivateNodeList(List *nodeList) */ DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata); + SyncDistributedObjectsToNodeList(nodeToSyncMetadata); + WorkerNode *workerNode = NULL; foreach_ptr(workerNode, nodeToSyncMetadata) { @@ -1286,12 +1299,53 @@ DropExistingMetadataInOutsideTransaction(List *nodeList) ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND); ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_ALL_SHELL_TABLES_COMMAND); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PARTITIONS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_SHARDS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PLACEMENTS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_DISTRIBUTED_OBJECTS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_COLOCATION); + MultiConnection *connection; foreach_ptr(connection, connectionList) CloseConnection(connection); } +static void +CreateObjcetsInOutsideTransaction(List *nodeList) +{ + List *connectionList = NIL; + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert (superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort, NULL, NULL); + + connectionList = lappend(connectionList, connection); + } + + + ExecuteRemoteCommandInConnectionList(connectionList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); + + ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND); + ExecuteRemoteCommandInConnectionList(connectionList, REMOVE_ALL_SHELL_TABLES_COMMAND); + + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PARTITIONS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_SHARDS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_PLACEMENTS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_DISTRIBUTED_OBJECTS); + ExecuteRemoteCommandInConnectionList(connectionList, DELETE_ALL_COLOCATION); + + MultiConnection *connection; + foreach_ptr(connection, connectionList) + CloseConnection(connection); +} + /* * ActivateNode activates the node with nodeName and nodePort. Currently, activation * includes only replicating the reference tables and setting isactive column of the