diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 823570621..787e5164d 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -532,15 +532,8 @@ GetAllDependencyCreateDDLCommands(const List *dependencies) * clusterHasDistributedFunction if there are any distributed functions. */ void -ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, - List **ddlCommands) +ReplicateAllObjectsToNodeCommandList(MetadataSyncContext syncContext) { - /* since we are executing ddl commands disable propagation first, primarily for mx */ - if (ddlCommands != NULL) - { - *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); - } - /* * collect all dependencies in creation order and get their ddl commands */ @@ -582,30 +575,23 @@ ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, continue; } + + List *perObjCommands = GetDependencyCreateDDLCommands(dependency); - if (list_length(nodeToSyncMetadataConnections) != 0) + if (syncContext.syncImmediately) { SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), + syncContext. + nodeConnectionList), perObjCommands); } - - if (ddlCommands != NULL) - { - *ddlCommands = list_concat(*ddlCommands, - perObjCommands); - } else { - list_free_deep(perObjCommands); + syncContext.ddlCommandList = + list_concat(syncContext.ddlCommandList, perObjCommands); } } - - if (ddlCommands != NULL) - { - *ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION); - } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index b245e8ab9..a3145c73a 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -226,10 +226,8 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) * start_metadata_sync_to_node(). */ void -SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort) +SyncNodeMetadataToNode(MetadataSyncContext syncContext) { - char *escapedNodeName = quote_literal_cstr(nodeNameString); - CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureModificationsCanRun(); @@ -663,44 +661,44 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) EnsureSequentialModeMetadataOperations(); char *userName = CurrentUserName(); - List *dropMetadataCommandList = NIL; /* * Detach partitions, break dependencies between sequences and table then * remove shell tables first. */ - List *detachPartitionCommandList = NIL; + MetadataSyncContext syncContext; + syncContext.syncImmediately = false; - DetachPartitionCommandList(NIL, &detachPartitionCommandList); - - dropMetadataCommandList = list_concat(dropMetadataCommandList, - detachPartitionCommandList); + DetachPartitionCommandList(syncContext); - dropMetadataCommandList = lappend(dropMetadataCommandList, - BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); - dropMetadataCommandList = lappend(dropMetadataCommandList, - REMOVE_ALL_SHELL_TABLES_COMMAND); - dropMetadataCommandList = list_concat(dropMetadataCommandList, - NodeMetadataDropCommands()); - dropMetadataCommandList = lappend(dropMetadataCommandList, - LocalGroupIdUpdateCommand(0)); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + REMOVE_ALL_SHELL_TABLES_COMMAND); + syncContext.ddlCommandList = list_concat(syncContext.ddlCommandList, + NodeMetadataDropCommands()); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + LocalGroupIdUpdateCommand(0)); /* remove all dist table and object/table related metadata afterwards */ - dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PARTITIONS); - dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_SHARDS); - dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PLACEMENTS); - dropMetadataCommandList = lappend(dropMetadataCommandList, - DELETE_ALL_DISTRIBUTED_OBJECTS); - dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_COLOCATION); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + DELETE_ALL_PARTITIONS); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, DELETE_ALL_SHARDS); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + DELETE_ALL_PLACEMENTS); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + DELETE_ALL_DISTRIBUTED_OBJECTS); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + DELETE_ALL_COLOCATION); Assert(superuser()); SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( workerNode->workerName, workerNode->workerPort, userName, - dropMetadataCommandList); + syncContext.ddlCommandList); } @@ -2667,8 +2665,7 @@ CreateTableMetadataOnWorkers(Oid relationId) * empty list to not disable/enable DDL propagation for nothing. */ void -DetachPartitionCommandList(List *nodeToSyncMetadataConnections, - List **detachPartitionCommandList) +DetachPartitionCommandList(MetadataSyncContext syncContext) { List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); @@ -2693,42 +2690,31 @@ DetachPartitionCommandList(List *nodeToSyncMetadataConnections, Assert(PartitionTable(partitionRelOid)); char *detachCommand = GenerateDetachPartitionCommand(partitionRelOid); - if (list_length(nodeToSyncMetadataConnections) != 0) + if (syncContext.syncImmediately) { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), - list_make1( - detachCommand)); - } + SyncMetadataCommands(syncContext, list_make1(detachCommand), false); - if (detachPartitionCommandList != NULL) - { - *detachPartitionCommandList = - lappend(*detachPartitionCommandList, detachCommand); + pfree(detachCommand); } else { - pfree(detachCommand); + syncContext.ddlCommandList = + lappend(syncContext.ddlCommandList, detachCommand); } } } - if (!foundAnyPartitionedTable) + if (foundAnyPartitionedTable && !syncContext.syncImmediately) { - return; - } - - if (detachPartitionCommandList != NULL) - { - *detachPartitionCommandList = - lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList); + syncContext.ddlCommandList = + lcons(DISABLE_DDL_PROPAGATION, syncContext.ddlCommandList); /* * We probably do not need this but as an extra precaution, we are enabling * DDL propagation to switch back to original state. */ - *detachPartitionCommandList = lappend(*detachPartitionCommandList, - ENABLE_DDL_PROPAGATION); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + ENABLE_DDL_PROPAGATION); } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index fcf0448de..8a4aee316 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -33,6 +33,7 @@ #include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/metadata_sync_context.h" #include "distributed/multi_join_order.h" #include "distributed/multi_router_planner.h" #include "distributed/pg_dist_node.h" @@ -100,15 +101,13 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SyncDistributedObjectsToNodeList(List *workerNodeList); -static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); -static void SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections); -static void InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, - List **ddlCommandList); +static void SyncDistributedObjectsToNodeList(MetadataSyncContext syncContext); +static void UpdateLocalGroupIdOnNode(MultiConnection *connection, WorkerNode *workerNode); +static void SyncPgDistTableMetadataToNodeList(MetadataSyncContext syncContext); +static void InterTableRelationshipCommandList(MetadataSyncContext syncContext); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); -static void PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, - List **nodeWideObjectCommandList); +static void PropagateNodeWideObjectsCommandList(MetadataSyncContext syncContext); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); @@ -655,8 +654,7 @@ master_set_node_property(PG_FUNCTION_ARGS) * for each citus table. */ static void -InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, - List **multipleTableIntegrationCommandList) +InterTableRelationshipCommandList(MetadataSyncContext syncContext) { List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); List *propagatedTableList = NIL; @@ -680,32 +678,27 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, List *commandListForRelation = InterTableRelationshipOfRelationCommandList(relationId); - - if (list_length(nodeToSyncMetadataConnections) != 0) + if (syncContext.syncImmediately) { SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), + syncContext. + nodeConnectionList), commandListForRelation); } - - if (multipleTableIntegrationCommandList != NULL) - { - *multipleTableIntegrationCommandList = list_concat( - *multipleTableIntegrationCommandList, - commandListForRelation); - } else { - list_free_deep(commandListForRelation); + syncContext.ddlCommandList = list_concat( + syncContext.ddlCommandList, + commandListForRelation); } } - if (multipleTableIntegrationCommandList != NULL) + if (!syncContext.syncImmediately) { - *multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, - *multipleTableIntegrationCommandList); - *multipleTableIntegrationCommandList = lappend( - *multipleTableIntegrationCommandList, + syncContext.ddlCommandList = lcons(DISABLE_DDL_PROPAGATION, + syncContext.ddlCommandList); + syncContext.ddlCommandList = lappend( + syncContext.ddlCommandList, ENABLE_DDL_PROPAGATION); } } @@ -716,8 +709,7 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, * (except pg_dist_node) metadata. We call them as table metadata. */ void -PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, - List **metadataSnapshotCommandList) +PgDistTableMetadataSyncCommandList(MetadataSyncContext syncContext) { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -732,24 +724,27 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, } } - /* remove all dist table and object related metadata first */ - *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, - DELETE_ALL_PARTITIONS); - *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, - DELETE_ALL_SHARDS); - *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, - DELETE_ALL_PLACEMENTS); - *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, - DELETE_ALL_DISTRIBUTED_OBJECTS); - *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, - DELETE_ALL_COLOCATION); - if (list_length(nodeToSyncMetadataConnections) != 0) + List *pgDistTableMetadata = NIL; + + /* remove all dist table and object related metadata first */ + pgDistTableMetadata = lappend(pgDistTableMetadata, + DELETE_ALL_PARTITIONS); + pgDistTableMetadata = lappend(pgDistTableMetadata, + DELETE_ALL_SHARDS); + pgDistTableMetadata = lappend(pgDistTableMetadata, + DELETE_ALL_PLACEMENTS); + pgDistTableMetadata = lappend(pgDistTableMetadata, + DELETE_ALL_DISTRIBUTED_OBJECTS); + pgDistTableMetadata = lappend(pgDistTableMetadata, + DELETE_ALL_COLOCATION); + + if (syncContext.syncImmediately) { SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), - * - metadataSnapshotCommandList); + syncContext. + nodeConnectionList), + pgDistTableMetadata); } /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ @@ -758,38 +753,105 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List *tableMetadataCreateCommandList = CitusTableMetadataCreateCommandList(cacheEntry->relationId); - SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), - tableMetadataCreateCommandList); + if (syncContext.syncImmediately) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + syncContext. + nodeConnectionList), + tableMetadataCreateCommandList); - *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList, - tableMetadataCreateCommandList); + list_free_deep(tableMetadataCreateCommandList); + } + else + { + pgDistTableMetadata = list_concat(pgDistTableMetadata, + tableMetadataCreateCommandList); + } } /* commands to insert pg_dist_colocation entries */ List *colocationGroupSyncCommandList = ColocationGroupCreateCommandList(); - *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList, - colocationGroupSyncCommandList); + if (syncContext.syncImmediately) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + syncContext. + nodeConnectionList), + colocationGroupSyncCommandList); + + list_free_deep(colocationGroupSyncCommandList); + } + else + { + pgDistTableMetadata = list_concat(pgDistTableMetadata, + colocationGroupSyncCommandList); + } List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList(); - *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList, - distributedObjectSyncCommandList); + if (syncContext.syncImmediately) + { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + syncContext. + nodeConnectionList), + distributedObjectSyncCommandList); - *metadataSnapshotCommandList = lcons(DISABLE_DDL_PROPAGATION, - *metadataSnapshotCommandList); - *metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList, - ENABLE_DDL_PROPAGATION); + list_free_deep(distributedObjectSyncCommandList); + } + else + { + pgDistTableMetadata = list_concat(pgDistTableMetadata, + distributedObjectSyncCommandList); + } + + if (!syncContext.syncImmediately) + { + pgDistTableMetadata = lcons(DISABLE_DDL_PROPAGATION, + pgDistTableMetadata); + pgDistTableMetadata = lappend(pgDistTableMetadata, + ENABLE_DDL_PROPAGATION); + + syncContext.ddlCommandList = list_concat(syncContext.ddlCommandList, + pgDistTableMetadata); + } } + +void +SyncMetadataCommands(MetadataSyncContext syncContext, List *commandList, + bool raiseErrors) +{ + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + MultiConnection *connection = NULL; + foreach_ptr(connection, syncContext.nodeConnectionList) + { + bool success = SendRemoteCommand(connection, commandString); + if (!success) + { + HandleRemoteTransactionConnectionError(connection, raiseErrors); + } + } + + WaitForAllConnections(syncContext.nodeConnectionList, raiseErrors); + } + + MultiConnection *connection = NULL; + foreach_ptr(connection, syncContext.nodeConnectionList) + { + ForgetResults(connection); + } + +} + /* * PropagateNodeWideObjectsCommandList is called during node activation to * propagate any object that should be propagated for every node. These are * generally not linked to any distributed object but change system wide behaviour. */ static void -PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, - List **nodeWideObjectCommandList) +PropagateNodeWideObjectsCommandList(MetadataSyncContext syncContext) { if (EnableAlterRoleSetPropagation) { @@ -799,23 +861,23 @@ PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, */ List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid); - if (nodeWideObjectCommandList != NULL) + if (!syncContext.syncImmediately) { - *nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList, - alterRoleSetCommands); - /* if there are command wrap them in enable_ddl_propagation off */ - *nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION, - *nodeWideObjectCommandList); - *nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList, - ENABLE_DDL_PROPAGATION); - } + syncContext.ddlCommandList = + lappend(syncContext.ddlCommandList, DISABLE_DDL_PROPAGATION); - if (list_length(nodeToSyncMetadataConnections) != 0) + syncContext.ddlCommandList = + list_concat(syncContext.ddlCommandList, alterRoleSetCommands); + + + syncContext.ddlCommandList = + lappend(syncContext.ddlCommandList, ENABLE_DDL_PROPAGATION); + } + else { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), - alterRoleSetCommands); + bool raiseErrors = true; + SyncMetadataCommands(syncContext, alterRoleSetCommands, raiseErrors); } } } @@ -837,63 +899,51 @@ PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, * requires it. */ void -SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **commandList) +SyncDistributedObjectsCommandList(MetadataSyncContext syncContext) { /* * Propagate node wide objects. It includes only roles for now. */ - PropagateNodeWideObjectsCommandList(nodeToSyncMetadataConnections, commandList); - + PropagateNodeWideObjectsCommandList(syncContext); /* * Detach partitions, break dependencies between sequences and table then * remove shell tables first. */ - DetachPartitionCommandList(nodeToSyncMetadataConnections, commandList); + DetachPartitionCommandList(syncContext); - if (list_length(nodeToSyncMetadataConnections) != 0) + if (syncContext.syncImmediately) { SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), + syncContext. + nodeConnectionList), list_make1( BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND)); SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), + syncContext. + nodeConnectionList), list_make1( REMOVE_ALL_SHELL_TABLES_COMMAND)); } - - if (commandList != NULL) + else { - *commandList = lappend(*commandList, - BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); - *commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); + syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, + REMOVE_ALL_SHELL_TABLES_COMMAND); } /* * Replicate all objects of the pg_dist_object to the remote node. */ - List *replicateAllObjectsToNodeCommandList = NIL; - ReplicateAllObjectsToNodeCommandList(nodeToSyncMetadataConnections, - &replicateAllObjectsToNodeCommandList); - if (commandList != NULL) - { - *commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList); - } + ReplicateAllObjectsToNodeCommandList(syncContext); /* * After creating each table, handle the inter table relationship between * those tables. */ - List *interTableRelationshipCommandList = NIL; - InterTableRelationshipCommandList(nodeToSyncMetadataConnections, - &interTableRelationshipCommandList); - - if (commandList != NULL) - { - *commandList = list_concat(*commandList, interTableRelationshipCommandList); - } + InterTableRelationshipCommandList(syncContext); } @@ -907,13 +957,8 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co * since all the dependencies should be present in the coordinator already. */ static void -SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections) +SyncDistributedObjectsToNodeList(MetadataSyncContext syncContext) { - if (nodeToSyncMetadataConnections == NIL) - { - return; - } - EnsureSequentialModeMetadataOperations(); Assert(ShouldPropagate()); @@ -921,7 +966,7 @@ SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections) /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SyncDistributedObjectsCommandList(nodeToSyncMetadataConnections, NULL); + SyncDistributedObjectsCommandList(syncContext); } @@ -929,18 +974,14 @@ SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections) * UpdateLocalGroupIdOnNode updates local group id on node. */ static void -UpdateLocalGroupIdOnNode(WorkerNode *workerNode) +UpdateLocalGroupIdOnNode(MultiConnection *connection, WorkerNode *workerNode) { - if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) { List *commandList = list_make1(LocalGroupIdUpdateCommand(workerNode->groupId)); /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SendMetadataCommandListToWorkerListInCoordinatedTransaction( - list_make1(workerNode), - CurrentUserName(), - commandList); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList); } } @@ -951,14 +992,12 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode) * */ static void -SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections) +SyncPgDistTableMetadataToNodeList(MetadataSyncContext syncContext) { /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - List *syncPgDistMetadataCommandList = NIL; - PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections, - &syncPgDistMetadataCommandList); + PgDistTableMetadataSyncCommandList(syncContext); } @@ -1180,7 +1219,11 @@ ActivateNodeList(List *nodeList) List *nodeToSyncMetadata = NIL; - List *nodeToSyncMetadataConnections = NIL; + MetadataSyncContext syncContext; + + /* we don't need to collect any the ddl commands */ + syncContext.syncImmediately = true; + syncContext.nodeConnectionList = NIL; WorkerNode *node = NULL; foreach_ptr(node, nodeList) @@ -1232,11 +1275,6 @@ ActivateNodeList(List *nodeList) SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); - /* - * Update local group id first, as object dependency logic requires to have - * updated local group id. - */ - UpdateLocalGroupIdOnNode(workerNode); nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode); @@ -1247,13 +1285,19 @@ ActivateNodeList(List *nodeList) ClaimConnectionExclusively(connection); - nodeToSyncMetadataConnections = - lappend(nodeToSyncMetadataConnections, connection); + syncContext.nodeConnectionList = + lappend(syncContext.nodeConnectionList, connection); - SendCommandListToWorkerOutsideTransactionWithConnection(linitial( - nodeToSyncMetadataConnections), + SendCommandListToWorkerOutsideTransactionWithConnection(connection, list_make1( DISABLE_DDL_PROPAGATION)); + + /* + * Update local group id first, as object dependency logic requires to have + * updated local group id. + */ + UpdateLocalGroupIdOnNode(connection, workerNode); + } } @@ -1262,24 +1306,22 @@ ActivateNodeList(List *nodeList) * replicating reference tables to the remote node, as reference tables may * need such objects. */ - SyncDistributedObjectsToNodeList(nodeToSyncMetadataConnections); + SyncDistributedObjectsToNodeList(syncContext); /* * Sync node metadata. We must sync node metadata before syncing table * related pg_dist_xxx metadata. Since table related metadata requires * to have right pg_dist_node entries. */ - foreach_ptr(node, nodeToSyncMetadata) - { - SyncNodeMetadataToNode(node->workerName, node->workerPort); - } + SyncNodeMetadataToNode(node->workerName, node->workerPort); + /* * As the last step, sync the table related metadata to the remote node. * We must handle it as the last step because of limitations shared with * above comments. */ - SyncPgDistTableMetadataToNodeList(nodeToSyncMetadataConnections); + SyncPgDistTableMetadataToNodeList(syncContext); foreach_ptr(node, nodeList) { diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 5709da66a..9166213bb 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -49,17 +49,21 @@ activate_node_snapshot(PG_FUNCTION_ARGS) */ WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode(); + List *updateLocalGroupCommand = list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); - List *syncDistObjCommands = NIL; - SyncDistributedObjectsCommandList(NIL, &syncDistObjCommands); + MetadataSyncContext syncContext = { false, NIL, NIL }; + + SyncDistributedObjectsCommandList(syncContext); + List *syncDistObjCommands = syncContext.ddlCommandList; List *dropSnapshotCommands = NodeMetadataDropCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands(); - List *pgDistTableMetadataSyncCommands = NIL; - PgDistTableMetadataSyncCommandList(NIL, &pgDistTableMetadataSyncCommands); + syncContext.ddlCommandList = NIL; + PgDistTableMetadataSyncCommandList(syncContext); + List *pgDistTableMetadataSyncCommands = syncContext.ddlCommandList; List *activateNodeCommandList = NIL; int activateNodeCommandIndex = 0; diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 7229f7c72..23e28534d 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -76,6 +76,7 @@ typedef struct DDLJob List *taskList; /* worker DDL tasks to execute */ } DDLJob; + extern ProcessUtility_hook_type PrevProcessUtility; extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString, diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 38e733185..de6b311a9 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -48,7 +48,6 @@ typedef struct SequenceInfo /* Functions declarations for metadata syncing */ -extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort); extern void SyncCitusTableMetadata(Oid relationId); extern void EnsureSequentialModeMetadataOperations(void); extern bool ClusterHasKnownMetadataWorkers(void); @@ -90,8 +89,7 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId); extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId); extern List * InterTableRelationshipOfRelationCommandList(Oid relationId); -extern void DetachPartitionCommandList(List *nodeToSyncMetadataConnections, - List **detachPartitionCommandList); +extern void DetachPartitionCommandList(MetadataSyncContext syncContext); extern void SyncNodeMetadataToNodes(void); extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner); extern void SyncNodeMetadataToNodesMain(Datum main_arg); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index cdc7de79b..4b3e70c6c 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -264,6 +264,7 @@ typedef struct BackgroundTask } __nullable_storage; } BackgroundTask; + #define SET_NULLABLE_FIELD(ptr, field, value) \ (ptr)->__nullable_storage.field = (value); \ (ptr)->field = &((ptr)->__nullable_storage.field) @@ -343,8 +344,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies); extern bool ShouldPropagate(void); extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateAnyObject(List *addresses); -extern void ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, - List **ddlCommands); +extern void ReplicateAllObjectsToNodeCommandList(MetadataSyncContext syncContext); /* Remaining metadata utility functions */ extern Oid TableOwnerOid(Oid relationId); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index d8d38cbbe..a20c6e61a 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -16,6 +16,8 @@ #include "postgres.h" +#include "distributed/metadata_utility.h" +#include "distributed/metadata_sync_context.h" #include "storage/lmgr.h" #include "storage/lockdefs.h" #include "nodes/pg_list.h" @@ -105,11 +107,11 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); -extern void SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, - List **commandList); -extern void PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, - List **metadataSnapshotCommandList); - +extern void SyncDistributedObjectsCommandList(MetadataSyncContext syncContext); +extern void PgDistTableMetadataSyncCommandList(MetadataSyncContext syncContext); +extern void +SyncMetadataCommands(MetadataSyncContext syncContext, List *commandList, + bool raiseErrors); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); extern uint32 WorkerNodeHashCode(const void *key, Size keySize);