implement shardgroups when splitting shards

feature/shardgroup
Nils Dijk 2023-10-27 14:20:16 +00:00
parent f4fccf0a78
commit 6481f7dcd3
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
12 changed files with 286 additions and 9 deletions

View File

@ -199,6 +199,7 @@ typedef struct MetadataCacheData
Oid distPartitionColocationidIndexId;
Oid distShardLogicalRelidIndexId;
Oid distShardShardidIndexId;
Oid distShardgroupShardgroupidIndexId;
Oid distPlacementShardidIndexId;
Oid distPlacementPlacementidIndexId;
Oid distColocationidIndexId;
@ -3002,6 +3003,16 @@ DistShardShardidIndexId(void)
}
/* return oid of pg_dist_shardgroup_pkey */
Oid
DistShardgroupShardgroupIdIndexId(void)
{
	Oid *cachedIndexId = &MetadataCache.distShardgroupShardgroupidIndexId;

	/* look the index up by name; the cache slot passed in is what we return */
	CachedRelationLookup("pg_dist_shardgroup_pkey", cachedIndexId);

	return *cachedIndexId;
}
/* return oid of pg_dist_placement_shardid_index */
Oid
DistPlacementShardidIndexId(void)

View File

@ -174,12 +174,13 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shardgroup_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shardgroup_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
@ -1189,6 +1190,72 @@ TableOwnerResetCommand(Oid relationId)
}
/*
 * ShardgroupListInsertCommand builds the command that calls
 * pg_catalog.citus_internal_add_shardgroup_metadata once for every entry of
 * shardIntervals, pairing each interval's shardgroupId with colocationId.
 *
 * Returns a one-element list holding the command text. When shardIntervals is
 * empty NIL is returned instead, mirroring ShardgroupListDeleteCommand: a
 * VALUES clause without rows is not valid SQL, so there is nothing sensible
 * to send.
 */
List *
ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals)
{
	if (list_length(shardIntervals) == 0)
	{
		return NIL;
	}

	StringInfo insertShardgroupsCommand = makeStringInfo();
	appendStringInfoString(insertShardgroupsCommand,
						   "WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES ");

	/*
	 * Emit the separator in front of every row but the first; this does not
	 * depend on pointer identity with the last list element.
	 */
	bool firstRow = true;
	ShardInterval *shardInterval = NULL;
	foreach_ptr(shardInterval, shardIntervals)
	{
		if (!firstRow)
		{
			appendStringInfoString(insertShardgroupsCommand, ", ");
		}
		firstRow = false;

		appendStringInfo(insertShardgroupsCommand,
						 "(" SHARDGROUPID_FORMAT "::bigint, %u)",
						 shardInterval->shardgroupId, colocationId);
	}

	appendStringInfoString(insertShardgroupsCommand, ") ");

	appendStringInfoString(insertShardgroupsCommand,
						   "SELECT pg_catalog.citus_internal_add_shardgroup_metadata(shardgroupid, "
						   "colocationid) FROM shardgroup_data;");

	return list_make1(insertShardgroupsCommand->data);
}
/*
 * ShardgroupListDeleteCommand builds the command that calls
 * pg_catalog.citus_internal_delete_shardgroup_metadata for the shardgroupId
 * of every interval in shardIntervalList.
 *
 * Returns a one-element list holding the command text, or NIL when the input
 * list is empty.
 */
List *
ShardgroupListDeleteCommand(List *shardIntervalList)
{
	if (list_length(shardIntervalList) == 0)
	{
		return NIL;
	}

	StringInfo command = makeStringInfo();
	appendStringInfoString(command,
						   "WITH shardgroup_data(shardgroupid) AS (VALUES ");

	/* rows are comma separated; the final element gets no trailing comma */
	ShardInterval *finalInterval = llast(shardIntervalList);
	ShardInterval *currentInterval = NULL;
	foreach_ptr(currentInterval, shardIntervalList)
	{
		appendStringInfo(command,
						 "(" SHARDGROUPID_FORMAT "::bigint)",
						 currentInterval->shardgroupId);

		bool moreRowsFollow = (finalInterval != currentInterval);
		if (moreRowsFollow)
		{
			appendStringInfoString(command, ", ");
		}
	}

	appendStringInfoString(command, ") ");

	appendStringInfoString(command,
						   "SELECT pg_catalog.citus_internal_delete_shardgroup_metadata("
						   "shardgroupid) FROM shardgroup_data;");

	return list_make1(command->data);
}
/*
* ShardListInsertCommand generates a single command that can be
* executed to replicate shard and shard placement metadata for the
@ -1285,7 +1352,8 @@ ShardListInsertCommand(List *shardIntervalList)
}
appendStringInfo(insertShardCommand,
"(%s::regclass, %ld, '%c'::\"char\", %s, %s, %ld::bigint)",
"(%s::regclass, %ld, '%c'::\"char\", %s, %s, "
SHARDGROUPID_FORMAT "::bigint)",
quote_literal_cstr(qualifiedRelationName),
shardId,
shardInterval->storageType,
@ -3382,7 +3450,7 @@ citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
EnsureSuperUser();
PG_ENSURE_ARGNOTNULL(0, "shardgroupid");
ShardgroupID shardgroupID = PG_GETARG_SHARDGROUPID(0);
ShardgroupID shardgroupId = PG_GETARG_SHARDGROUPID(0);
PG_ENSURE_ARGNOTNULL(1, "colocationid");
uint32 colocationId = PG_GETARG_UINT32(1);
@ -3393,7 +3461,28 @@ citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
EnsureCoordinatorInitiatedOperation();
}
InsertShardgroupRow(shardgroupID, colocationId);
InsertShardgroupRow(shardgroupId, colocationId);
PG_RETURN_VOID();
}
Datum
citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
PG_ENSURE_ARGNOTNULL(0, "shardgroupid");
ShardgroupID shardgroupId = PG_GETARG_SHARDGROUPID(0);
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
}
DeleteShardgroupRow(shardgroupId);
PG_RETURN_VOID();
}
@ -4152,7 +4241,7 @@ ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 col
}
ShardgroupID shardgroupId = shardgroupIDs[i];
appendStringInfo(&buf, "(%ld::bigint, %u)",
appendStringInfo(&buf, "(" SHARDGROUPID_FORMAT "::bigint, %u)",
shardgroupId,
colocationId);
}

View File

@ -1802,6 +1802,11 @@ IsDummyPlacement(ShardPlacement *taskPlacement)
void
InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId)
{
if (!IsShardgroupIDValid(shardgroupId))
{
elog(ERROR, "cannot insert invalid shardgroupid: " SHARDGROUPID_FORMAT, shardgroupId);
}
Datum values[Natts_pg_dist_shardgroup];
bool isNulls[Natts_pg_dist_shardgroup];
@ -2070,6 +2075,40 @@ DeletePartitionRow(Oid distributedRelationId)
}
/*
 * DeleteShardgroupRow opens the shardgroup catalog, finds the row with the
 * given shardgroupId through the shardgroupid index, and deletes it. An error
 * is raised when no such row exists.
 */
void
DeleteShardgroupRow(ShardgroupID shardgroupId)
{
	const bool useIndex = true;
	ScanKeyData shardgroupKey[1];

	Relation shardgroupCatalog = table_open(DistShardgroupRelationId(),
											RowExclusiveLock);

	/* match on shardgroupid = <shardgroupId> */
	ScanKeyInit(&shardgroupKey[0], Anum_pg_dist_shardgroup_shardgroupid,
				BTEqualStrategyNumber, F_INT8EQ, ShardgroupIDGetDatum(shardgroupId));

	SysScanDesc shardgroupScan = systable_beginscan(shardgroupCatalog,
													DistShardgroupShardgroupIdIndexId(),
													useIndex,
													NULL,
													lengthof(shardgroupKey),
													shardgroupKey);

	HeapTuple shardgroupTuple = systable_getnext(shardgroupScan);
	if (!HeapTupleIsValid(shardgroupTuple))
	{
		ereport(ERROR, (errmsg("could not find valid entry for shardgroup "
							   SHARDGROUPID_FORMAT, shardgroupId)));
	}

	simple_heap_delete(shardgroupCatalog, &shardgroupTuple->t_self);
	systable_endscan(shardgroupScan);

	/* make the deletion visible to later commands in this transaction */
	CommandCounterIncrement();

	/* the lock taken by table_open is not released here */
	table_close(shardgroupCatalog, NoLock);
}
/*
* DeleteShardRow opens the shard system catalog, finds the unique row that has
* the given shardId, and deletes this row.

View File

@ -129,6 +129,8 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
char distributionMethod,
int shardCount,
uint32 colocationId);
static void PopulateShardgroupIds(List *shardGroupSplitIntervalListList);
static void InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreatePartitioningHierarchyForBlockingSplit(
@ -157,6 +159,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32
static uint64 GetNextShardIdForSplitChild(void);
static void AcquireNonblockingSplitLock(Oid relationId);
static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList);
static void DropShardgroupListMetadata(List *shardIntervalList);
static void DropShardListMetadata(List *shardIntervalList);
/* Customize error message strings based on operation type */
@ -613,8 +616,15 @@ BlockingShardSplit(SplitOperation splitOperation,
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
DropShardgroupListMetadata(sourceColocatedShardIntervalList);
DropShardListMetadata(sourceColocatedShardIntervalList);
/* allocate and assign new shardgroups to newly created shardIntervals */
PopulateShardgroupIds(shardGroupSplitIntervalListList);
InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
/* Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
@ -1073,6 +1083,13 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
for (int index = 0; index < shardIntervalCount; index++)
{
ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
/*
* This will get populated later when we have a list of all new colocated
* shardIntervals as to make sure that all colocated shardIntervals get the same
* shardgroupId.
*/
splitChildShardInterval->shardgroupId = InvalidShardgroupID;
splitChildShardInterval->shardIndex = -1;
splitChildShardInterval->shardId = GetNextShardIdForSplitChild();
@ -1148,6 +1165,74 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
}
/*
 * PopulateShardgroupIds fills in the shardgroupId of every shard interval in
 * shardGroupSplitIntervalListList.
 *
 * The intervals of the first non-empty inner list each receive a freshly
 * allocated id from GetNextShardgroupId(). Every later inner list copies those
 * ids, walking both lists in step, so intervals at the same position end up
 * in the same shardgroup.
 */
static void
PopulateShardgroupIds(List *shardGroupSplitIntervalListList)
{
	List *templateIntervalList = NIL;
	List *splitIntervalList = NIL;
	foreach_ptr(splitIntervalList, shardGroupSplitIntervalListList)
	{
		if (templateIntervalList != NIL)
		{
			/* ids were already handed out; reuse them for colocation */
			ShardInterval *templateInterval = NULL;
			ShardInterval *splitInterval = NULL;
			forboth_ptr(templateInterval, templateIntervalList,
						splitInterval, splitIntervalList)
			{
				splitInterval->shardgroupId = templateInterval->shardgroupId;
			}
		}
		else
		{
			/* no ids handed out yet: this list becomes the template */
			templateIntervalList = splitIntervalList;

			ShardInterval *splitInterval = NULL;
			foreach_ptr(splitInterval, templateIntervalList)
			{
				splitInterval->shardgroupId = GetNextShardgroupId();
			}
		}
	}
}
/*
 * InsertSplitChildrenShardgroupMetadata records the shardgroups of the split
 * children. Only the first inner list of shardGroupSplitIntervalListList is
 * used: one shardgroup row is inserted locally per interval in it, and the
 * matching insert command is then sent to the workers with metadata.
 */
static void
InsertSplitChildrenShardgroupMetadata(List *shardGroupSplitIntervalListList)
{
	if (list_length(shardGroupSplitIntervalListList) == 0)
	{
		/* nothing was split, so there are no shardgroups to record */
		return;
	}

	/* the intervals of the first entry provide the shardgroup ids */
	List *firstSplitIntervalList = linitial(shardGroupSplitIntervalListList);
	if (list_length(firstSplitIntervalList) == 0)
	{
		/* no intervals, hence no shardgroups to record */
		return;
	}

	/* the colocation id is read from the relation of the first interval */
	ShardInterval *representativeInterval = linitial(firstSplitIntervalList);
	uint32 colocationId = TableColocationId(representativeInterval->relationId);

	ShardInterval *splitInterval = NULL;
	foreach_ptr(splitInterval, firstSplitIntervalList)
	{
		InsertShardgroupRow(splitInterval->shardgroupId, colocationId);
	}

	/* forward the generated commands to the workers, one at a time */
	List *shardgroupCommandList = ShardgroupListInsertCommand(colocationId,
															  firstSplitIntervalList);
	char *shardgroupCommand = NULL;
	foreach_ptr(shardgroupCommand, shardgroupCommandList)
	{
		SendCommandToWorkersWithMetadataViaSuperUser(shardgroupCommand);
	}
}
/*
* Insert new shard and placement metadata.
* Sync the Metadata with all nodes if enabled.
@ -1178,7 +1263,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
shardInterval->storageType,
IntegerToText(DatumGetInt32(shardInterval->minValue)),
IntegerToText(DatumGetInt32(shardInterval->maxValue)),
InvalidShardgroupID);
shardInterval->shardgroupId);
InsertShardPlacementRow(
shardInterval->shardId,
@ -1294,6 +1379,34 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
}
static void
DropShardgroupListMetadata(List *shardIntervalList)
{
List *syncedShardIntervalList = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
DeleteShardgroupRow(shardInterval->shardgroupId);
Oid relationId = shardInterval->relationId;
/* delete metadata from synced nodes */
if (ShouldSyncTableMetadata(relationId))
{
syncedShardIntervalList = lappend(syncedShardIntervalList, shardInterval);
}
}
/* delete metadata from all workers with metadata available */
List *commands = ShardgroupListDeleteCommand(syncedShardIntervalList);
char *command = NULL;
foreach_ptr(command, commands)
{
SendCommandToWorkersWithMetadataViaSuperUser(command);
}
}
/*
* DropShardListMetadata drops shard metadata from both the coordinator and
* mx nodes.
@ -1567,6 +1680,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
DropShardListMetadata(sourceColocatedShardIntervalList);
DropShardgroupListMetadata(sourceColocatedShardIntervalList);
/*
* 11) In case of create_distributed_table_concurrently, which converts
* a Citus local table to a distributed table, update the distributed
@ -1600,6 +1715,11 @@ NonBlockingShardSplit(SplitOperation splitOperation,
targetColocationId);
}
/* allocate and assign new shardgroups to newly created shardIntervals */
PopulateShardgroupIds(shardGroupSplitIntervalListList);
InsertSplitChildrenShardgroupMetadata(shardGroupSplitIntervalListList);
/* 12) Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);

View File

@ -5,5 +5,7 @@ DROP TABLE pg_dist_shardgroup;
ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardgroupid;
DROP FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer);
DROP FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint);
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint);
#include "../../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
#include "../../udfs/citus_internal_add_shard_metadata/10.2-1.sql"

View File

@ -12,4 +12,4 @@ ALTER SEQUENCE citus.pg_dist_shardgroupid_seq SET SCHEMA pg_catalog;
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
#include "../../udfs/citus_internal_add_shard_metadata/12.2-1.sql"
#include "../../udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql"
#include "../../udfs/citus_internal_delete_shardgroup_metadata/12.2-1.sql"

View File

@ -0,0 +1,6 @@
-- Internal UDF, implemented in C, that removes a shardgroup's row from
-- pg_dist_shardgroup. The link symbol is spelled out; it matches the SQL name.
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(
    shardgroupid bigint)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_delete_shardgroup_metadata$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint)
    IS 'Deletes rows from pg_dist_shardgroup';

View File

@ -0,0 +1,6 @@
-- Internal UDF, implemented in C, that removes a shardgroup's row from
-- pg_dist_shardgroup. The link symbol is spelled out; it matches the SQL name.
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(
    shardgroupid bigint)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_delete_shardgroup_metadata$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint)
    IS 'Deletes rows from pg_dist_shardgroup';

View File

@ -264,6 +264,7 @@ extern Oid DistBackgroundTaskDependTaskIdIndexId(void);
extern Oid DistBackgroundTaskDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardgroupShardgroupIdIndexId(void);
extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistColocationIndexId(void);

View File

@ -99,6 +99,8 @@ extern char * DistributionDeleteMetadataCommand(Oid relationId);
extern char * TableOwnerResetCommand(Oid distributedRelationId);
extern char * NodeListInsertCommand(List *workerNodeList);
char * NodeListIdempotentInsertCommand(List *workerNodeList);
extern List * ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals);
extern List * ShardgroupListDeleteCommand(List *shardIntervalList);
extern List * ShardListInsertCommand(List *shardIntervalList);
extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
extern char * NodeDeleteCommand(uint32 nodeId);

View File

@ -356,6 +356,7 @@ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
ShardgroupID shardgroupId);
extern void InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId);
extern void DeleteShardRow(uint64 shardId);
extern void DeleteShardgroupRow(ShardgroupID shardgroupId);
extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId,
uint64 placementId,
uint64 shardLength,
@ -375,7 +376,6 @@ extern void UpdateNoneDistTableMetadataGlobally(Oid relationId, char replication
uint32 colocationId, bool autoConverted);
extern void UpdateNoneDistTableMetadata(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRowGlobally(uint64 placementId);
extern void DeleteShardPlacementRow(uint64 placementId);

View File

@ -22,6 +22,7 @@
#include "fmgr.h"
/* identifier of a shardgroup, stored as a signed 64-bit integer */
typedef int64 ShardgroupID;
/* printf-style conversion to use for a ShardgroupID; follows the int64 typedef */
#define SHARDGROUPID_FORMAT INT64_FORMAT
/* 0 is the sentinel for "no valid shardgroup" */
#define InvalidShardgroupID ((ShardgroupID) 0)
/* true when shardgroupID differs from InvalidShardgroupID */
#define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID)