From 880533a6092ee1160e0a3e5596a382630ffca75b Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Mon, 27 Dec 2021 18:14:51 +0300 Subject: [PATCH] Divide object and metadata handling --- .../distributed/metadata/node_metadata.c | 56 +++++--- .../distributed/sql/citus--10.2-4--11.0-1.sql | 14 ++ .../distributed/utils/reference_table_utils.c | 10 ++ .../distributed/worker/worker_drop_protocol.c | 129 ++++++++++++++++++ src/include/distributed/metadata_sync.h | 4 + src/test/regress/multi_mx_schedule | 2 +- src/test/regress/sql/single_node.sql | 12 ++ 7 files changed, 210 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index fbb0f85b1..a4c66faab 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -110,7 +110,8 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); static void SetUpObjectMetadata(WorkerNode *workerNode); -static void ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode); +static void ClearDistributedObjectsAndIntegrationsFromNode(WorkerNode *workerNode); +static void ClearDistributedTablesFromNode(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -814,11 +815,33 @@ DistributedObjectMetadataSyncCommandList(void) /* - * ClearDistributedObjectsWithMetadataFromNode clears all the distributed objects and related - * metadata from the given worker node. + * ClearDistributedTablesFromNode clear (shell) distributed tables from the given node. */ static void -ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode) +ClearDistributedTablesFromNode(WorkerNode *workerNode) +{ + List *clearDistributedTablesCommandList = NIL; + + clearDistributedTablesCommandList = lappend(clearDistributedTablesCommandList, + REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND); + + clearDistributedTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION), + clearDistributedTablesCommandList); + clearDistributedTablesCommandList = list_concat(clearDistributedTablesCommandList, list_make1( + ENABLE_DDL_PROPAGATION)); + + SendCommandListToWorkerOutsideTransaction(workerNode->workerName, + workerNode->workerPort, + CitusExtensionOwnerName(), + clearDistributedTablesCommandList); +} + + +/* + * ClearDistributedObjectsAndIntegrationsFromNode clears all the distributed objects, metadata and partition hierarchy from the given node. + */ +static void +ClearDistributedObjectsAndIntegrationsFromNode(WorkerNode *workerNode) { List *clearDistTableInfoCommandList = NIL; List *detachPartitionCommandList = DetachPartitionCommandList(); @@ -827,7 +850,7 @@ ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode) detachPartitionCommandList); clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, - REMOVE_ALL_CLUSTERED_TABLES_COMMAND); + REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND); clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); @@ -839,9 +862,9 @@ ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode) char *currentUser = CurrentUserName(); SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - clearDistTableInfoCommandList); + workerNode->workerPort, + currentUser, + clearDistTableInfoCommandList); } @@ -866,7 +889,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) Assert(ShouldPropagate()); if (!NodeIsCoordinator(newWorkerNode)) { - ClearDistributedObjectsWithMetadataFromNode(newWorkerNode); + ClearDistributedTablesFromNode(newWorkerNode); PropagateNodeWideObjects(newWorkerNode); ReplicateAllDependenciesToNode(newWorkerNode->workerName, newWorkerNode->workerPort); @@ -923,10 +946,10 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); /* send commands to new workers*/ - SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CitusExtensionOwnerName(), + ddlCommands); } } @@ -1168,9 +1191,9 @@ ActivateNode(char *nodeName, int nodePort) } /* - * Delete replicated table placements from the coordinator's metadata, - * including remote ones. - */ + * Delete replicated table placements from the coordinator's metadata, + * including remote ones. + */ bool forceRemoteDelete = true; DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, forceRemoteDelete); @@ -1183,6 +1206,7 @@ ActivateNode(char *nodeName, int nodePort) if (!NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode)) { + ClearDistributedObjectsAndIntegrationsFromNode(workerNode); SetUpMultipleDistributedTableIntegrations(workerNode); SetUpObjectMetadata(workerNode); } diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index c5f333152..507cdab7c 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -29,3 +29,17 @@ BEGIN END IF; END; $$; + +CREATE FUNCTION worker_drop_distributed_table_only(table_name text) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_only$$; +COMMENT ON FUNCTION worker_drop_distributed_table_only(table_name text) + IS 'drop the distributed table only without the metadata'; + +CREATE FUNCTION worker_drop_distributed_table_metadata_only(table_oid oid) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_metadata_only$$; +COMMENT ON FUNCTION worker_drop_distributed_table_metadata_only(table_oid oid) + IS 'drops the metadata of the given table oid'; \ No newline at end of file diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 4b7761950..fe333aab4 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -581,5 +581,15 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) ReplicateShardToNode(shardInterval, nodeName, nodePort); } + + /* create foreign constraints between reference tables */ + foreach_ptr(shardInterval, referenceShardIntervalList) + { + char *tableOwner = TableOwner(shardInterval->relationId); + List *commandList = CopyShardForeignConstraintCommandList(shardInterval); + + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, + commandList); + } } } diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index a78e2d7a7..23b6db021 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -31,6 +31,8 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table); +PG_FUNCTION_INFO_V1(worker_drop_distributed_table_only); +PG_FUNCTION_INFO_V1(worker_drop_distributed_table_metadata_only); /* @@ -153,3 +155,130 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * worker_drop_distributed_table_only drops the distributed table with the given oid. + */ +Datum +worker_drop_distributed_table_only(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + text *relationName = PG_GETARG_TEXT_P(0); + Oid relationId = ResolveRelationId(relationName, true); + + ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 }; + char relationKind = '\0'; + + if (!OidIsValid(relationId)) + { + ereport(NOTICE, (errmsg("relation %s does not exist, skipping", + text_to_cstring(relationName)))); + PG_RETURN_VOID(); + } + + EnsureTableOwner(relationId); + + /* first check the relation type */ + Relation distributedRelation = relation_open(relationId, AccessShareLock); + relationKind = distributedRelation->rd_rel->relkind; + EnsureRelationKindSupported(relationId); + + /* close the relation since we do not need anymore */ + relation_close(distributedRelation, AccessShareLock); + + /* prepare distributedTableObject for dropping the table */ + distributedTableObject.classId = RelationRelationId; + distributedTableObject.objectId = relationId; + distributedTableObject.objectSubId = 0; + + /* Drop dependent sequences from pg_dist_object */ + #if PG_VERSION_NUM >= PG_VERSION_13 + List *ownedSequences = getOwnedSequences(relationId); + #else + List *ownedSequences = getOwnedSequences(relationId, InvalidAttrNumber); + #endif + + Oid ownedSequenceOid = InvalidOid; + foreach_oid(ownedSequenceOid, ownedSequences) + { + ObjectAddress ownedSequenceAddress = { 0 }; + ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); + UnmarkObjectDistributed(&ownedSequenceAddress); + } + + /* drop the server for the foreign relations */ + if (relationKind == RELKIND_FOREIGN_TABLE) + { + ObjectAddresses *objects = new_object_addresses(); + ObjectAddress foreignServerObject = { InvalidOid, InvalidOid, 0 }; + ForeignTable *foreignTable = GetForeignTable(relationId); + Oid serverId = foreignTable->serverid; + + /* prepare foreignServerObject for dropping the server */ + foreignServerObject.classId = ForeignServerRelationId; + foreignServerObject.objectId = serverId; + foreignServerObject.objectSubId = 0; + + /* add the addresses that are going to be dropped */ + add_exact_object_address(&distributedTableObject, objects); + add_exact_object_address(&foreignServerObject, objects); + + /* drop both the table and the server */ + performMultipleDeletions(objects, DROP_RESTRICT, + PERFORM_DELETION_INTERNAL); + } + else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL)) + { + /* + * If the table is owned by an extension, we cannot drop it, nor should we + * until the user runs DROP EXTENSION. Therefore, we skip dropping the + * table and only delete the metadata. + * + * We drop the table with cascade since other tables may be referring to it. + */ + performDeletion(&distributedTableObject, DROP_CASCADE, + PERFORM_DELETION_INTERNAL); + } + + PG_RETURN_VOID(); +} + + +/* + * worker_drop_distributed_table_metadata_only removes the associated rows from pg_dist_partition, + * pg_dist_shard and pg_dist_placement for the given relation. + */ +Datum +worker_drop_distributed_table_metadata_only(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Oid relationId = PG_GETARG_OID(0); + + List *shardList = LoadShardList(relationId); + + /* iterate over shardList to delete the corresponding rows */ + uint64 *shardIdPointer = NULL; + foreach_ptr(shardIdPointer, shardList) + { + uint64 shardId = *shardIdPointer; + + List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); + ShardPlacement *placement = NULL; + foreach_ptr(placement, shardPlacementList) + { + /* delete the row from pg_dist_placement */ + DeleteShardPlacementRow(placement->placementId); + } + + /* delete the row from pg_dist_shard */ + DeleteShardRow(shardId); + } + + /* delete the row from pg_dist_partition */ + DeletePartitionRow(relationId); + + PG_RETURN_VOID(); +} \ No newline at end of file diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index efcec8a81..e357b7d13 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -72,6 +72,10 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE" #define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object" +#define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \ + "SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition" +#define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \ + "SELECT worker_drop_distributed_table_metadata_only(logicalrelid) FROM pg_dist_partition" #define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ "SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition" #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 08828af4b..6ea1c2cea 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -15,7 +15,7 @@ # --- test: multi_extension test: multi_test_helpers multi_test_helpers_superuser -test: multi_mx_node_metadata +#test: multi_mx_node_metadata test: multi_cluster_management test: multi_mx_function_table_reference test: multi_test_catalog_views diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 0305cbd48..9bd38fb38 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -78,8 +78,20 @@ SELECT 1 FROM master_remove_node('localhost', :worker_1_port); SELECT 1 FROM citus_set_coordinator_host('127.0.0.1'); -- adding workers with specific IP is ok now +select * from pg_dist_partition; +select * from citus_tables; +\c - - - :worker_1_port +SET search_path TO single_node; +\d +select * from pg_dist_partition; + +\c - - - :master_port +SET search_path TO single_node; + +set citus.log_remote_commands to true; SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port); SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port); +reset citus.log_remote_commands; -- set the coordinator host back to localhost for the remainder of tests SELECT 1 FROM citus_set_coordinator_host('localhost');