backup/feature/shardgroup
Nils Dijk 2022-12-02 16:58:39 +01:00
parent 9cacbeab7e
commit e83c0071f4
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
5 changed files with 218 additions and 63 deletions

View File

@ -1947,6 +1947,44 @@ DeletePartitionRow(Oid distributedRelationId)
}
void
DeleteShardgroupRow(uint64 shardgroupId)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock);
/*
 * NOTE: the attribute and index macros below assume pg_dist_shardgroup follows the
 * naming conventions of the other pg_dist_* catalogs introduced with it.
 */
ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_shardgroupid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardgroupId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup,
DistShardgroupShardgroupidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for shardgroup "
UINT64_FORMAT, shardgroupId)));
}
simple_heap_delete(pgDistShardgroup, &heapTuple->t_self);
systable_endscan(scanDescriptor);
/* TODO invalidate metadata caches for tables colocated with this shardgroup */
CommandCounterIncrement();
table_close(pgDistShardgroup, NoLock);
}
/*
* DeleteShardRow opens the shard system catalog, finds the unique row that has
* the given shardId, and deletes this row.

View File

@ -92,19 +92,22 @@ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitInterval
static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
List *splitPointsForShard);
static void CreateSplitIntervalsForShard(ShardInterval *sourceShard,
List *splitPointsForShard,
List **shardSplitChildrenIntervalList);
static List * CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
List *splitPointsForShard);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
List *newShardgroups);
static List * CreateShardIntervalsForNewShardgroups(ShardInterval *sourceShard,
List *newShardgroups);
static void BlockingShardSplit(SplitOperation splitOperation,
uint64 splitWorkflowId,
uint32 colocationId,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
DistributionColumnMap *distributionColumnOverrides);
static void NonBlockingShardSplit(SplitOperation splitOperation,
uint64 splitWorkflowId,
uint32 colocationId,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
@ -127,6 +130,7 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
char distributionMethod,
int shardCount,
uint32 colocationId);
static void InsertSplitChildrenShardgroupMetadata(List *newShardgroups);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreatePartitioningHierarchyForBlockingSplit(
@ -155,6 +159,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32
static uint64 GetNextShardIdForSplitChild(void);
static void AcquireNonblockingSplitLock(Oid relationId);
static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList);
static void DropShardgroupMetadata(uint64 shardgroupId);
static void DropShardListMetadata(List *shardIntervalList);
/* Customize error message strings based on operation type */
@ -493,6 +498,10 @@ SplitShard(SplitMode splitMode,
{
sourceColocatedShardIntervalList = colocatedShardIntervalList;
}
/* find the colocationId of the shards we are splitting */
CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(relationId);
uint32 colocationId = citusTableCacheEntry->colocationId;
DropOrphanedResourcesInSeparateTransaction();
@ -509,6 +518,7 @@ SplitShard(SplitMode splitMode,
BlockingShardSplit(
splitOperation,
splitWorkflowId,
colocationId,
sourceColocatedShardIntervalList,
shardSplitPointsList,
workersForPlacementList,
@ -521,6 +531,7 @@ SplitShard(SplitMode splitMode,
NonBlockingShardSplit(
splitOperation,
splitWorkflowId,
colocationId,
sourceColocatedShardIntervalList,
shardSplitPointsList,
workersForPlacementList,
@ -549,6 +560,7 @@ SplitShard(SplitMode splitMode,
static void
BlockingShardSplit(SplitOperation splitOperation,
uint64 splitWorkflowId,
uint32 colocationId,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
@ -558,10 +570,17 @@ BlockingShardSplit(SplitOperation splitOperation,
BlockWritesToShardList(sourceColocatedShardIntervalList);
ShardInterval *sampleShardInterval =
(ShardInterval *) linitial(sourceColocatedShardIntervalList);
List *shardgroupSplits = CreateNewShardgroups(
colocationId,
sampleShardInterval,
shardSplitPointsList);
/* First create shard interval metadata for split children */
List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
sourceColocatedShardIntervalList,
shardSplitPointsList);
shardgroupSplits);
/* Only single placement allowed (already validated RelationReplicationFactor = 1) */
ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList);
@ -611,9 +630,10 @@ BlockingShardSplit(SplitOperation splitOperation,
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
DropShardgroupMetadata(sampleShardInterval->shardGroupId);
DropShardListMetadata(sourceColocatedShardIntervalList);
/* Insert new shardgroup, shard and placement metadata */
InsertSplitChildrenShardgroupMetadata(shardgroupSplits);
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
@ -1010,28 +1030,82 @@ CreateObjectOnPlacement(List *objectCreationCommandList,
/*
* CreateNewShardgroups returns new Shardgroup structs for the given colocationId,
* splitting the range of a sample shard interval at the given split points.
* Example:
* 'sampleInterval': S1[-2147483648, 2147483647]
* 'splitPointsForShard': [0] (2 way split)
* returns:
* [ SG_1(-2147483648, 0), SG_2(1, 2147483647) ]
*/
static List *
CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
List *splitPointsForShard)
{
int32 splitParentMaxValue = DatumGetInt32(sampleInterval->maxValue);
int32 currentSplitChildMinValue = DatumGetInt32(sampleInterval->minValue);
/* if we are splitting a Citus local table, assume whole shard range */
/* TODO before splitting a local table we should assign it the hashrange correctly */
if (!sampleInterval->maxValueExists)
{
splitParentMaxValue = PG_INT32_MAX;
}
if (!sampleInterval->minValueExists)
{
currentSplitChildMinValue = PG_INT32_MIN;
}
List *newShardgroups = NIL;
ListCell *splitPointCell = NULL;
foreach(splitPointCell, splitPointsForShard)
{
Datum splitPoint = (Datum) lfirst(splitPointCell);
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = GetNextShardIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitPoint;
/* increment for next shardgroup */
currentSplitChildMinValue = DatumGetInt32(splitPoint) + 1;
newShardgroups = lappend(newShardgroups, shardgroup);
}
/*
* So far we have covered the range from the start of sampleInterval up to the last
* split point. Add one final shardgroup from the last split point to the end of the
* range to complete the split.
*/
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = GetNextShardIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = Int32GetDatum(splitParentMaxValue);
newShardgroups = lappend(newShardgroups, shardgroup);
return newShardgroups;
}
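For concreteness, a minimal sketch of how a caller might drive CreateNewShardgroups for a 3-way split. The split-point values and the colocationId/sampleShardInterval variables are illustrative only; they mirror the callers in BlockingShardSplit and NonBlockingShardSplit above.

/* illustrative only: split the full hash range at -1000 and 1000 */
List *splitPoints = NIL;
splitPoints = lappend(splitPoints, (void *) Int32GetDatum(-1000));
splitPoints = lappend(splitPoints, (void *) Int32GetDatum(1000));
List *newShardgroups = CreateNewShardgroups(colocationId, sampleShardInterval,
splitPoints);
/*
 * newShardgroups now holds three Shardgroup structs covering
 * [-2147483648, -1000], [-999, 1000] and [1001, 2147483647], each with a
 * freshly allocated shardgroupId and the given colocationId.
 */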
/*
* CreateSplitIntervalsForShardGroup creates the split children intervals for every
* colocated source shard, one child interval per new shardgroup.
*/
static List *
CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
List *splitPointsForShard)
List *newShardgroups)
{
List *shardGroupSplitIntervalListList = NIL;
ShardInterval *shardToSplitInterval = NULL;
foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList)
{
List *shardSplitIntervalList = NIL;
CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard,
&shardSplitIntervalList);
List *shardSplitIntervalList =
CreateShardIntervalsForNewShardgroups(shardToSplitInterval,
newShardgroups);
shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList,
shardSplitIntervalList);
@ -1045,58 +1119,31 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
* CreateShardIntervalsForNewShardgroups creates split children intervals for a source
* shard, one per new shardgroup, copying each shardgroup's [min, max] range onto the
* corresponding child interval.
* Example: sourceShard covers [0, 100] and the new shardgroups cover
* (0, 15), (16, 30) and (31, 100); the three children receive those same ranges.
*/
static void
CreateSplitIntervalsForShard(ShardInterval *sourceShard,
List *splitPointsForShard,
List **shardSplitChildrenIntervalList)
static List *
CreateShardIntervalsForNewShardgroups(ShardInterval *sourceShard,
List *newShardgroups)
{
/* For 'N' split points, we will have N+1 shard intervals created. */
int shardIntervalCount = list_length(splitPointsForShard) + 1;
ListCell *splitPointCell = list_head(splitPointsForShard);
int32 splitParentMaxValue = DatumGetInt32(sourceShard->maxValue);
int32 currentSplitChildMinValue = DatumGetInt32(sourceShard->minValue);
List *shardSplitChildrenIntervalList = NIL;
/* if we are splitting a Citus local table, assume whole shard range */
if (!sourceShard->maxValueExists)
{
splitParentMaxValue = PG_INT32_MAX;
}
if (!sourceShard->minValueExists)
{
currentSplitChildMinValue = PG_INT32_MIN;
}
for (int index = 0; index < shardIntervalCount; index++)
Shardgroup *shardgroup = NULL;
foreach_ptr(shardgroup, newShardgroups)
{
ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
splitChildShardInterval->shardIndex = -1;
splitChildShardInterval->shardId = GetNextShardIdForSplitChild();
/* TODO find shardgroup id */
splitChildShardInterval->shardGroupId = shardgroup->shardgroupId;
splitChildShardInterval->minValueExists = true;
splitChildShardInterval->minValue = currentSplitChildMinValue;
splitChildShardInterval->minValue = shardgroup->minShardValue;
splitChildShardInterval->maxValueExists = true;
splitChildShardInterval->maxValue = shardgroup->maxShardValue;
/* Length of splitPointsForShard is one less than 'shardIntervalCount' and we need to account */
/* for 'splitPointCell' being NULL for last iteration. */
if (splitPointCell)
{
splitChildShardInterval->maxValue = DatumGetInt32((Datum) lfirst(
splitPointCell));
splitPointCell = lnext(splitPointsForShard, splitPointCell);
}
else
{
splitChildShardInterval->maxValue = splitParentMaxValue;
}
currentSplitChildMinValue = splitChildShardInterval->maxValue + 1;
*shardSplitChildrenIntervalList = lappend(*shardSplitChildrenIntervalList,
splitChildShardInterval);
shardSplitChildrenIntervalList = lappend(shardSplitChildrenIntervalList,
splitChildShardInterval);
}
return shardSplitChildrenIntervalList;
}
@ -1148,6 +1195,23 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
}
static void
InsertSplitChildrenShardgroupMetadata(List *newShardgroups)
{
Shardgroup *shardgroup = NULL;
foreach_ptr(shardgroup, newShardgroups)
{
InsertShardGroupRow(
shardgroup->shardgroupId,
shardgroup->colocationId,
IntegerToText(DatumGetInt32(shardgroup->minShardValue)),
IntegerToText(DatumGetInt32(shardgroup->maxShardValue)));
}
/* TODO sync new shardgroups to workers */
}
/*
* Insert new shard and placement metadata.
* Sync the Metadata with all nodes if enabled.
@ -1178,7 +1242,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
shardInterval->storageType,
IntegerToText(DatumGetInt32(shardInterval->minValue)),
IntegerToText(DatumGetInt32(shardInterval->maxValue)),
NULL);
&shardInterval->shardGroupId);
InsertShardPlacementRow(
shardInterval->shardId,
@ -1294,6 +1358,13 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
}
static void
DropShardgroupMetadata(uint64 shardgroupId)
{
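/* TODO once shardgroups are synced to workers, also remove them there, like DropShardListMetadata does for shards */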
DeleteShardgroupRow(shardgroupId);
}
/*
* DropShardListMetadata drops shard metadata from both the coordinator and
* mx nodes.
@ -1383,6 +1454,7 @@ AcquireNonblockingSplitLock(Oid relationId)
void
NonBlockingShardSplit(SplitOperation splitOperation,
uint64 splitWorkflowId,
uint32 colocationId,
List *sourceColocatedShardIntervalList,
List *shardSplitPointsList,
List *workersForPlacementList,
@ -1396,10 +1468,15 @@ NonBlockingShardSplit(SplitOperation splitOperation,
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
List *shardgroupSplits = CreateNewShardgroups(
colocationId,
(ShardInterval *) linitial(sourceColocatedShardIntervalList),
shardSplitPointsList);
/* First create shard interval metadata for split children */
List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
sourceColocatedShardIntervalList,
shardSplitPointsList);
shardgroupSplits);
ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList);
@ -1598,9 +1675,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
distributionMethod,
shardCount,
targetColocationId);
}
/* 12) Insert new shard and placement metdata */
/* 12) Insert new shardgroup, shard and placement metadata */
InsertSplitChildrenShardgroupMetadata(shardgroupSplits);
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);

View File

@ -184,7 +184,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++;
}
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, NULL);
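/* every append-distributed shard gets its own shardgroup; the shardId doubles as the shardgroupId */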
CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId);
uint64 shardgroupId = shardId;
InsertShardGroupRow(shardgroupId, tableEntry->colocationId,
nullMinValue, nullMaxValue);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, &shardgroupId);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor);

View File

@ -71,6 +71,15 @@ typedef struct ShardInterval
} ShardInterval;
typedef struct Shardgroup
{
uint64 shardgroupId;
uint32 colocationId;
Datum minShardValue; /* the shardgroup's typed min value datum */
Datum maxShardValue; /* the shardgroup's typed max value datum */
} Shardgroup;
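A minimal sketch of how these fields are expected to be filled and persisted, using the InsertShardGroupRow and IntegerToText helpers that appear elsewhere in this change; the id, colocation group, and range values below are illustrative only.

Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = 102042; /* hypothetical id */
shardgroup->colocationId = 7; /* hypothetical colocation group */
shardgroup->minShardValue = Int32GetDatum(PG_INT32_MIN);
shardgroup->maxShardValue = Int32GetDatum(PG_INT32_MAX);
/* persisted as text, mirroring InsertSplitChildrenShardgroupMetadata */
InsertShardGroupRow(shardgroup->shardgroupId,
shardgroup->colocationId,
IntegerToText(DatumGetInt32(shardgroup->minShardValue)),
IntegerToText(DatumGetInt32(shardgroup->maxShardValue)));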
/* In-memory representation of a tuple in pg_dist_placement. */
typedef struct GroupShardPlacement
{
@ -323,6 +332,7 @@ extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMe
extern void UpdateDistributionColumn(Oid relationId, char distributionMethod,
Var *distributionColumn, int colocationId);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardgroupRow(uint64 shardgroupId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRow(uint64 placementId);

View File

@ -147,21 +147,29 @@ SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
COPY events FROM STDIN WITH CSV;
"(1,1001)",20001,click,1472807012
@ -191,21 +199,29 @@ SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
COPY users FROM STDIN WITH CSV;
"(1,1001)",1472807115
@ -274,21 +290,29 @@ SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = 1, shardmaxvalue = 5986
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
UPDATE pg_dist_shardgroup SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id);
\set lineitem_1_data_file :abs_srcdir '/data/lineitem.1.data'
\set client_side_copy_command '\\copy lineitem_subquery FROM ' :'lineitem_1_data_file' ' with delimiter '''|''';'