From 9cacbeab7e64b48b1be549effd96f18b00fe9548 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Fri, 25 Nov 2022 15:44:34 +0100 Subject: [PATCH] remove shardgroups on colocation group removal --- .../distributed/metadata/metadata_sync.c | 48 ++++++++++++++++ .../distributed/sql/citus--11.2-1--11.3-1.sql | 1 + .../11.3-1.sql | 8 +++ .../latest.sql | 8 +++ .../distributed/utils/colocation_utils.c | 57 +++++++++++++++++++ src/include/distributed/colocation_utils.h | 1 + src/include/distributed/metadata_sync.h | 1 + 7 files changed, 124 insertions(+) create mode 100644 src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f9f1fd609..f4fd7a920 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -141,6 +141,7 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicationFactor, Oid distributionColumnType, Oid distributionColumnCollation); +static char * ShardgroupDeleteCommandByColocationId(uint32 colocationId); static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); @@ -169,6 +170,7 @@ PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); +PG_FUNCTION_INFO_V1(citus_internal_delete_shardgroup_metadata); static bool got_SIGTERM = false; @@ -3852,6 +3854,30 @@ citus_internal_delete_colocation_metadata(PG_FUNCTION_ARGS) } +/* + * citus_internal_delete_shardgroup_metadata is an internal UDF to + * delte rows from pg_dist_shardgroup. + */ +Datum +citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + int colocationId = PG_GETARG_INT32(0); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } + + DeleteShardgroupForColocationIdLocally(colocationId); + + PG_RETURN_VOID(); +} + + /* * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. */ @@ -3969,6 +3995,14 @@ RemoteCollationIdExpression(Oid colocationId) } +void +SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId) +{ + char *command = ShardgroupDeleteCommandByColocationId(colocationId); + SendCommandToWorkersWithMetadataViaSuperUser(command); +} + + /* * SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers. */ @@ -3985,6 +4019,20 @@ SyncDeleteColocationGroupToNodes(uint32 colocationId) } +static char * +ShardgroupDeleteCommandByColocationId(uint32 colocationId) +{ + StringInfoData deleteCommand = { 0 }; + initStringInfo(&deleteCommand); + + appendStringInfo(&deleteCommand, + "SELECT pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid => %d)", + colocationId); + + return deleteCommand.data; +} + + /* * ColocationGroupDeleteCommand returns a command for deleting a colocation group. */ diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 75f75efd1..b410ffd3f 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -61,3 +61,4 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata( ); #include "udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql" +#include "udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql new file mode 100644 index 000000000..cd7517968 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS + 'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql new file mode 100644 index 000000000..cd7517968 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS + 'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to'; diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 985d4c38e..3a0316323 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -39,6 +39,7 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" +#include "distributed/pg_dist_shardgroup.h" /* local function forward declarations */ @@ -50,6 +51,7 @@ static bool HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval, static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); static void DeleteColocationGroup(uint32 colocationId); +static void DeleteShardgroupForColocationId(uint32 colocationId); static uint32 CreateColocationGroupForRelation(Oid sourceRelationId); static void BreakColocation(Oid sourceRelationId); @@ -1250,12 +1252,67 @@ DeleteColocationGroupIfNoTablesBelong(uint32 colocationId) if (colocatedTableCount == 0) { + DeleteShardgroupForColocationId(colocationId); DeleteColocationGroup(colocationId); } } } +static void +DeleteShardgroupForColocationId(uint32 colocationId) +{ + DeleteShardgroupForColocationIdLocally(colocationId); + SyncDeleteShardgroupForColocationIdToNodes(colocationId); +} + + +void +DeleteShardgroupForColocationIdLocally(uint32 colocationId) +{ + ScanKeyData scanKey[1] = { 0 }; + bool indexOK = false; + + Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_colocationid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); + + /* TODO add index to find shardgroups by colocationid */ + SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup, InvalidOid, indexOK, + NULL, lengthof(scanKey), scanKey); + + /* + * simple_heap_delete() expects that the caller has at least an + * AccessShareLock on primary key index. + * + * XXX: This does not seem required, do we really need to acquire this lock? + * Postgres doesn't acquire such locks on indexes before deleting catalog tuples. + * Linking here the reasons we added this lock acquirement: + * https://github.com/citusdata/citus/pull/2851#discussion_r306569462 + * https://github.com/citusdata/citus/pull/2855#discussion_r313628554 + * https://github.com/citusdata/citus/issues/1890 + */ + Relation replicaIndex = + index_open(RelationGetPrimaryKeyIndex(pgDistShardgroup), AccessShareLock); + + /* for all records found delete from heap */ + HeapTuple heapTuple = NULL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + simple_heap_delete(pgDistShardgroup, &(heapTuple->t_self)); + + CitusInvalidateRelcacheByRelid(DistShardgroupRelationId()); + } + + CommandCounterIncrement(); + table_close(replicaIndex, AccessShareLock); + + systable_endscan(scanDescriptor); + table_close(pgDistShardgroup, NoLock); +} + + /* * DeleteColocationGroup deletes the colocation group from pg_dist_colocation * throughout the cluster. diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 9e6641cd3..efdfaf448 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -49,6 +49,7 @@ extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colo extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId); extern List * ColocationGroupTableList(uint32 colocationId, uint32 count); extern void DeleteColocationGroupLocally(uint32 colocationId); +extern void DeleteShardgroupForColocationIdLocally(uint32 colocationId); extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel, Oid distributionColumnType, Oid distributionColumnCollation, diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e06b5268f..7e1c4033a 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -113,6 +113,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicationFactor, Oid distributionColumType, Oid distributionColumnCollation); +extern void SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); #define DELETE_ALL_NODES "DELETE FROM pg_dist_node"