Divide object and metadata handling

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-27 18:14:51 +03:00
parent 6598a23963
commit 880533a609
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
7 changed files with 210 additions and 17 deletions

View File

@ -110,7 +110,8 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpObjectMetadata(WorkerNode *workerNode);
static void ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode);
static void ClearDistributedObjectsAndIntegrationsFromNode(WorkerNode *workerNode);
static void ClearDistributedTablesFromNode(WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -814,11 +815,33 @@ DistributedObjectMetadataSyncCommandList(void)
/*
* ClearDistributedObjectsWithMetadataFromNode clears all the distributed objects and related
* metadata from the given worker node.
* ClearDistributedTablesFromNode clear (shell) distributed tables from the given node.
*/
static void
ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode)
ClearDistributedTablesFromNode(WorkerNode *workerNode)
{
List *clearDistributedTablesCommandList = NIL;
clearDistributedTablesCommandList = lappend(clearDistributedTablesCommandList,
REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND);
clearDistributedTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),
clearDistributedTablesCommandList);
clearDistributedTablesCommandList = list_concat(clearDistributedTablesCommandList, list_make1(
ENABLE_DDL_PROPAGATION));
SendCommandListToWorkerOutsideTransaction(workerNode->workerName,
workerNode->workerPort,
CitusExtensionOwnerName(),
clearDistributedTablesCommandList);
}
/*
* ClearDistributedObjectsAndIntegrationsFromNode clears all the distributed objects, metadata and partition hierarchy from the given node.
*/
static void
ClearDistributedObjectsAndIntegrationsFromNode(WorkerNode *workerNode)
{
List *clearDistTableInfoCommandList = NIL;
List *detachPartitionCommandList = DetachPartitionCommandList();
@ -827,7 +850,7 @@ ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode)
detachPartitionCommandList);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
@ -839,9 +862,9 @@ ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode)
char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
clearDistTableInfoCommandList);
workerNode->workerPort,
currentUser,
clearDistTableInfoCommandList);
}
@ -866,7 +889,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
Assert(ShouldPropagate());
if (!NodeIsCoordinator(newWorkerNode))
{
ClearDistributedObjectsWithMetadataFromNode(newWorkerNode);
ClearDistributedTablesFromNode(newWorkerNode);
PropagateNodeWideObjects(newWorkerNode);
ReplicateAllDependenciesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
@ -923,10 +946,10 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode)
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
/* send commands to new workers*/
SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName,
newWorkerNode->workerPort,
CitusExtensionOwnerName(),
ddlCommands);
SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName,
newWorkerNode->workerPort,
CitusExtensionOwnerName(),
ddlCommands);
}
}
@ -1168,9 +1191,9 @@ ActivateNode(char *nodeName, int nodePort)
}
/*
* Delete replicated table placements from the coordinator's metadata,
* including remote ones.
*/
* Delete replicated table placements from the coordinator's metadata,
* including remote ones.
*/
bool forceRemoteDelete = true;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
forceRemoteDelete);
@ -1183,6 +1206,7 @@ ActivateNode(char *nodeName, int nodePort)
if (!NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode))
{
ClearDistributedObjectsAndIntegrationsFromNode(workerNode);
SetUpMultipleDistributedTableIntegrations(workerNode);
SetUpObjectMetadata(workerNode);
}

View File

@ -29,3 +29,17 @@ BEGIN
END IF;
END;
$$;
CREATE FUNCTION worker_drop_distributed_table_only(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_only$$;
COMMENT ON FUNCTION worker_drop_distributed_table_only(table_name text)
IS 'drop the distributed table only without the metadata';
CREATE FUNCTION worker_drop_distributed_table_metadata_only(table_oid oid)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_metadata_only$$;
COMMENT ON FUNCTION worker_drop_distributed_table_metadata_only(table_oid oid)
IS 'drops the metadata of the given table oid';

View File

@ -581,5 +581,15 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
ReplicateShardToNode(shardInterval, nodeName, nodePort);
}
/* create foreign constraints between reference tables */
foreach_ptr(shardInterval, referenceShardIntervalList)
{
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
commandList);
}
}
}

View File

@ -31,6 +31,8 @@
PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
PG_FUNCTION_INFO_V1(worker_drop_distributed_table_only);
PG_FUNCTION_INFO_V1(worker_drop_distributed_table_metadata_only);
/*
@ -153,3 +155,130 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* worker_drop_distributed_table_only drops the distributed table with the given oid.
*/
Datum
worker_drop_distributed_table_only(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, true);
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
char relationKind = '\0';
if (!OidIsValid(relationId))
{
ereport(NOTICE, (errmsg("relation %s does not exist, skipping",
text_to_cstring(relationName))));
PG_RETURN_VOID();
}
EnsureTableOwner(relationId);
/* first check the relation type */
Relation distributedRelation = relation_open(relationId, AccessShareLock);
relationKind = distributedRelation->rd_rel->relkind;
EnsureRelationKindSupported(relationId);
/* close the relation since we do not need anymore */
relation_close(distributedRelation, AccessShareLock);
/* prepare distributedTableObject for dropping the table */
distributedTableObject.classId = RelationRelationId;
distributedTableObject.objectId = relationId;
distributedTableObject.objectSubId = 0;
/* Drop dependent sequences from pg_dist_object */
#if PG_VERSION_NUM >= PG_VERSION_13
List *ownedSequences = getOwnedSequences(relationId);
#else
List *ownedSequences = getOwnedSequences(relationId, InvalidAttrNumber);
#endif
Oid ownedSequenceOid = InvalidOid;
foreach_oid(ownedSequenceOid, ownedSequences)
{
ObjectAddress ownedSequenceAddress = { 0 };
ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid);
UnmarkObjectDistributed(&ownedSequenceAddress);
}
/* drop the server for the foreign relations */
if (relationKind == RELKIND_FOREIGN_TABLE)
{
ObjectAddresses *objects = new_object_addresses();
ObjectAddress foreignServerObject = { InvalidOid, InvalidOid, 0 };
ForeignTable *foreignTable = GetForeignTable(relationId);
Oid serverId = foreignTable->serverid;
/* prepare foreignServerObject for dropping the server */
foreignServerObject.classId = ForeignServerRelationId;
foreignServerObject.objectId = serverId;
foreignServerObject.objectSubId = 0;
/* add the addresses that are going to be dropped */
add_exact_object_address(&distributedTableObject, objects);
add_exact_object_address(&foreignServerObject, objects);
/* drop both the table and the server */
performMultipleDeletions(objects, DROP_RESTRICT,
PERFORM_DELETION_INTERNAL);
}
else if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{
/*
* If the table is owned by an extension, we cannot drop it, nor should we
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
* table and only delete the metadata.
*
* We drop the table with cascade since other tables may be referring to it.
*/
performDeletion(&distributedTableObject, DROP_CASCADE,
PERFORM_DELETION_INTERNAL);
}
PG_RETURN_VOID();
}
/*
* worker_drop_distributed_table_metadata_only removes the associated rows from pg_dist_partition,
* pg_dist_shard and pg_dist_placement for the given relation.
*/
Datum
worker_drop_distributed_table_metadata_only(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
List *shardList = LoadShardList(relationId);
/* iterate over shardList to delete the corresponding rows */
uint64 *shardIdPointer = NULL;
foreach_ptr(shardIdPointer, shardList)
{
uint64 shardId = *shardIdPointer;
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacementList)
{
/* delete the row from pg_dist_placement */
DeleteShardPlacementRow(placement->placementId);
}
/* delete the row from pg_dist_shard */
DeleteShardRow(shardId);
}
/* delete the row from pg_dist_partition */
DeletePartitionRow(relationId);
PG_RETURN_VOID();
}

View File

@ -72,6 +72,10 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object"
#define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \
"SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition"
#define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \
"SELECT worker_drop_distributed_table_metadata_only(logicalrelid) FROM pg_dist_partition"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"

View File

@ -15,7 +15,7 @@
# ---
test: multi_extension
test: multi_test_helpers multi_test_helpers_superuser
test: multi_mx_node_metadata
#test: multi_mx_node_metadata
test: multi_cluster_management
test: multi_mx_function_table_reference
test: multi_test_catalog_views

View File

@ -78,8 +78,20 @@ SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
-- adding workers with specific IP is ok now
select * from pg_dist_partition;
select * from citus_tables;
\c - - - :worker_1_port
SET search_path TO single_node;
\d
select * from pg_dist_partition;
\c - - - :master_port
SET search_path TO single_node;
set citus.log_remote_commands to true;
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
reset citus.log_remote_commands;
-- set the coordinator host back to localhost for the remainder of tests
SELECT 1 FROM citus_set_coordinator_host('localhost');