create new shardgroups for new distributed table

backup/feature/shardgroup
Nils Dijk 2022-11-23 14:07:23 +01:00
parent c276ae9587
commit dfb0b31a84
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
6 changed files with 111 additions and 1 deletions

View File

@ -140,6 +140,7 @@ typedef struct MetadataCacheData
{
bool extensionLoaded;
Oid distShardRelationId;
Oid distShardgroupRelationId;
Oid distPlacementRelationId;
Oid distBackgroundJobRelationId;
Oid distBackgroundJobPKeyIndexId;
@ -2497,6 +2498,17 @@ DistShardRelationId(void)
}
/* return oid of pg_dist_shardgroup relation */
Oid
DistShardgroupRelationId(void)
{
CachedRelationLookup("pg_dist_shardgroup",
&MetadataCache.distShardgroupRelationId);
return MetadataCache.distShardgroupRelationId;
}
/* return oid of pg_dist_placement relation */
Oid
DistPlacementRelationId(void)

View File

@ -52,6 +52,7 @@
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shardgroup.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relay_utility.h"
@ -1671,6 +1672,47 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
}
void
InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId,
text *shardMinValue, text *shardMaxValue)
{
Datum values[Natts_pg_dist_shardgroup];
bool isNulls[Natts_pg_dist_shardgroup];
/* form new shard tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_shardgroup_shardgroupid - 1] = Int64GetDatum(shardGroupId);
values[Anum_pg_dist_shardgroup_colocationid - 1] = Int32GetDatum(colocationId);
/* check if shard min/max values are null */
if (shardMinValue != NULL && shardMaxValue != NULL)
{
values[Anum_pg_dist_shardgroup_shardminvalue - 1] =
PointerGetDatum(shardMinValue);
values[Anum_pg_dist_shardgroup_shardmaxvalue - 1] =
PointerGetDatum(shardMaxValue);
}
else
{
isNulls[Anum_pg_dist_shardgroup_shardminvalue - 1] = true;
isNulls[Anum_pg_dist_shardgroup_shardmaxvalue - 1] = true;
}
/* open shard relation and insert new tuple */
Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardgroup);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistShardgroup, heapTuple);
CommandCounterIncrement();
table_close(pgDistShardgroup, NoLock);
}
/*
* InsertShardRow opens the shard system catalog, and inserts a new row with the
* given values into that system catalog. Note that we allow the user to pass in

View File

@ -177,6 +177,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
/* we use shardid of the first shard in a shardgroup as the shardgroupid */
uint64 shardGroupId = shardId;
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
{
@ -187,8 +190,11 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);
InsertShardGroupRow(shardGroupId, cacheEntry->colocationId,
minHashTokenText, maxHashTokenText);
InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText, NULL);
minHashTokenText, maxHashTokenText, &shardGroupId);
List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,

View File

@ -229,6 +229,7 @@ extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardgroupRelationId(void);
extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
extern Oid DistBackgroundJobRelationId(void);

View File

@ -306,6 +306,8 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId,
text *shardMinValue, text *shardMaxValue);
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue,
uint64 *shardGroupId);

View File

@ -0,0 +1,47 @@
/*-------------------------------------------------------------------------
*
* pg_dist_shardgroup.h
* definition of the "shardgroup" relation (pg_dist_shardgroup).
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_SHARDGROUP_H
#define PG_DIST_SHARDGROUP_H
#include "postgres.h"
/* ----------------
* pg_dist_shardgroup definition.
* ----------------
*/
typedef struct FormData_pg_dist_shardgroup
{
int64 shardgroupid;
int32 colocationid;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text shardminvalue; /* partition key's minimum value in shard */
text shardmaxvalue; /* partition key's maximum value in shard */
#endif
} FormData_pg_dist_shardgroup;
/* ----------------
* Form_pg_dist_shardgroup corresponds to a pointer to a tuple with
* the format of pg_dist_shardgroup relation.
* ----------------
*/
typedef FormData_pg_dist_shardgroup *Form_pg_dist_shardgroup;
/* ----------------
* compiler constants for pg_dist_shardgroup
* ----------------
*/
#define Natts_pg_dist_shardgroup 4
#define Anum_pg_dist_shardgroup_shardgroupid 1
#define Anum_pg_dist_shardgroup_colocationid 2
#define Anum_pg_dist_shardgroup_shardminvalue 3
#define Anum_pg_dist_shardgroup_shardmaxvalue 4
#endif /* PG_DIST_SHARD_H */