Implement shardgroup drop during splits for the old shardgroups

Also did a refactor to treat the shardgroupid as an int64 throughout.
Before it has been addressed as int64/uint64/uint32, now it is only
treated as an int64.

Still miss some propagation of the creation of the new shardgroups on
all the workers to support query from any node.
backup/feature/shardgroup
Nils Dijk 2022-12-08 15:13:48 +01:00
parent e83c0071f4
commit 8e9acf0add
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
11 changed files with 126 additions and 59 deletions

View File

@ -183,6 +183,7 @@ typedef struct MetadataCacheData
Oid distPartitionColocationidIndexId;
Oid distShardLogicalRelidIndexId;
Oid distShardShardidIndexId;
Oid distShardgrouPkeyId;
Oid distPlacementShardidIndexId;
Oid distPlacementPlacementidIndexId;
Oid distColocationidIndexId;
@ -2873,6 +2874,17 @@ DistShardShardidIndexId(void)
}
/* return oid of pg_dist_shardgroup_pkey index */
Oid
DistShardgroupPkeyId(void)
{
CachedRelationLookup("pg_dist_shardgroup_pkey",
&MetadataCache.distShardgrouPkeyId);
return MetadataCache.distShardgrouPkeyId;
}
/* return oid of pg_dist_placement_shardid_index */
Oid
DistPlacementShardidIndexId(void)

View File

@ -142,6 +142,7 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
Oid distributionColumnType,
Oid distributionColumnCollation);
static char * ShardgroupDeleteCommandByColocationId(uint32 colocationId);
static const char * ShardgroupDeleteCommandByShardgroupId(int64 shardgroupid);
static char * ColocationGroupDeleteCommand(uint32 colocationId);
static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
@ -1312,7 +1313,7 @@ ShardListInsertCommand(List *shardIntervalList)
foreach_ptr(shardInterval, shardIntervalList)
{
uint64 shardId = shardInterval->shardId;
uint64 shardGroupId = shardInterval->shardGroupId;
int64 shardGroupId = shardInterval->shardGroupId;
Oid distributedRelationId = shardInterval->relationId;
char *qualifiedRelationName = generate_qualified_relation_name(
distributedRelationId);
@ -3263,7 +3264,7 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
}
PG_ENSURE_ARGNOTNULL(5, "shard group id");
uint64 shardGroupId = (uint64) PG_GETARG_INT64(5);
int64 shardGroupId = (uint64) PG_GETARG_INT64(5);
/* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId);
@ -3864,7 +3865,23 @@ citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
EnsureSuperUser();
int colocationId = PG_GETARG_INT32(0);
if (!PG_ARGISNULL(0) && PG_ARGISNULL(1))
{
int64 shardgroupId = PG_GETARG_INT64(0);
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
}
DeleteShardgroupRow(shardgroupId);
PG_RETURN_VOID();
}
else if (!PG_ARGISNULL(1) && PG_ARGISNULL(0))
{
int colocationId = PG_GETARG_INT32(1);
if (!ShouldSkipMetadataChecks())
{
@ -3875,6 +3892,12 @@ citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS)
DeleteShardgroupForColocationIdLocally(colocationId);
PG_RETURN_VOID();
}
else
{
ereport(ERROR, (errmsg("expected exactly one argument from: "
"shardgroupid, colocationid")));
}
}
@ -4003,6 +4026,14 @@ SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId)
}
void
SyncDeleteShardgroupForShardgroupIdToNodes(int64 shardgroupId)
{
const char *command = ShardgroupDeleteCommandByShardgroupId(shardgroupId);
SendCommandToWorkersWithMetadataViaSuperUser(command);
}
/*
* SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers.
*/
@ -4033,6 +4064,20 @@ ShardgroupDeleteCommandByColocationId(uint32 colocationId)
}
static const char *
ShardgroupDeleteCommandByShardgroupId(int64 shardgroupid)
{
StringInfoData deleteCommand = { 0 };
initStringInfo(&deleteCommand);
appendStringInfo(&deleteCommand,
"SELECT pg_catalog.citus_internal_delete_shardgroup_metadata("
"shardgroupid => " INT64_FORMAT ")", shardgroupid);
return deleteCommand.data;
}
/*
* ColocationGroupDeleteCommand returns a command for deleting a colocation group.
*/

View File

@ -1673,7 +1673,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
void
InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId,
InsertShardGroupRow(int64 shardGroupId, uint32 colocationId,
text *shardMinValue, text *shardMaxValue)
{
Datum values[Natts_pg_dist_shardgroup];
@ -1720,7 +1720,7 @@ InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId,
*/
void
InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue, uint64 *shardGroupId)
text *shardMinValue, text *shardMaxValue, int64 *shardGroupId)
{
Datum values[Natts_pg_dist_shard];
bool isNulls[Natts_pg_dist_shard];
@ -1750,7 +1750,7 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
if (shardGroupId)
{
values[Anum_pg_dist_shard_shardgroupid - 1] = UInt64GetDatum(*shardGroupId);
values[Anum_pg_dist_shard_shardgroupid - 1] = Int64GetDatum(*shardGroupId);
}
else
{
@ -1948,37 +1948,34 @@ DeletePartitionRow(Oid distributedRelationId)
void
DeleteShardgroupRow(uint32 shardgroupId)
DeleteShardgroupRow(int64 shardgroupId)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
ScanKeyData scanKey[1] = { 0 };
const bool indexOK = true;
Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_shardgroupid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardgroupId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup,
DistShardShardidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
DistShardgroupPkeyId(), indexOK, NULL,
lengthof(scanKey), scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
ereport(ERROR, (errmsg("could not find valid entry for shardgroup " INT64_FORMAT,
shardgroupId)));
}
Form_pg_dist_shard pgDistShardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
Oid distributedRelationId = pgDistShardForm->logicalrelid;
simple_heap_delete(pgDistShardgroup, &heapTuple->t_self);
systable_endscan(scanDescriptor);
/* invalidate previous cache entry */
CitusInvalidateRelcacheByRelid(distributedRelationId);
/* TODO figure out what caches to invalidate */
/* CitusInvalidateRelcacheByRelid(distributedRelationId); */
CommandCounterIncrement();
table_close(pgDistShardgroup, NoLock);

View File

@ -196,7 +196,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
uint64 shardId = GetNextShardId();
/* we use shardid of the first shard in a shardgroup as the shardgroupid */
uint64 shardGroupId = shardId;
int64 shardGroupId = (int64) shardId;
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
@ -301,7 +301,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
uint64 *newShardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*newShardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, newShardIdPtr);
uint64 sourceShardGroupId = sourceShardInterval->shardGroupId;
int64 sourceShardGroupId = sourceShardInterval->shardGroupId;
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
@ -399,7 +399,7 @@ CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId,
/* get the next shard id */
uint64 shardId = GetNextShardId();
uint64 shardGroupId = shardId;
int64 shardGroupId = (int64) shardId;
StringInfoData shardgroupQuery = { 0 };
initStringInfo(&shardgroupQuery);

View File

@ -159,7 +159,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32
static uint64 GetNextShardIdForSplitChild(void);
static void AcquireNonblockingSplitLock(Oid relationId);
static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList);
static void DropShardgroupMetadata(uint32 shardgroupId);
static void DropShardgroupMetadata(int64 shardgroupId);
static void DropShardListMetadata(List *shardIntervalList);
/* Customize error message strings based on operation type */
@ -1042,7 +1042,6 @@ static List *
CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
List *splitPointsForShard)
{
int32 splitParentMaxValue = DatumGetInt32(sampleInterval->maxValue);
int32 currentSplitChildMinValue = DatumGetInt32(sampleInterval->minValue);
@ -1064,7 +1063,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
Datum splitPoint = (Datum) lfirst(splitPointCell);
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = GetNextShardIdForSplitChild();
shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitPoint;
@ -1081,7 +1080,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
*/
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = GetNextShardIdForSplitChild();
shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitParentMaxValue;
@ -1359,8 +1358,15 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
static void
DropShardgroupMetadata(uint32 shardgroupId)
DropShardgroupMetadata(int64 shardgroupId)
{
/* delete metadata from synced nodes */
/* TODO decide how shardgroup metadata should be synced*/
if (/* ShouldSyncTableMetadata(relationId) */ true)
{
SyncDeleteShardgroupForShardgroupIdToNodes(shardgroupId);
}
DeleteShardgroupRow(shardgroupId);
}

View File

@ -186,11 +186,12 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId);
uint64 shardgroupId = shardId;
int64 shardgroupId = (int64) shardId;
InsertShardGroupRow(shardgroupId, tableEntry->colocationId,
nullMinValue, nullMaxValue);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, &shardgroupId);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue,
&shardgroupId);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor);

View File

@ -1,8 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int)
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(
shardgroupid bigint DEFAULT NULL,
colocationid int DEFAULT NULL)
RETURNS void
LANGUAGE C
STRICT
CALLED ON NULL INPUT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS
'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int) IS
'deletes a shardgroups from pg_dist_shardgroup, expect exactly one argument to be passed as an indicator what shardgroup(s) to remove';

View File

@ -1,8 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int)
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(
shardgroupid bigint DEFAULT NULL,
colocationid int DEFAULT NULL)
RETURNS void
LANGUAGE C
STRICT
CALLED ON NULL INPUT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS
'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int) IS
'deletes a shardgroups from pg_dist_shardgroup, expect exactly one argument to be passed as an indicator what shardgroup(s) to remove';

View File

@ -252,6 +252,7 @@ extern Oid DistBackgroundTaskDependTaskIdIndexId(void);
extern Oid DistBackgroundTaskDependDependsOnIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardgroupPkeyId(void);
extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistColocationIndexId(void);

View File

@ -114,6 +114,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
Oid distributionColumType,
Oid distributionColumnCollation);
extern void SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId);
extern void SyncDeleteShardgroupForShardgroupIdToNodes(int64 shardgroupId);
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"

View File

@ -66,14 +66,14 @@ typedef struct ShardInterval
Datum minValue; /* a shard's typed min value datum */
Datum maxValue; /* a shard's typed max value datum */
uint64 shardId;
uint64 shardGroupId;
int64 shardGroupId;
int shardIndex;
} ShardInterval;
typedef struct Shardgroup
{
uint64 shardgroupId;
int64 shardgroupId;
uint32 colocationId;
Datum minShardValue; /* a shard's typed min value datum */
Datum maxShardValue; /* a shard's typed max value datum */
@ -315,11 +315,11 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId,
extern void InsertShardGroupRow(int64 shardGroupId, uint32 colocationId,
text *shardMinValue, text *shardMaxValue);
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue,
uint64 *shardGroupId);
int64 *shardGroupId);
extern void DeleteShardRow(uint64 shardId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId);
@ -332,7 +332,7 @@ extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMe
extern void UpdateDistributionColumn(Oid relationId, char distributionMethod,
Var *distributionColumn, int colocationId);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardgroupRow(uint32 shardgroupId);
extern void DeleteShardgroupRow(int64 shardgroupId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRow(uint64 placementId);