diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index c2302a0dd..a98a291ed 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -140,6 +140,7 @@ typedef struct MetadataCacheData { bool extensionLoaded; Oid distShardRelationId; + Oid distShardgroupRelationId; Oid distPlacementRelationId; Oid distBackgroundJobRelationId; Oid distBackgroundJobPKeyIndexId; @@ -2497,6 +2498,17 @@ DistShardRelationId(void) } +/* return oid of pg_dist_shardgroup relation */ +Oid +DistShardgroupRelationId(void) +{ + CachedRelationLookup("pg_dist_shardgroup", + &MetadataCache.distShardgroupRelationId); + + return MetadataCache.distShardgroupRelationId; +} + + /* return oid of pg_dist_placement relation */ Oid DistPlacementRelationId(void) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index e867b0cf9..9951bd1cf 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -52,6 +52,7 @@ #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/pg_dist_shardgroup.h" #include "distributed/pg_dist_placement.h" #include "distributed/reference_table_utils.h" #include "distributed/relay_utility.h" @@ -1671,6 +1672,47 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) } +void +InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId, + text *shardMinValue, text *shardMaxValue) +{ + Datum values[Natts_pg_dist_shardgroup]; + bool isNulls[Natts_pg_dist_shardgroup]; + + /* form new shard tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_shardgroup_shardgroupid - 1] = Int64GetDatum(shardGroupId); + values[Anum_pg_dist_shardgroup_colocationid - 1] = Int32GetDatum(colocationId); + + /* check if shard min/max values are null */ + if (shardMinValue != NULL && shardMaxValue != NULL) + { + values[Anum_pg_dist_shardgroup_shardminvalue - 1] = + PointerGetDatum(shardMinValue); + values[Anum_pg_dist_shardgroup_shardmaxvalue - 1] = + PointerGetDatum(shardMaxValue); + } + else + { + isNulls[Anum_pg_dist_shardgroup_shardminvalue - 1] = true; + isNulls[Anum_pg_dist_shardgroup_shardmaxvalue - 1] = true; + } + + /* open shard relation and insert new tuple */ + Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock); + + TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardgroup); + HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + CatalogTupleInsert(pgDistShardgroup, heapTuple); + + CommandCounterIncrement(); + table_close(pgDistShardgroup, NoLock); +} + + /* * InsertShardRow opens the shard system catalog, and inserts a new row with the * given values into that system catalog. Note that we allow the user to pass in diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index ce962a1d8..ec9e41cbf 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -177,6 +177,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); uint64 shardId = GetNextShardId(); + /* we use shardid of the first shard in a shardgroup as the shardgroupid */ + uint64 shardGroupId = shardId; + /* if we are at the last shard, make sure the max token value is INT_MAX */ if (shardIndex == (shardCount - 1)) { @@ -187,8 +190,11 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, text *minHashTokenText = IntegerToText(shardMinHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken); + InsertShardGroupRow(shardGroupId, cacheEntry->colocationId, + minHashTokenText, maxHashTokenText); + InsertShardRow(distributedTableId, shardId, shardStorageType, - minHashTokenText, maxHashTokenText, NULL); + minHashTokenText, maxHashTokenText, &shardGroupId); List *currentInsertedShardPlacements = InsertShardPlacementRows( distributedTableId, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e7cb2514d..cacbe2db6 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -229,6 +229,7 @@ extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); +extern Oid DistShardgroupRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); extern Oid DistBackgroundJobRelationId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 9868d0392..6ee02ae95 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -306,6 +306,8 @@ extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ +extern void InsertShardGroupRow(uint64 shardGroupId, uint32 colocationId, + text *shardMinValue, text *shardMaxValue); extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, text *shardMinValue, text *shardMaxValue, uint64 *shardGroupId); diff --git a/src/include/distributed/pg_dist_shardgroup.h b/src/include/distributed/pg_dist_shardgroup.h new file mode 100644 index 000000000..b92c93736 --- /dev/null +++ b/src/include/distributed/pg_dist_shardgroup.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_shardgroup.h + * definition of the "shardgroup" relation (pg_dist_shardgroup). + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_SHARDGROUP_H +#define PG_DIST_SHARDGROUP_H + +#include "postgres.h" + +/* ---------------- + * pg_dist_shardgroup definition. + * ---------------- + */ +typedef struct FormData_pg_dist_shardgroup +{ + int64 shardgroupid; + int32 colocationid; +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + text shardminvalue; /* partition key's minimum value in shard */ + text shardmaxvalue; /* partition key's maximum value in shard */ +#endif +} FormData_pg_dist_shardgroup; + +/* ---------------- + * Form_pg_dist_shardgroup corresponds to a pointer to a tuple with + * the format of pg_dist_shardgroup relation. + * ---------------- + */ +typedef FormData_pg_dist_shardgroup *Form_pg_dist_shardgroup; + +/* ---------------- + * compiler constants for pg_dist_shardgroup + * ---------------- + */ +#define Natts_pg_dist_shardgroup 4 +#define Anum_pg_dist_shardgroup_shardgroupid 1 +#define Anum_pg_dist_shardgroup_colocationid 2 +#define Anum_pg_dist_shardgroup_shardminvalue 3 +#define Anum_pg_dist_shardgroup_shardmaxvalue 4 + +#endif /* PG_DIST_SHARD_H */