From a32aee8b2815fa53719b94848ef6ec78f2fd7c65 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 1 Dec 2022 17:41:10 +0100 Subject: [PATCH] sync objects without coordinated tx --- .../distributed/commands/dependencies.c | 11 +++++ .../distributed/metadata/metadata_sync.c | 28 +++++++++++++ .../distributed/metadata/node_metadata.c | 40 +++++++++++++++++-- src/include/distributed/metadata_sync.h | 1 + 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 5a795cffb..853b9ccde 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -585,6 +585,17 @@ ReplicateAllObjectsToNodes(List *connectionList, List **commandList) continue; } + if (getObjectClass(dependency) == OCLASS_CLASS) + { + char relKind = get_rel_relkind(dependency->objectId); + if ( + relKind == RELKIND_RELATION || relKind == RELKIND_PARTITIONED_TABLE || + relKind == RELKIND_FOREIGN_TABLE) + { + char *q = CreateDropTableIfExistsCommand(dependency->objectId); + ExecuteRemoteCommandInConnectionList(connectionList, q); + } + } List *commandsForDep = GetDependencyCreateDDLCommands(dependency); char *command = NULL; foreach_ptr(command, commandsForDep) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e1e8d6b96..07ed45375 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -3284,6 +3284,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) static void EnsureCoordinatorInitiatedOperation(void) { + return; + /* * We are restricting the operation to only MX workers with the local group id * check. The other two checks are to ensure that the operation is initiated @@ -3908,6 +3910,32 @@ SyncDeleteColocationGroupToNodes(uint32 colocationId) } +char * +CreateDropTableIfExistsCommand(Oid relationId) +{ + char *schemaName = get_namespace_name(get_rel_namespace(relationId)); + char *relationName = get_rel_name(relationId); + + StringInfo workerDropQuery = makeStringInfo(); + + const char *quotedShardName = quote_qualified_identifier(schemaName, + relationName); + if (IsForeignTable(relationId)) + { + appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND, + quotedShardName); + } + else + { + appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, + quotedShardName); + } + + + return workerDropQuery->data; +} + + /* * ColocationGroupDeleteCommand returns a command for deleting a colocation group. */ diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 7c5f8d53e..62314e56f 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -849,6 +849,13 @@ SyncDistributedObjects(List *workerNodeList, List **commandList) * those tables. */ BuildInterTableRelationships(connectionList, commandList); + + /* finally, close the connections as we don't need them anymore */ + MultiConnection *connection; + foreach_ptr(connection, connectionList) + { + CloseConnection(connection); + } } @@ -927,10 +934,35 @@ SyncPgDistTableMetadataToNodeList(List *nodeList) } List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - nodesWithMetadata, - CurrentUserName(), - syncPgDistMetadataCommandList); + + List *connectionList = NIL; + + /* first, establish new connections */ + workerNode = NULL; + foreach_ptr(workerNode, nodesWithMetadata) + { + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert(superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort, NULL, NULL); + + connectionList = lappend(connectionList, connection); + } + + char *command = NULL; + foreach_ptr(command, syncPgDistMetadataCommandList) + { + ExecuteRemoteCommandInConnectionList(connectionList, command); + } + + /* finally, close the connections as we don't need them anymore */ + MultiConnection *connection; + foreach_ptr(connection, connectionList) + { + CloseConnection(connection); + } } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index db63c46d6..08cc585ab 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -110,6 +110,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, Oid distributionColumType, Oid distributionColumnCollation); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); +extern char * CreateDropTableIfExistsCommand(Oid relationId); #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"