diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 3ff650aa0..823570621 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -537,7 +537,9 @@ ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, { /* since we are executing ddl commands disable propagation first, primarily for mx */ if (ddlCommands != NULL) - *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); + { + *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION); + } /* * collect all dependencies in creation order and get their ddl commands @@ -581,18 +583,29 @@ ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections, } List *perObjCommands = GetDependencyCreateDDLCommands(dependency); - if (ddlCommands != NULL) - *ddlCommands = list_concat(*ddlCommands, - perObjCommands); if (list_length(nodeToSyncMetadataConnections) != 0) { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), perObjCommands); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + perObjCommands); + } + + if (ddlCommands != NULL) + { + *ddlCommands = list_concat(*ddlCommands, + perObjCommands); + } + else + { + list_free_deep(perObjCommands); } } if (ddlCommands != NULL) - *ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION); + { + *ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION); + } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 1ef6fa364..b245e8ab9 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -2667,7 +2667,8 @@ CreateTableMetadataOnWorkers(Oid relationId) * empty list to not disable/enable DDL propagation for nothing. */ void -DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPartitionCommandList) +DetachPartitionCommandList(List *nodeToSyncMetadataConnections, + List **detachPartitionCommandList) { List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); @@ -2692,10 +2693,12 @@ DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPar Assert(PartitionTable(partitionRelOid)); char *detachCommand = GenerateDetachPartitionCommand(partitionRelOid); - if (list_length(nodeToSyncMetadataConnections) != 0) { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(detachCommand)); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + list_make1( + detachCommand)); } if (detachPartitionCommandList != NULL) @@ -2704,8 +2707,9 @@ DetachPartitionCommandList(List *nodeToSyncMetadataConnections, List **detachPar lappend(*detachPartitionCommandList, detachCommand); } else + { pfree(detachCommand); - + } } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 0c4a2da98..fcf0448de 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -680,20 +680,34 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, List *commandListForRelation = InterTableRelationshipOfRelationCommandList(relationId); - *multipleTableIntegrationCommandList = list_concat( - *multipleTableIntegrationCommandList, - commandListForRelation); if (list_length(nodeToSyncMetadataConnections) != 0) { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), commandListForRelation); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + commandListForRelation); + } + + if (multipleTableIntegrationCommandList != NULL) + { + *multipleTableIntegrationCommandList = list_concat( + *multipleTableIntegrationCommandList, + commandListForRelation); + } + else + { + list_free_deep(commandListForRelation); } } - *multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, - *multipleTableIntegrationCommandList); - *multipleTableIntegrationCommandList = lappend(*multipleTableIntegrationCommandList, - ENABLE_DDL_PROPAGATION); + if (multipleTableIntegrationCommandList != NULL) + { + *multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION, + *multipleTableIntegrationCommandList); + *multipleTableIntegrationCommandList = lappend( + *multipleTableIntegrationCommandList, + ENABLE_DDL_PROPAGATION); + } } @@ -702,7 +716,8 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections, * (except pg_dist_node) metadata. We call them as table metadata. */ void -PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **metadataSnapshotCommandList) +PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, + List **metadataSnapshotCommandList) { List *distributedTableList = CitusTableList(); List *propagatedTableList = NIL; @@ -731,7 +746,10 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **m if (list_length(nodeToSyncMetadataConnections) != 0) { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), *metadataSnapshotCommandList); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + * + metadataSnapshotCommandList); } /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ @@ -740,7 +758,9 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **m List *tableMetadataCreateCommandList = CitusTableMetadataCreateCommandList(cacheEntry->relationId); - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), tableMetadataCreateCommandList); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + tableMetadataCreateCommandList); *metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList, tableMetadataCreateCommandList); @@ -771,7 +791,6 @@ static void PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, List **nodeWideObjectCommandList) { - bool hasObjects = false; if (EnableAlterRoleSetPropagation) { /* @@ -794,7 +813,9 @@ PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections, if (list_length(nodeToSyncMetadataConnections) != 0) { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), alterRoleSetCommands); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + alterRoleSetCommands); } } } @@ -831,15 +852,23 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co DetachPartitionCommandList(nodeToSyncMetadataConnections, commandList); - //if (commandList != NULL) + if (list_length(nodeToSyncMetadataConnections) != 0) { + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + list_make1( + BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND)); + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + list_make1( + REMOVE_ALL_SHELL_TABLES_COMMAND)); + } - - if (list_length(nodeToSyncMetadataConnections) != 0) - { - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND)); - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(REMOVE_ALL_SHELL_TABLES_COMMAND)); - } + if (commandList != NULL) + { + *commandList = lappend(*commandList, + BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); + *commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND); } /* @@ -849,17 +878,22 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co ReplicateAllObjectsToNodeCommandList(nodeToSyncMetadataConnections, &replicateAllObjectsToNodeCommandList); if (commandList != NULL) - *commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList); + { + *commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList); + } /* * After creating each table, handle the inter table relationship between * those tables. */ List *interTableRelationshipCommandList = NIL; - InterTableRelationshipCommandList(nodeToSyncMetadataConnections, &interTableRelationshipCommandList); + InterTableRelationshipCommandList(nodeToSyncMetadataConnections, + &interTableRelationshipCommandList); if (commandList != NULL) - *commandList = list_concat(*commandList, interTableRelationshipCommandList); + { + *commandList = list_concat(*commandList, interTableRelationshipCommandList); + } } @@ -875,7 +909,6 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co static void SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections) { - if (nodeToSyncMetadataConnections == NIL) { return; @@ -924,7 +957,8 @@ SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections) Assert(superuser()); List *syncPgDistMetadataCommandList = NIL; - PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections, &syncPgDistMetadataCommandList); + PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections, + &syncPgDistMetadataCommandList); } @@ -1187,7 +1221,8 @@ ActivateNodeList(List *nodeList) BoolGetDatum(true)); /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ - bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode); + bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode) && + !NodeIsCoordinator(workerNode); if (syncMetadata) { /* @@ -1207,14 +1242,18 @@ ActivateNodeList(List *nodeList) MultiConnection *connection = - GetNodeConnection(OUTSIDE_TRANSACTION, node->workerName, node->workerPort); + GetNodeConnection(OUTSIDE_TRANSACTION, node->workerName, + node->workerPort); ClaimConnectionExclusively(connection); + nodeToSyncMetadataConnections = lappend(nodeToSyncMetadataConnections, connection); - SendCommandListToWorkerOutsideTransactionWithConnection(linitial(nodeToSyncMetadataConnections), list_make1(DISABLE_DDL_PROPAGATION)); - + SendCommandListToWorkerOutsideTransactionWithConnection(linitial( + nodeToSyncMetadataConnections), + list_make1( + DISABLE_DDL_PROPAGATION)); } } diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 8d5ab9bde..5709da66a 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -59,7 +59,7 @@ activate_node_snapshot(PG_FUNCTION_ARGS) List *createSnapshotCommands = NodeMetadataCreateCommands(); List *pgDistTableMetadataSyncCommands = NIL; - PgDistTableMetadataSyncCommandList(&pgDistTableMetadataSyncCommands); + PgDistTableMetadataSyncCommandList(NIL, &pgDistTableMetadataSyncCommands); List *activateNodeCommandList = NIL; int activateNodeCommandIndex = 0; diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index e79179044..d8d38cbbe 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -106,7 +106,7 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern void SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, - List **commandList); + List **commandList); extern void PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections, List **metadataSnapshotCommandList);