From 8e9acf0add263d164952f3b8c4531f6126ed141f Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Thu, 8 Dec 2022 15:13:48 +0100 Subject: [PATCH] Implement shardgroup drop during splits for the old shardgroups Also did a refactor to treat the shardgroupid as an int64 throughout. Before it has been addressed as int64/uint64/uint32, now it is only treated as an int64. Still miss some propagation of the creation of the new shardgroups on all the workers to support query from any node. --- .../distributed/metadata/metadata_cache.c | 12 ++++ .../distributed/metadata/metadata_sync.c | 63 ++++++++++++++++--- .../distributed/metadata/metadata_utility.c | 31 +++++---- .../distributed/operations/create_shards.c | 6 +- .../distributed/operations/shard_split.c | 36 ++++++----- .../distributed/operations/stage_protocol.c | 5 +- .../11.3-1.sql | 10 +-- .../latest.sql | 10 +-- src/include/distributed/metadata_cache.h | 1 + src/include/distributed/metadata_sync.h | 1 + src/include/distributed/metadata_utility.h | 10 +-- 11 files changed, 126 insertions(+), 59 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index a98a291ed..802010ddd 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -183,6 +183,7 @@ typedef struct MetadataCacheData Oid distPartitionColocationidIndexId; Oid distShardLogicalRelidIndexId; Oid distShardShardidIndexId; + Oid distShardgrouPkeyId; Oid distPlacementShardidIndexId; Oid distPlacementPlacementidIndexId; Oid distColocationidIndexId; @@ -2873,6 +2874,17 @@ DistShardShardidIndexId(void) } +/* return oid of pg_dist_shardgroup_pkey index */ +Oid +DistShardgroupPkeyId(void) +{ + CachedRelationLookup("pg_dist_shardgroup_pkey", + &MetadataCache.distShardgrouPkeyId); + + return MetadataCache.distShardgrouPkeyId; +} + + /* return oid of pg_dist_placement_shardid_index */ Oid DistPlacementShardidIndexId(void) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f4fd7a920..e672cb3e7 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -142,6 +142,7 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, Oid distributionColumnType, Oid distributionColumnCollation); static char * ShardgroupDeleteCommandByColocationId(uint32 colocationId); +static const char * ShardgroupDeleteCommandByShardgroupId(int64 shardgroupid); static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); @@ -1312,7 +1313,7 @@ ShardListInsertCommand(List *shardIntervalList) foreach_ptr(shardInterval, shardIntervalList) { uint64 shardId = shardInterval->shardId; - uint64 shardGroupId = shardInterval->shardGroupId; + int64 shardGroupId = shardInterval->shardGroupId; Oid distributedRelationId = shardInterval->relationId; char *qualifiedRelationName = generate_qualified_relation_name( distributedRelationId); @@ -3263,7 +3264,7 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) } PG_ENSURE_ARGNOTNULL(5, "shard group id"); - uint64 shardGroupId = (uint64) PG_GETARG_INT64(5); + int64 shardGroupId = (uint64) PG_GETARG_INT64(5); /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); @@ -3864,17 +3865,39 @@ citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); EnsureSuperUser(); - int colocationId = PG_GETARG_INT32(0); - - if (!ShouldSkipMetadataChecks()) + if (!PG_ARGISNULL(0) && PG_ARGISNULL(1)) { - /* this UDF is not allowed allowed for executing as a separate command */ - EnsureCoordinatorInitiatedOperation(); + int64 shardgroupId = PG_GETARG_INT64(0); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } + + DeleteShardgroupRow(shardgroupId); + + PG_RETURN_VOID(); } + else if (!PG_ARGISNULL(1) && PG_ARGISNULL(0)) + { + int colocationId = PG_GETARG_INT32(1); - DeleteShardgroupForColocationIdLocally(colocationId); + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } - PG_RETURN_VOID(); + DeleteShardgroupForColocationIdLocally(colocationId); + + PG_RETURN_VOID(); + } + else + { + ereport(ERROR, (errmsg("expected exactly one argument from: " + "shardgroupid, colocationid"))); + } } @@ -4003,6 +4026,14 @@ SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId) } +void +SyncDeleteShardgroupForShardgroupIdToNodes(int64 shardgroupId) +{ + const char *command = ShardgroupDeleteCommandByShardgroupId(shardgroupId); + SendCommandToWorkersWithMetadataViaSuperUser(command); +} + + /* * SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers. */ @@ -4033,6 +4064,20 @@ ShardgroupDeleteCommandByColocationId(uint32 colocationId) } +static const char * +ShardgroupDeleteCommandByShardgroupId(int64 shardgroupid) +{ + StringInfoData deleteCommand = { 0 }; + initStringInfo(&deleteCommand); + + appendStringInfo(&deleteCommand, + "SELECT pg_catalog.citus_internal_delete_shardgroup_metadata(" + "shardgroupid => " INT64_FORMAT ")", shardgroupid); + + return deleteCommand.data; +} + + /* * ColocationGroupDeleteCommand returns a command for deleting a colocation group. */ diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index ce9f1bf9f..82e44c980 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1673,7 +1673,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) void -InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId, +InsertShardGroupRow(int64 shardGroupId, uint32 colocationId, text *shardMinValue, text *shardMaxValue) { Datum values[Natts_pg_dist_shardgroup]; @@ -1720,7 +1720,7 @@ InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId, */ void InsertShardRow(Oid relationId, uint64 shardId, char storageType, - text *shardMinValue, text *shardMaxValue, uint64 *shardGroupId) + text *shardMinValue, text *shardMaxValue, int64 *shardGroupId) { Datum values[Natts_pg_dist_shard]; bool isNulls[Natts_pg_dist_shard]; @@ -1750,7 +1750,7 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType, if (shardGroupId) { - values[Anum_pg_dist_shard_shardgroupid - 1] = UInt64GetDatum(*shardGroupId); + values[Anum_pg_dist_shard_shardgroupid - 1] = Int64GetDatum(*shardGroupId); } else { @@ -1948,37 +1948,34 @@ DeletePartitionRow(Oid distributedRelationId) void -DeleteShardgroupRow(uint32 shardgroupId) +DeleteShardgroupRow(int64 shardgroupId) { - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - bool indexOK = true; + ScanKeyData scanKey[1] = { 0 }; + const bool indexOK = true; Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock); - ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_shardgroupid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardgroupId)); SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup, - DistShardShardidIndexId(), indexOK, - NULL, scanKeyCount, scanKey); + DistShardgroupPkeyId(), indexOK, NULL, + lengthof(scanKey), scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); if (!HeapTupleIsValid(heapTuple)) { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, shardId))); + ereport(ERROR, (errmsg("could not find valid entry for shardgroup " INT64_FORMAT, + shardgroupId))); } - Form_pg_dist_shard pgDistShardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); - Oid distributedRelationId = pgDistShardForm->logicalrelid; - simple_heap_delete(pgDistShardgroup, &heapTuple->t_self); systable_endscan(scanDescriptor); /* invalidate previous cache entry */ - CitusInvalidateRelcacheByRelid(distributedRelationId); + /* TODO figure out what caches to invalidate */ + /* CitusInvalidateRelcacheByRelid(distributedRelationId); */ CommandCounterIncrement(); table_close(pgDistShardgroup, NoLock); diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 40a1ca7c0..71a78ef5f 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -196,7 +196,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, uint64 shardId = GetNextShardId(); /* we use shardid of the first shard in a shardgroup as the shardgroupid */ - uint64 shardGroupId = shardId; + int64 shardGroupId = (int64) shardId; /* if we are at the last shard, make sure the max token value is INT_MAX */ if (shardIndex == (shardCount - 1)) @@ -301,7 +301,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool uint64 *newShardIdPtr = (uint64 *) palloc0(sizeof(uint64)); *newShardIdPtr = GetNextShardId(); insertedShardIds = lappend(insertedShardIds, newShardIdPtr); - uint64 sourceShardGroupId = sourceShardInterval->shardGroupId; + int64 sourceShardGroupId = sourceShardInterval->shardGroupId; int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); @@ -399,7 +399,7 @@ CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId, /* get the next shard id */ uint64 shardId = GetNextShardId(); - uint64 shardGroupId = shardId; + int64 shardGroupId = (int64) shardId; StringInfoData shardgroupQuery = { 0 }; initStringInfo(&shardgroupQuery); diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index d1ea324d5..facc1dcaa 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -159,7 +159,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 static uint64 GetNextShardIdForSplitChild(void); static void AcquireNonblockingSplitLock(Oid relationId); static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList); -static void DropShardgroupMetadata(uint32 shardgroupId); +static void DropShardgroupMetadata(int64 shardgroupId); static void DropShardListMetadata(List *shardIntervalList); /* Customize error message strings based on operation type */ @@ -498,7 +498,7 @@ SplitShard(SplitMode splitMode, { sourceColocatedShardIntervalList = colocatedShardIntervalList; } - + /* Find the colocationId we are splitting in */ CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(relationId); uint32 colocationId = citusTableCacheEntry->colocationId; @@ -571,11 +571,11 @@ BlockingShardSplit(SplitOperation splitOperation, BlockWritesToShardList(sourceColocatedShardIntervalList); ShardInterval *sampleShardInterval = - (ShardInterval *) linitial(sourceColocatedShardIntervalList); + (ShardInterval *) linitial(sourceColocatedShardIntervalList); List *shardgroupSplits = CreateNewShardgroups( - colocationId, - sampleShardInterval, - shardSplitPointsList); + colocationId, + sampleShardInterval, + shardSplitPointsList); /* First create shard interval metadata for split children */ List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( @@ -1042,7 +1042,6 @@ static List * CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, List *splitPointsForShard) { - int32 splitParentMaxValue = DatumGetInt32(sampleInterval->maxValue); int32 currentSplitChildMinValue = DatumGetInt32(sampleInterval->minValue); @@ -1064,7 +1063,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, Datum splitPoint = (Datum) lfirst(splitPointCell); Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); - shardgroup->shardgroupId = GetNextShardIdForSplitChild(); + shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild(); shardgroup->colocationId = colocationId; shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); shardgroup->maxShardValue = splitPoint; @@ -1081,7 +1080,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, */ Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); - shardgroup->shardgroupId = GetNextShardIdForSplitChild(); + shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild(); shardgroup->colocationId = colocationId; shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); shardgroup->maxShardValue = splitParentMaxValue; @@ -1104,8 +1103,8 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList) { List *shardSplitIntervalList = - CreateShardIntervalsForNewShardgroups(shardToSplitInterval, - newShardgroups); + CreateShardIntervalsForNewShardgroups(shardToSplitInterval, + newShardgroups); shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList, shardSplitIntervalList); @@ -1359,8 +1358,15 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, static void -DropShardgroupMetadata(uint32 shardgroupId) +DropShardgroupMetadata(int64 shardgroupId) { + /* delete metadata from synced nodes */ + /* TODO decide how shardgroup metadata should be synced*/ + if (/* ShouldSyncTableMetadata(relationId) */ true) + { + SyncDeleteShardgroupForShardgroupIdToNodes(shardgroupId); + } + DeleteShardgroupRow(shardgroupId); } @@ -1469,9 +1475,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, char *databaseName = get_database_name(MyDatabaseId); List *shardgroupSplits = CreateNewShardgroups( - colocationId, - (ShardInterval *) linitial(sourceColocatedShardIntervalList), - shardSplitPointsList); + colocationId, + (ShardInterval *) linitial(sourceColocatedShardIntervalList), + shardSplitPointsList); /* First create shard interval metadata for split children */ List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index b93d1eccb..a7cb61882 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -186,11 +186,12 @@ master_create_empty_shard(PG_FUNCTION_ARGS) CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); - uint64 shardgroupId = shardId; + int64 shardgroupId = (int64) shardId; InsertShardGroupRow(shardgroupId, tableEntry->colocationId, nullMinValue, nullMaxValue); - InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, &shardgroupId); + InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, + &shardgroupId); CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, ShardReplicationFactor); diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql index cd7517968..b4ebe6bcb 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql @@ -1,8 +1,10 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int) +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata( + shardgroupid bigint DEFAULT NULL, + colocationid int DEFAULT NULL) RETURNS void LANGUAGE C - STRICT + CALLED ON NULL INPUT AS 'MODULE_PATHNAME'; -COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS - 'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to'; +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int) IS + 'deletes a shardgroups from pg_dist_shardgroup, expect exactly one argument to be passed as an indicator what shardgroup(s) to remove'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql index cd7517968..b4ebe6bcb 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql @@ -1,8 +1,10 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int) +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata( + shardgroupid bigint DEFAULT NULL, + colocationid int DEFAULT NULL) RETURNS void LANGUAGE C - STRICT + CALLED ON NULL INPUT AS 'MODULE_PATHNAME'; -COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS - 'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to'; +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int) IS + 'deletes a shardgroups from pg_dist_shardgroup, expect exactly one argument to be passed as an indicator what shardgroup(s) to remove'; diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index cacbe2db6..e1a0a5965 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -252,6 +252,7 @@ extern Oid DistBackgroundTaskDependTaskIdIndexId(void); extern Oid DistBackgroundTaskDependDependsOnIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); +extern Oid DistShardgroupPkeyId(void); extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementPlacementidIndexId(void); extern Oid DistColocationIndexId(void); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 7e1c4033a..73fe7c8b3 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -114,6 +114,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, Oid distributionColumType, Oid distributionColumnCollation); extern void SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId); +extern void SyncDeleteShardgroupForShardgroupIdToNodes(int64 shardgroupId); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index a8b0cb1f5..c81c92e20 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -66,14 +66,14 @@ typedef struct ShardInterval Datum minValue; /* a shard's typed min value datum */ Datum maxValue; /* a shard's typed max value datum */ uint64 shardId; - uint64 shardGroupId; + int64 shardGroupId; int shardIndex; } ShardInterval; typedef struct Shardgroup { - uint64 shardgroupId; + int64 shardgroupId; uint32 colocationId; Datum minShardValue; /* a shard's typed min value datum */ Datum maxShardValue; /* a shard's typed max value datum */ @@ -315,11 +315,11 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ -extern void InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId, +extern void InsertShardGroupRow(int64 shardGroupId, uint32 colocationId, text *shardMinValue, text *shardMaxValue); extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, text *shardMinValue, text *shardMaxValue, - uint64 *shardGroupId); + int64 *shardGroupId); extern void DeleteShardRow(uint64 shardId); extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, uint64 shardLength, int32 groupId); @@ -332,7 +332,7 @@ extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMe extern void UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distributionColumn, int colocationId); extern void DeletePartitionRow(Oid distributedRelationId); -extern void DeleteShardgroupRow(uint32 shardgroupId); +extern void DeleteShardgroupRow(int64 shardgroupId); extern void DeleteShardRow(uint64 shardId); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRow(uint64 placementId);