From 6481f7dcd36eb0864ca92d87af16ce59851d551f Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Fri, 27 Oct 2023 14:20:16 +0000 Subject: [PATCH] implement shardgroups when splitting shards --- .../distributed/metadata/metadata_cache.c | 11 ++ .../distributed/metadata/metadata_sync.c | 99 +++++++++++++- .../distributed/metadata/metadata_utility.c | 39 ++++++ .../distributed/operations/shard_split.c | 122 +++++++++++++++++- .../sql/feature/shardgroup/downgrade.sql | 4 +- .../sql/feature/shardgroup/upgrade.sql | 2 +- .../12.2-1.sql | 6 + .../latest.sql | 6 + src/include/distributed/metadata_cache.h | 1 + src/include/distributed/metadata_sync.h | 2 + src/include/distributed/metadata_utility.h | 2 +- src/include/distributed/shardgroup.h | 1 + 12 files changed, 286 insertions(+), 9 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/12.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 3a90bed97..16494c984 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -199,6 +199,7 @@ typedef struct MetadataCacheData Oid distPartitionColocationidIndexId; Oid distShardLogicalRelidIndexId; Oid distShardShardidIndexId; + Oid distShardgroupShardgroupidIndexId; Oid distPlacementShardidIndexId; Oid distPlacementPlacementidIndexId; Oid distColocationidIndexId; @@ -3002,6 +3003,16 @@ DistShardShardidIndexId(void) } +Oid +DistShardgroupShardgroupIdIndexId(void) +{ + CachedRelationLookup("pg_dist_shardgroup_pkey", + &MetadataCache.distShardgroupShardgroupidIndexId); + + return MetadataCache.distShardgroupShardgroupidIndexId; +} + + /* return oid of pg_dist_placement_shardid_index */ Oid DistPlacementShardidIndexId(void) diff --git a/src/backend/distributed/metadata/metadata_sync.c 
b/src/backend/distributed/metadata/metadata_sync.c index 32be9b8af..585781c78 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -174,12 +174,13 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency); PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); +PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shardgroup_metadata); +PG_FUNCTION_INFO_V1(citus_internal_delete_shardgroup_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy); PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); -PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); @@ -1189,6 +1190,72 @@ TableOwnerResetCommand(Oid relationId) } +List * +ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals) +{ + StringInfo insertShardgroupsCommand = makeStringInfo(); + + appendStringInfo(insertShardgroupsCommand, + "WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES "); + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervals) + { + appendStringInfo(insertShardgroupsCommand, + "(" SHARDGROUPID_FORMAT "::bigint, %u)", + shardInterval->shardgroupId, colocationId); + + if (llast(shardIntervals) != shardInterval) + { + appendStringInfo(insertShardgroupsCommand, ", "); + } + } + + appendStringInfo(insertShardgroupsCommand, ") "); + + appendStringInfo(insertShardgroupsCommand, + "SELECT pg_catalog.citus_internal_add_shardgroup_metadata(shardgroupid, " + "colocationid) FROM 
shardgroup_data;"); + + return list_make1(insertShardgroupsCommand->data); +} + + +List * +ShardgroupListDeleteCommand(List *shardIntervalList) +{ + if (list_length(shardIntervalList) == 0) + { + return NIL; + } + + StringInfo deleteShardgroupsCommand = makeStringInfo(); + + appendStringInfo(deleteShardgroupsCommand, + "WITH shardgroup_data(shardgroupid) AS (VALUES "); + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + appendStringInfo(deleteShardgroupsCommand, + "(" SHARDGROUPID_FORMAT "::bigint)", + shardInterval->shardgroupId); + + if (llast(shardIntervalList) != shardInterval) + { + appendStringInfo(deleteShardgroupsCommand, ", "); + } + } + + appendStringInfo(deleteShardgroupsCommand, ") "); + + appendStringInfo(deleteShardgroupsCommand, + "SELECT pg_catalog.citus_internal_delete_shardgroup_metadata(" + "shardgroupid) FROM shardgroup_data;"); + + return list_make1(deleteShardgroupsCommand->data); +} + /* * ShardListInsertCommand generates a single command that can be * executed to replicate shard and shard placement metadata for the @@ -1285,7 +1352,8 @@ ShardListInsertCommand(List *shardIntervalList) } appendStringInfo(insertShardCommand, - "(%s::regclass, %ld, '%c'::\"char\", %s, %s, %ld::bigint)", + "(%s::regclass, %ld, '%c'::\"char\", %s, %s, " + SHARDGROUPID_FORMAT "::bigint)", quote_literal_cstr(qualifiedRelationName), shardId, shardInterval->storageType, @@ -3382,7 +3450,7 @@ citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS) EnsureSuperUser(); PG_ENSURE_ARGNOTNULL(0, "shardgroupid"); - ShardgroupID shardgroupID = PG_GETARG_SHARDGROUPID(0); + ShardgroupID shardgroupId = PG_GETARG_SHARDGROUPID(0); PG_ENSURE_ARGNOTNULL(1, "colocationid"); uint32 colocationId = PG_GETARG_UINT32(1); @@ -3393,7 +3461,28 @@ citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS) EnsureCoordinatorInitiatedOperation(); } - InsertShardgroupRow(shardgroupID, colocationId); + InsertShardgroupRow(shardgroupId, colocationId); + + 
PG_RETURN_VOID(); +} + + +Datum +citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + PG_ENSURE_ARGNOTNULL(0, "shardgroupid"); + ShardgroupID shardgroupId = PG_GETARG_SHARDGROUPID(0); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed to be executed as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } + + DeleteShardgroupRow(shardgroupId); PG_RETURN_VOID(); } @@ -4152,7 +4241,7 @@ ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 col } ShardgroupID shardgroupId = shardgroupIDs[i]; - appendStringInfo(&buf, "(%ld::bigint, %u)", + appendStringInfo(&buf, "(" SHARDGROUPID_FORMAT "::bigint, %u)", shardgroupId, colocationId); } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index b486423a2..d48a8cbfb 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1802,6 +1802,11 @@ IsDummyPlacement(ShardPlacement *taskPlacement) void InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId) { + if (!IsShardgroupIDValid(shardgroupId)) + { + elog(ERROR, "cannot insert invalid shardgroupid: " SHARDGROUPID_FORMAT, shardgroupId); + } + Datum values[Natts_pg_dist_shardgroup]; bool isNulls[Natts_pg_dist_shardgroup]; @@ -2070,6 +2075,40 @@ DeletePartitionRow(Oid distributedRelationId) } +void +DeleteShardgroupRow(ShardgroupID shardgroupId) +{ + ScanKeyData scanKey[1]; + bool indexOK = true; + + Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_shardgroupid, + BTEqualStrategyNumber, F_INT8EQ, ShardgroupIDGetDatum(shardgroupId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup, + DistShardgroupShardgroupIdIndexId(), + indexOK, + NULL, + lengthof(scanKey), + scanKey); + + HeapTuple heapTuple = 
systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shardgroup " + SHARDGROUPID_FORMAT, shardgroupId))); + } + + simple_heap_delete(pgDistShardgroup, &heapTuple->t_self); + + systable_endscan(scanDescriptor); + + CommandCounterIncrement(); + table_close(pgDistShardgroup, NoLock); +} + + /* * DeleteShardRow opens the shard system catalog, finds the unique row that has * the given shardId, and deletes this row. diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 417640293..b40b6cf39 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -129,6 +129,8 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList, char distributionMethod, int shardCount, uint32 colocationId); +static void PopulateShardgroupIds(List *shardGroupSplitIntervalListList); +static void InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreatePartitioningHierarchyForBlockingSplit( @@ -157,6 +159,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 static uint64 GetNextShardIdForSplitChild(void); static void AcquireNonblockingSplitLock(Oid relationId); static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList); +static void DropShardgroupListMetadata(List *shardIntervalList); static void DropShardListMetadata(List *shardIntervalList); /* Customize error message strings based on operation type */ @@ -613,8 +616,15 @@ BlockingShardSplit(SplitOperation splitOperation, InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList); + DropShardgroupListMetadata(sourceColocatedShardIntervalList); + DropShardListMetadata(sourceColocatedShardIntervalList); + /* 
allocate and assign new shardgroups to newly created shardIntervals */ + PopulateShardgroupIds(shardGroupSplitIntervalListList); + + InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList); + /* Insert new shard and placement metdata */ InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); @@ -1073,6 +1083,13 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard, for (int index = 0; index < shardIntervalCount; index++) { ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard); + /* + * This will get populated later when we have a list of all new colocated + * shardIntervals as to make sure that all colocated shardIntervals get the same + * shardgroupId. + */ + splitChildShardInterval->shardgroupId = InvalidShardgroupID; + splitChildShardInterval->shardIndex = -1; splitChildShardInterval->shardId = GetNextShardIdForSplitChild(); @@ -1148,6 +1165,74 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList, } +static void +PopulateShardgroupIds(List *shardGroupSplitIntervalListList) +{ + List *firstShardIntervals = NULL; + List *currentShardIntervals = NULL; + foreach_ptr(currentShardIntervals, shardGroupSplitIntervalListList) + { + if (!firstShardIntervals) + { + /* on the first loop we assign new shardgroupId's */ + firstShardIntervals = currentShardIntervals; + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, firstShardIntervals) + { + ShardgroupID newShardgroupId = GetNextShardgroupId(); + shardInterval->shardgroupId = newShardgroupId; + } + } + else + { + /* on subsequent loops we assign the same shardgroupId for colocation */ + ShardInterval *firstShardInterval = NULL; + ShardInterval *currentShardInterval = NULL; + forboth_ptr(firstShardInterval, firstShardIntervals, currentShardInterval, currentShardIntervals) + { + currentShardInterval->shardgroupId = firstShardInterval->shardgroupId; + } + } + } +} + + +static void +InsertSplitChildrenShardgroupMetadata(List 
*shardGroupSplitIntervalListList) +{ + if (list_length(shardGroupSplitIntervalListList) == 0) + { + /* no shardintervals means no shardgroups to insert */ + return; + } + + /* take the first list of shardintervals to get to a table */ + List *shardGroupSplitIntervalList = linitial(shardGroupSplitIntervalListList); + if (list_length(shardGroupSplitIntervalList) == 0) + { + /* no shardintervals means no shardgroups to insert */ + return; + } + + ShardInterval *shardInterval = linitial(shardGroupSplitIntervalList); + + uint32 colocationId = TableColocationId(shardInterval->relationId); + + foreach_ptr(shardInterval, shardGroupSplitIntervalList) + { + InsertShardgroupRow(shardInterval->shardgroupId, colocationId); + } + + /* send commands to synced nodes one by one */ + List *splitOffShardMetadataCommandList = ShardgroupListInsertCommand(colocationId, shardGroupSplitIntervalList); + char *command = NULL; + foreach_ptr(command, splitOffShardMetadataCommandList) + { + SendCommandToWorkersWithMetadataViaSuperUser(command); + } +} + + /* * Insert new shard and placement metadata. * Sync the Metadata with all nodes if enabled. 
@@ -1178,7 +1263,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, shardInterval->storageType, IntegerToText(DatumGetInt32(shardInterval->minValue)), IntegerToText(DatumGetInt32(shardInterval->maxValue)), - InvalidShardgroupID); + shardInterval->shardgroupId); InsertShardPlacementRow( shardInterval->shardId, @@ -1294,6 +1379,34 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, } +static void +DropShardgroupListMetadata(List *shardIntervalList) +{ + List *syncedShardIntervalList = NIL; + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + DeleteShardgroupRow(shardInterval->shardgroupId); + + Oid relationId = shardInterval->relationId; + /* delete metadata from synced nodes */ + if (ShouldSyncTableMetadata(relationId)) + { + syncedShardIntervalList = lappend(syncedShardIntervalList, shardInterval); + } + } + + /* delete metadata from all workers with metadata available */ + List *commands = ShardgroupListDeleteCommand(syncedShardIntervalList); + char *command = NULL; + foreach_ptr(command, commands) + { + SendCommandToWorkersWithMetadataViaSuperUser(command); + } +} + + /* * DropShardListMetadata drops shard metadata from both the coordinator and * mx nodes. 
@@ -1567,6 +1680,8 @@ NonBlockingShardSplit(SplitOperation splitOperation, DropShardListMetadata(sourceColocatedShardIntervalList); + DropShardgroupListMetadata(sourceColocatedShardIntervalList); + /* * 11) In case of create_distributed_table_concurrently, which converts * a Citus local table to a distributed table, update the distributed @@ -1600,6 +1715,11 @@ NonBlockingShardSplit(SplitOperation splitOperation, targetColocationId); } + /* allocate and assign new shardgroups to newly created shardIntervals */ + PopulateShardgroupIds(shardGroupSplitIntervalListList); + + InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList); + /* 12) Insert new shard and placement metdata */ InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); diff --git a/src/backend/distributed/sql/feature/shardgroup/downgrade.sql b/src/backend/distributed/sql/feature/shardgroup/downgrade.sql index 94782b12c..b8865615d 100644 --- a/src/backend/distributed/sql/feature/shardgroup/downgrade.sql +++ b/src/backend/distributed/sql/feature/shardgroup/downgrade.sql @@ -5,5 +5,7 @@ DROP TABLE pg_dist_shardgroup; ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardgroupid; DROP FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer); +DROP FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint); + DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint); -#include "../../udfs/citus_internal_add_shard_metadata/10.2-1.sql" \ No newline at end of file +#include "../../udfs/citus_internal_add_shard_metadata/10.2-1.sql" diff --git a/src/backend/distributed/sql/feature/shardgroup/upgrade.sql b/src/backend/distributed/sql/feature/shardgroup/upgrade.sql index 1dff2e886..054c1afdb 100644 --- a/src/backend/distributed/sql/feature/shardgroup/upgrade.sql +++ b/src/backend/distributed/sql/feature/shardgroup/upgrade.sql @@ -12,4 +12,4 @@ ALTER SEQUENCE citus.pg_dist_shardgroupid_seq 
SET SCHEMA pg_catalog; DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text); #include "../../udfs/citus_internal_add_shard_metadata/12.2-1.sql" #include "../../udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql" - +#include "../../udfs/citus_internal_delete_shardgroup_metadata/12.2-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/12.2-1.sql new file mode 100644 index 000000000..7e670b024 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/12.2-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(shardgroupid bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint) IS + 'Deletes rows from pg_dist_shardgroup'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql new file mode 100644 index 000000000..7e670b024 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(shardgroupid bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint) IS + 'Deletes rows from pg_dist_shardgroup'; diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 6d4c32cdb..dd43be936 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -264,6 +264,7 @@ extern Oid DistBackgroundTaskDependTaskIdIndexId(void); extern Oid DistBackgroundTaskDependDependsOnIndexId(void); extern Oid 
DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); +extern Oid DistShardgroupShardgroupIdIndexId(void); extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementPlacementidIndexId(void); extern Oid DistColocationIndexId(void); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 7d3861715..aa52ecfc7 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -99,6 +99,8 @@ extern char * DistributionDeleteMetadataCommand(Oid relationId); extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); char * NodeListIdempotentInsertCommand(List *workerNodeList); +extern List * ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals); +extern List * ShardgroupListDeleteCommand(List *shardIntervalList); extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index c20dbd6cd..c2146c77e 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -356,6 +356,7 @@ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, ShardgroupID shardgroupId); extern void InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId); extern void DeleteShardRow(uint64 shardId); +extern void DeleteShardgroupRow(ShardgroupID shardgroupId); extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId, uint64 shardLength, @@ -375,7 +376,6 @@ extern void UpdateNoneDistTableMetadataGlobally(Oid relationId, char replication uint32 colocationId, bool autoConverted); extern void UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 colocationId, bool 
autoConverted); -extern void DeleteShardRow(uint64 shardId); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRowGlobally(uint64 placementId); extern void DeleteShardPlacementRow(uint64 placementId); diff --git a/src/include/distributed/shardgroup.h b/src/include/distributed/shardgroup.h index 652389479..3ba24c546 100644 --- a/src/include/distributed/shardgroup.h +++ b/src/include/distributed/shardgroup.h @@ -22,6 +22,7 @@ #include "fmgr.h" typedef int64 ShardgroupID; +#define SHARDGROUPID_FORMAT INT64_FORMAT #define InvalidShardgroupID ((ShardgroupID) 0) #define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID)