sync objects without coordinated tx

metadata_sync_final
Onder Kalaci 2022-12-01 17:41:10 +01:00
parent 386a1f7889
commit a32aee8b28
4 changed files with 76 additions and 4 deletions

View File

@ -585,6 +585,17 @@ ReplicateAllObjectsToNodes(List *connectionList, List **commandList)
continue; continue;
} }
if (getObjectClass(dependency) == OCLASS_CLASS)
{
char relKind = get_rel_relkind(dependency->objectId);
if (
relKind == RELKIND_RELATION || relKind == RELKIND_PARTITIONED_TABLE ||
relKind == RELKIND_FOREIGN_TABLE)
{
char *q = CreateDropTableIfExistsCommand(dependency->objectId);
ExecuteRemoteCommandInConnectionList(connectionList, q);
}
}
List *commandsForDep = GetDependencyCreateDDLCommands(dependency); List *commandsForDep = GetDependencyCreateDDLCommands(dependency);
char *command = NULL; char *command = NULL;
foreach_ptr(command, commandsForDep) foreach_ptr(command, commandsForDep)

View File

@ -3284,6 +3284,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
static void static void
EnsureCoordinatorInitiatedOperation(void) EnsureCoordinatorInitiatedOperation(void)
{ {
return;
/* /*
* We are restricting the operation to only MX workers with the local group id * We are restricting the operation to only MX workers with the local group id
* check. The other two checks are to ensure that the operation is initiated * check. The other two checks are to ensure that the operation is initiated
@ -3908,6 +3910,32 @@ SyncDeleteColocationGroupToNodes(uint32 colocationId)
} }
char *
CreateDropTableIfExistsCommand(Oid relationId)
{
char *schemaName = get_namespace_name(get_rel_namespace(relationId));
char *relationName = get_rel_name(relationId);
StringInfo workerDropQuery = makeStringInfo();
const char *quotedShardName = quote_qualified_identifier(schemaName,
relationName);
if (IsForeignTable(relationId))
{
appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
quotedShardName);
}
else
{
appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
quotedShardName);
}
return workerDropQuery->data;
}
/* /*
* ColocationGroupDeleteCommand returns a command for deleting a colocation group. * ColocationGroupDeleteCommand returns a command for deleting a colocation group.
*/ */

View File

@ -849,6 +849,13 @@ SyncDistributedObjects(List *workerNodeList, List **commandList)
* those tables. * those tables.
*/ */
BuildInterTableRelationships(connectionList, commandList); BuildInterTableRelationships(connectionList, commandList);
/* finally, close the connections as we don't need them anymore */
MultiConnection *connection;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
} }
@ -927,10 +934,35 @@ SyncPgDistTableMetadataToNodeList(List *nodeList)
} }
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
nodesWithMetadata, List *connectionList = NIL;
CurrentUserName(),
syncPgDistMetadataCommandList); /* first, establish new connections */
workerNode = NULL;
foreach_ptr(workerNode, nodesWithMetadata)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
char *command = NULL;
foreach_ptr(command, syncPgDistMetadataCommandList)
{
ExecuteRemoteCommandInConnectionList(connectionList, command);
}
/* finally, close the connections as we don't need them anymore */
MultiConnection *connection;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
} }

View File

@ -110,6 +110,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
Oid distributionColumType, Oid distributionColumType,
Oid distributionColumnCollation); Oid distributionColumnCollation);
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
extern char * CreateDropTableIfExistsCommand(Oid relationId);
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"