implement splits when a target colocation has been set

backup/feature/shardgroup
Nils Dijk 2022-12-13 14:19:24 +01:00
parent e0d3fd3d72
commit 7a818b78fb
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
12 changed files with 176 additions and 29 deletions

View File

@ -1406,6 +1406,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
char replicationModel = REPLICATION_MODEL_STREAMING;
uint32 colocationId = INVALID_COLOCATION_ID;
int64 shardgroupId = INVALID_SHARDGROUP_ID;
Var *distributionColumn = NULL;
InsertIntoPgDistPartition(citusLocalTableId, distributionMethod,
distributionColumn, colocationId,
@ -1417,7 +1418,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
text *shardMinValue = NULL;
text *shardMaxValue = NULL;
InsertShardRow(citusLocalTableId, shardId, shardStorageType,
shardMinValue, shardMaxValue, NULL);
shardMinValue, shardMaxValue, &shardgroupId);
List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());

View File

@ -2885,6 +2885,14 @@ DistShardgroupPkeyId(void)
}
/*
 * DistShardgroupColocaionidIndexId returns the oid of the index on the
 * colocation id of pg_dist_shardgroup. That index has not been added yet, so
 * for now InvalidOid is returned unconditionally.
 */
Oid
DistShardgroupColocaionidIndexId(void)
{
	/* TODO add index on colocation id */
	Oid colocationIdIndexId = InvalidOid;

	return colocationIdIndexId;
}
/* return oid of pg_dist_placement_shardid_index */
Oid
DistPlacementShardidIndexId(void)

View File

@ -1409,10 +1409,10 @@ ShardgroupInsertCommand(List *shardgroups)
appendStringInfo(&command, "(%ld, %d, %s, %s)",
shardgroup->shardgroupId,
shardgroup->colocationId,
TextToSQLLiteral(IntegerToText(
DatumGetInt32(shardgroup->minShardValue))),
TextToSQLLiteral(IntegerToText(
DatumGetInt32(shardgroup->maxShardValue))));
shardgroup->minShardValue == NULL ? "NULL" : quote_literal_cstr(
shardgroup->minShardValue),
shardgroup->maxShardValue == NULL ? "NULL" : quote_literal_cstr(
shardgroup->maxShardValue));
}
appendStringInfo(&command, ") ");

View File

@ -494,14 +494,21 @@ CheckHashPartitionedTable(Oid distributedTableId)
}
/* Helper function to convert an integer value to a C string */
text *
IntegerToText(int32 value)
char *
IntegerToCStr(int32 value)
{
StringInfo valueString = makeStringInfo();
appendStringInfo(valueString, "%d", value);
text *valueText = cstring_to_text(valueString->data);
return valueString->data;
}
/*
 * IntegerToText converts an int32 value to a text value by first rendering it
 * as a C string via IntegerToCStr and then wrapping that string into a text.
 */
text *
IntegerToText(int32 value)
{
	char *valueCString = IntegerToCStr(value);

	return cstring_to_text(valueCString);
}

View File

@ -44,6 +44,7 @@
#include "distributed/shardsplit_logical_replication.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shardgroup.h"
#include "postmaster/postmaster.h"
/*
@ -129,7 +130,8 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
DistributionColumnMap *distCols,
char distributionMethod,
int shardCount,
uint32 colocationId);
uint32 colocationId,
List **shardgroupSplits);
static void InsertSplitChildrenShardgroupMetadata(List *newShardgroups);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
@ -1066,11 +1068,11 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitPoint;
shardgroup->minShardValue = IntegerToCStr(currentSplitChildMinValue);
shardgroup->maxShardValue = IntegerToCStr(DatumGetInt32(splitPoint));
/* increment for next shardgroup */
currentSplitChildMinValue = Int32GetDatum(DatumGetInt32(splitPoint) + 1);
currentSplitChildMinValue = DatumGetInt32(splitPoint) + 1;
newShardgroups = lappend(newShardgroups, shardgroup);
}
@ -1083,8 +1085,8 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitParentMaxValue;
shardgroup->minShardValue = IntegerToCStr(currentSplitChildMinValue);
shardgroup->maxShardValue = IntegerToCStr(DatumGetInt32(splitParentMaxValue));
newShardgroups = lappend(newShardgroups, shardgroup);
return newShardgroups;
@ -1135,9 +1137,12 @@ CreateShardIntervalsForNewShardgroups(ShardInterval *sourceShard,
splitChildShardInterval->shardId = GetNextShardIdForSplitChild();
splitChildShardInterval->shardGroupId = shardgroup->shardgroupId;
splitChildShardInterval->minValueExists = true;
splitChildShardInterval->minValue = shardgroup->minShardValue;
splitChildShardInterval->minValue =
Int32GetDatum(pg_strtoint32(shardgroup->minShardValue));
splitChildShardInterval->maxValueExists = true;
splitChildShardInterval->maxValue = shardgroup->maxShardValue;
splitChildShardInterval->maxValue =
Int32GetDatum(pg_strtoint32(shardgroup->maxShardValue));
shardSplitChildrenIntervalList = lappend(shardSplitChildrenIntervalList,
splitChildShardInterval);
@ -1165,8 +1170,25 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
DistributionColumnMap *distributionColumnMap,
char distributionMethod,
int shardCount,
uint32 colocationId)
uint32 colocationId,
List **shardgroupSplits)
{
if (colocationId != INVALID_COLOCATION_ID)
{
/*
* We know we are colocating the shards with already existing shards in their
* respective shardgroups. To prevent the new groups from being inserted we forget
* them here.
*/
*shardgroupSplits = NIL;
/*
* TODO make sure we have either already used the existing shardgroup ids or
* update the newly created shards to have the correct shardgroup ids
*/
}
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, colocatedShardList)
{
@ -1187,6 +1209,13 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
ShardReplicationFactor,
distributionColumn->vartype,
distributionColumn->varcollid);
/* Update the shardgroup splits with the newly assigned colocation id */
Shardgroup *shardgroup = NULL;
foreach_ptr(shardgroup, *shardgroupSplits)
{
shardgroup->colocationId = colocationId;
}
}
UpdateDistributionColumnGlobally(relationId, distributionMethod,
@ -1198,14 +1227,19 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
static void
InsertSplitChildrenShardgroupMetadata(List *newShardgroups)
{
if (list_length(newShardgroups) <= 0)
{
return;
}
Shardgroup *shardgroup = NULL;
foreach_ptr(shardgroup, newShardgroups)
{
InsertShardGroupRow(
shardgroup->shardgroupId,
shardgroup->colocationId,
IntegerToText(DatumGetInt32(shardgroup->minShardValue)),
IntegerToText(DatumGetInt32(shardgroup->maxShardValue)));
cstring_to_text(shardgroup->minShardValue),
cstring_to_text(shardgroup->maxShardValue));
}
if (/* TODO check if shardgroup needs to propagate */ true)
@ -1479,10 +1513,23 @@ NonBlockingShardSplit(SplitOperation splitOperation,
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
List *shardgroupSplits = CreateNewShardgroups(
colocationId,
(ShardInterval *) linitial(sourceColocatedShardIntervalList),
shardSplitPointsList);
List *shardgroupSplits = NIL;
if (targetColocationId == INVALID_COLOCATION_ID)
{
shardgroupSplits = CreateNewShardgroups(
colocationId,
(ShardInterval *) linitial(sourceColocatedShardIntervalList),
shardSplitPointsList);
}
else
{
/*
* We are distributing a new table onto an existing colocation id, load
* shardgroupSplits from there.
*/
shardgroupSplits = ShardgroupForColocationId(targetColocationId);
}
/* First create shard interval metadata for split children */
List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
@ -1685,7 +1732,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
distributionColumnOverrides,
distributionMethod,
shardCount,
targetColocationId);
targetColocationId,
&shardgroupSplits);
}
/* 12) Insert new shardgroup, shard and placement metadata */
InsertSplitChildrenShardgroupMetadata(shardgroupSplits);

View File

@ -1,7 +1,7 @@
-- citus--11.3-1--11.2-1
-- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now
DROP TABLE pg_catalog.pg_dist_shardgroup;
DROP SEQUENCE citus.pg_dist_shardgroupid_seq;
DROP SEQUENCE pg_catalog.pg_dist_shardgroupid_seq;
ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardgroupid;
@ -9,4 +9,4 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "ch
#include "../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
DROP FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer, text, text);
DROP FUNCTION FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int);
DROP FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int);

View File

@ -0,0 +1,68 @@
/*-------------------------------------------------------------------------
*
* shardgroup.c
*
* This file contains functions to perform useful operations on shardgroups.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "utils/fmgroids.h"
#include "access/skey.h"
#include "utils/relcache.h"
#include "storage/lockdefs.h"
#include "utils/builtins.h"
#include "distributed/listutils.h"
#include "distributed/shardgroup.h"
#include "distributed/pg_dist_shardgroup.h"
#include "distributed/metadata_cache.h"
/*
 * ShardgroupForColocationId returns a list of Shardgroup structs, one for every
 * row of pg_dist_shardgroup whose colocationid matches the given colocationId.
 * NIL is returned when no row matches.
 *
 * The min and max shard values are returned as C strings. When the catalog
 * column is NULL the corresponding field in the Shardgroup is left NULL.
 *
 * NOTE(review): rows are appended in scan order; callers that rely on the
 * shardgroups being ordered by their range presumably need to sort - confirm.
 */
List *
ShardgroupForColocationId(uint32 colocationId)
{
	ScanKeyData scanKey[1] = { 0 };

	Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), AccessShareLock);
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardgroup);

	ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_colocationid,
				BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId));

	/*
	 * The index on colocationid might not exist (yet), in which case its lookup
	 * returns InvalidOid. Asking systable_beginscan to use an invalid index would
	 * error out while opening it, so only allow the index scan when the oid is
	 * valid and fall back to a sequential scan otherwise.
	 */
	Oid colocationIdIndexId = DistShardgroupColocaionidIndexId();
	bool indexOK = OidIsValid(colocationIdIndexId);

	SysScanDesc scanDescriptor =
		systable_beginscan(pgDistShardgroup, colocationIdIndexId, indexOK,
						   NULL, lengthof(scanKey), scanKey);

	HeapTuple heapTuple = NULL;
	List *shardgroups = NIL;
	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
	{
		Datum datumArray[Natts_pg_dist_shardgroup] = { 0 };
		bool isNullArray[Natts_pg_dist_shardgroup] = { 0 };
		heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);

		Form_pg_dist_shardgroup shardgroupForm =
			(Form_pg_dist_shardgroup) GETSTRUCT(heapTuple);

		/* palloc0 leaves minShardValue/maxShardValue NULL until set below */
		Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
		shardgroup->shardgroupId = shardgroupForm->shardgroupid;
		shardgroup->colocationId = shardgroupForm->colocationid;

		/*
		 * Load shard min/max values. Both columns may be NULL, in which case the
		 * datum must not be dereferenced and the field stays NULL.
		 */
		int minValueIndex = Anum_pg_dist_shardgroup_shardminvalue - 1;
		int maxValueIndex = Anum_pg_dist_shardgroup_shardmaxvalue - 1;

		if (!isNullArray[minValueIndex])
		{
			shardgroup->minShardValue = TextDatumGetCString(datumArray[minValueIndex]);
		}

		if (!isNullArray[maxValueIndex])
		{
			shardgroup->maxShardValue = TextDatumGetCString(datumArray[maxValueIndex]);
		}

		shardgroups = lappend(shardgroups, shardgroup);
	}

	systable_endscan(scanDescriptor);

	/* the lock is not released here (NoLock), only the relation is closed */
	table_close(pgDistShardgroup, NoLock);

	return shardgroups;
}

View File

@ -274,6 +274,7 @@ extern Oid ForeignConstraintGetReferencedTableId(const char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId);
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,
char **tableName);
extern char * IntegerToCStr(int32 value);
extern text * IntegerToText(int32 value);
/* Function declarations for generating metadata for shard and placement creation */

View File

@ -253,6 +253,7 @@ extern Oid DistBackgroundTaskDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardgroupPkeyId(void);
extern Oid DistShardgroupColocaionidIndexId(void);
extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistColocationIndexId(void);

View File

@ -75,8 +75,8 @@ typedef struct Shardgroup
{
int64 shardgroupId;
uint32 colocationId;
Datum minShardValue; /* a shard's typed min value datum */
Datum maxShardValue; /* a shard's typed max value datum */
const char *minShardValue;
const char *maxShardValue;
} Shardgroup;

View File

@ -23,6 +23,7 @@
/* Shard name and identifier related defines */
#define SHARD_NAME_SEPARATOR '_'
#define INVALID_SHARD_ID 0
#define INVALID_SHARDGROUP_ID 0
#define INVALID_PLACEMENT_ID 0

View File

@ -0,0 +1,11 @@
/*
 * shardgroup.h
 *   Declarations of functions that operate on shardgroups.
 */
#ifndef CITUS_SHARDGROUP_H
#define CITUS_SHARDGROUP_H
#include "postgres.h"
#include "distributed/listutils.h"
/* Returns the list of shardgroups that belong to the given colocation id. */
extern List * ShardgroupForColocationId(uint32 colocationId);
#endif /* CITUS_SHARDGROUP_H */