basic places for hash distribution support shardgroup

feature/shardgroup
Nils Dijk 2023-10-26 11:06:45 +00:00
parent dbdde111c1
commit f4fccf0a78
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
25 changed files with 327 additions and 25 deletions

View File

@ -1459,7 +1459,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
text *shardMinValue = NULL; text *shardMinValue = NULL;
text *shardMaxValue = NULL; text *shardMaxValue = NULL;
InsertShardRow(citusLocalTableId, shardId, shardStorageType, InsertShardRow(citusLocalTableId, shardId, shardStorageType,
shardMinValue, shardMaxValue); shardMinValue, shardMaxValue, InvalidShardgroupID);
List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError()); List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());

View File

@ -146,7 +146,8 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
DistributedTableParams * DistributedTableParams *
distributedTableParams); distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount, static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty); uint32 colocationId, Oid colocatedTableId,
bool localTableEmpty);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId); uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType, static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
@ -1289,7 +1290,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{ {
/* create shards for hash distributed table */ /* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId, colocationId, colocatedTableId,
localTableEmpty); localTableEmpty);
} }
else if (tableType == REFERENCE_TABLE) else if (tableType == REFERENCE_TABLE)
@ -1880,7 +1881,7 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
* CreateHashDistributedTableShards creates shards of given hash distributed table. * CreateHashDistributedTableShards creates shards of given hash distributed table.
*/ */
static void static void
CreateHashDistributedTableShards(Oid relationId, int shardCount, CreateHashDistributedTableShards(Oid relationId, int shardCount, uint32 colocationId,
Oid colocatedTableId, bool localTableEmpty) Oid colocatedTableId, bool localTableEmpty)
{ {
bool useExclusiveConnection = false; bool useExclusiveConnection = false;
@ -1919,7 +1920,10 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
* tables which will not be part of an existing colocation group. Therefore, * tables which will not be part of an existing colocation group. Therefore,
* we can directly use ShardReplicationFactor global variable here. * we can directly use ShardReplicationFactor global variable here.
*/ */
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor, CreateShardsWithRoundRobinPolicy(relationId,
colocationId,
shardCount,
ShardReplicationFactor,
useExclusiveConnection); useExclusiveConnection);
} }
} }

View File

@ -155,6 +155,7 @@ typedef struct MetadataCacheData
{ {
ExtensionCreatedState extensionCreatedState; ExtensionCreatedState extensionCreatedState;
Oid distShardRelationId; Oid distShardRelationId;
Oid distShardgroupRelationId;
Oid distPlacementRelationId; Oid distPlacementRelationId;
Oid distBackgroundJobRelationId; Oid distBackgroundJobRelationId;
Oid distBackgroundJobPKeyIndexId; Oid distBackgroundJobPKeyIndexId;
@ -2594,6 +2595,16 @@ DistShardRelationId(void)
} }
Oid
DistShardgroupRelationId(void)
{
CachedRelationLookup("pg_dist_shardgroup",
&MetadataCache.distShardgroupRelationId);
return MetadataCache.distShardgroupRelationId;
}
/* return oid of pg_dist_placement relation */ /* return oid of pg_dist_placement relation */
Oid Oid
DistPlacementRelationId(void) DistPlacementRelationId(void)
@ -5378,6 +5389,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1]; Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
ShardgroupID shardgoupdId = DatumGetShardgroupID(datumArray[Anum_pg_dist_shard_shardgroupid - 1]);
bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1]; bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1]; bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];
@ -5414,6 +5426,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
shardInterval->minValue = minValue; shardInterval->minValue = minValue;
shardInterval->maxValue = maxValue; shardInterval->maxValue = maxValue;
shardInterval->shardId = shardId; shardInterval->shardId = shardId;
shardInterval->shardgroupId = shardgoupdId;
return shardInterval; return shardInterval;
} }

View File

@ -84,6 +84,7 @@
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardgroup.h"
#include "distributed/tenant_schema_metadata.h" #include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h" #include "distributed/utils/array_type.h"
#include "distributed/utils/function.h" #include "distributed/utils/function.h"
@ -149,6 +150,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
int replicationFactor, int replicationFactor,
Oid distributionColumnType, Oid distributionColumnType,
Oid distributionColumnCollation); Oid distributionColumnCollation);
static char * ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount,
uint32 colocationId);
static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * ColocationGroupDeleteCommand(uint32 colocationId);
static char * RemoteSchemaIdExpressionById(Oid schemaId); static char * RemoteSchemaIdExpressionById(Oid schemaId);
static char * RemoteSchemaIdExpressionByName(char *schemaName); static char * RemoteSchemaIdExpressionByName(char *schemaName);
@ -171,6 +174,7 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shardgroup_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
@ -1249,7 +1253,7 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertShardCommand = makeStringInfo(); StringInfo insertShardCommand = makeStringInfo();
appendStringInfo(insertShardCommand, appendStringInfo(insertShardCommand,
"WITH shard_data(relationname, shardid, storagetype, " "WITH shard_data(relationname, shardid, storagetype, "
"shardminvalue, shardmaxvalue) AS (VALUES "); "shardminvalue, shardmaxvalue, shardgroupid) AS (VALUES ");
foreach_ptr(shardInterval, shardIntervalList) foreach_ptr(shardInterval, shardIntervalList)
{ {
@ -1281,12 +1285,13 @@ ShardListInsertCommand(List *shardIntervalList)
} }
appendStringInfo(insertShardCommand, appendStringInfo(insertShardCommand,
"(%s::regclass, %ld, '%c'::\"char\", %s, %s)", "(%s::regclass, %ld, '%c'::\"char\", %s, %s, %ld::bigint)",
quote_literal_cstr(qualifiedRelationName), quote_literal_cstr(qualifiedRelationName),
shardId, shardId,
shardInterval->storageType, shardInterval->storageType,
minHashToken->data, minHashToken->data,
maxHashToken->data); maxHashToken->data,
shardInterval->shardgroupId);
if (llast(shardIntervalList) != shardInterval) if (llast(shardIntervalList) != shardInterval)
{ {
@ -1298,7 +1303,7 @@ ShardListInsertCommand(List *shardIntervalList)
appendStringInfo(insertShardCommand, appendStringInfo(insertShardCommand,
"SELECT citus_internal_add_shard_metadata(relationname, shardid, " "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
"storagetype, shardminvalue, shardmaxvalue) " "storagetype, shardminvalue, shardmaxvalue, shardgroupid) "
"FROM shard_data;"); "FROM shard_data;");
/* /*
@ -3341,6 +3346,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue = PG_GETARG_TEXT_P(4); shardMaxValue = PG_GETARG_TEXT_P(4);
} }
PG_ENSURE_ARGNOTNULL(5, "shardgroup id");
ShardgroupID shardgroupID = PG_GETARG_SHARDGROUPID(5);
/* only owner of the table (or superuser) is allowed to add the Citus metadata */ /* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId); EnsureTableOwner(relationId);
@ -3361,7 +3369,31 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue); shardMaxValue);
} }
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue); InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue, shardgroupID);
PG_RETURN_VOID();
}
Datum
citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
PG_ENSURE_ARGNOTNULL(0, "shardgroupid");
ShardgroupID shardgroupID = PG_GETARG_SHARDGROUPID(0);
PG_ENSURE_ARGNOTNULL(1, "colocationid");
uint32 colocationId = PG_GETARG_UINT32(1);
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
}
InsertShardgroupRow(shardgroupID, colocationId);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -4091,6 +4123,50 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio
} }
void
SyncNewShardgoupsToNodes(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId)
{
char *command = ShardgroupsCreateCommand(shardgroupIDs, shardCount, colocationId);
/*
* We require superuser for all pg_dist_shardgroups operations because we have
* no reasonable way of restricting access.
*/
SendCommandToWorkersWithMetadataViaSuperUser(command);
}
static char *
ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId)
{
StringInfoData buf = {0};
initStringInfo(&buf);
/* now add shards to insertShardCommand */
appendStringInfo(&buf,
"WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES ");
for (int i=0; i<shardCount; i++)
{
if (i > 0)
{
appendStringInfo(&buf, ", ");
}
ShardgroupID shardgroupId = shardgroupIDs[i];
appendStringInfo(&buf, "(%ld::bigint, %u)",
shardgroupId,
colocationId);
}
appendStringInfo(&buf, ") ");
appendStringInfo(&buf,
"SELECT pg_catalog.citus_internal_add_shardgroup_metadata(shardgroupid, "
"colocationid) FROM shardgroup_data;");
return buf.data;
}
/* /*
* RemoteTypeIdExpression returns an expression in text form that can * RemoteTypeIdExpression returns an expression in text form that can
* be used to obtain the OID of a type on a different node when included * be used to obtain the OID of a type on a different node when included

View File

@ -70,11 +70,13 @@
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_placement.h" #include "distributed/pg_dist_placement.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shardgroup.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h" #include "distributed/shard_rebalancer.h"
#include "distributed/shardgroup.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h" #include "distributed/utils/array_type.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -1350,6 +1352,7 @@ CopyShardInterval(ShardInterval *srcInterval)
destInterval->maxValueExists = srcInterval->maxValueExists; destInterval->maxValueExists = srcInterval->maxValueExists;
destInterval->shardId = srcInterval->shardId; destInterval->shardId = srcInterval->shardId;
destInterval->shardIndex = srcInterval->shardIndex; destInterval->shardIndex = srcInterval->shardIndex;
destInterval->shardgroupId = srcInterval->shardgroupId;
destInterval->minValue = 0; destInterval->minValue = 0;
if (destInterval->minValueExists) if (destInterval->minValueExists)
@ -1796,6 +1799,32 @@ IsDummyPlacement(ShardPlacement *taskPlacement)
} }
void
InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId)
{
Datum values[Natts_pg_dist_shardgroup];
bool isNulls[Natts_pg_dist_shardgroup];
/* form new shard tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_shardgroup_shardgroupid - 1] = ShardgroupIDGetDatum(shardgroupId);
values[Anum_pg_dist_shardgroup_colocationid - 1] = Int32GetDatum(colocationId);
/* open shard relation and insert new tuple */
Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardgroup);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistShardgroup, heapTuple);
CommandCounterIncrement();
table_close(pgDistShardgroup, NoLock);
}
/* /*
* InsertShardRow opens the shard system catalog, and inserts a new row with the * InsertShardRow opens the shard system catalog, and inserts a new row with the
* given values into that system catalog. Note that we allow the user to pass in * given values into that system catalog. Note that we allow the user to pass in
@ -1803,7 +1832,7 @@ IsDummyPlacement(ShardPlacement *taskPlacement)
*/ */
void void
InsertShardRow(Oid relationId, uint64 shardId, char storageType, InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue) text *shardMinValue, text *shardMaxValue, ShardgroupID shardgroupId)
{ {
Datum values[Natts_pg_dist_shard]; Datum values[Natts_pg_dist_shard];
bool isNulls[Natts_pg_dist_shard]; bool isNulls[Natts_pg_dist_shard];
@ -1831,6 +1860,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true; isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
} }
values[Anum_pg_dist_shard_shardgroupid - 1] = ShardgroupIDGetDatum(shardgroupId);
/* open shard relation and insert new tuple */ /* open shard relation and insert new tuple */
Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock); Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);

View File

@ -50,6 +50,7 @@
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardgroup.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -80,8 +81,9 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
* worker nodes. * worker nodes.
*/ */
void void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, CreateShardsWithRoundRobinPolicy(Oid distributedTableId, uint32 colocationId,
int32 replicationFactor, bool useExclusiveConnections) int32 shardCount, int32 replicationFactor,
bool useExclusiveConnections)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL; List *insertedShardPlacements = NIL;
@ -163,6 +165,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* set shard storage type according to relation type */ /* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId); char shardStorageType = ShardStorageType(distributedTableId);
ShardgroupID *shardgroupIDs = palloc0(sizeof(ShardgroupID) * shardCount);
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
{ {
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount; uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
@ -184,8 +187,12 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken); text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken);
ShardgroupID shardgroupId = GetNextShardgroupId();
InsertShardgroupRow(shardgroupId, colocationId);
shardgroupIDs[shardIndex] = shardgroupId;
InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType, InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText); minHashTokenText, maxHashTokenText, shardgroupId);
InsertShardPlacementRows(distributedTableId, InsertShardPlacementRows(distributedTableId,
*shardIdPtr, *shardIdPtr,
@ -194,6 +201,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
replicationFactor); replicationFactor);
} }
// TODO guess we should check if metadatasync is on
SyncNewShardgoupsToNodes(shardgroupIDs, shardCount, colocationId);
/* /*
* load shard placements for the shard at once after all placement insertions * load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after * finished. This prevents MetadataCache from rebuilding unnecessarily after
@ -283,7 +293,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
sourceShardId); sourceShardId);
InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType, InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType,
shardMinValueText, shardMaxValueText); shardMinValueText, shardMaxValueText,
sourceShardInterval->shardgroupId);
ShardPlacement *sourcePlacement = NULL; ShardPlacement *sourcePlacement = NULL;
foreach_ptr(sourcePlacement, sourceShardPlacementList) foreach_ptr(sourcePlacement, sourceShardPlacementList)
@ -366,7 +377,7 @@ CreateReferenceTableShard(Oid distributedTableId)
uint64 shardId = GetNextShardId(); uint64 shardId = GetNextShardId();
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue); shardMaxValue, InvalidShardgroupID);
InsertShardPlacementRows(distributedTableId, InsertShardPlacementRows(distributedTableId,
shardId, shardId,
@ -421,8 +432,11 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
text *minHashTokenText = NULL; text *minHashTokenText = NULL;
text *maxHashTokenText = NULL; text *maxHashTokenText = NULL;
uint64 shardId = GetNextShardId(); uint64 shardId = GetNextShardId();
ShardgroupID shardgroupId = GetNextShardgroupId();
InsertShardgroupRow(shardgroupId, colocationId);
InsertShardRow(relationId, shardId, shardStorageType, InsertShardRow(relationId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText); minHashTokenText, maxHashTokenText, shardgroupId);
int replicationFactor = 1; int replicationFactor = 1;
InsertShardPlacementRows(relationId, InsertShardPlacementRows(relationId,

View File

@ -253,6 +253,28 @@ GetNextShardId()
} }
ShardgroupID
GetNextShardgroupId()
{
text *sequenceName = cstring_to_text(SHARDGROUPID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName, false);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique shardId from sequence */
Datum shardgroupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
ShardgroupID shardgroupId = DatumGetShardgroupID(shardgroupIdDatum);
return shardgroupId;
}
/* /*
* master_get_new_placementid is a user facing wrapper function around * master_get_new_placementid is a user facing wrapper function around
* GetNextPlacementId() which allocates and returns a unique placement id for the * GetNextPlacementId() which allocates and returns a unique placement id for the

View File

@ -1177,7 +1177,8 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
shardInterval->shardId, shardInterval->shardId,
shardInterval->storageType, shardInterval->storageType,
IntegerToText(DatumGetInt32(shardInterval->minValue)), IntegerToText(DatumGetInt32(shardInterval->minValue)),
IntegerToText(DatumGetInt32(shardInterval->maxValue))); IntegerToText(DatumGetInt32(shardInterval->maxValue)),
InvalidShardgroupID);
InsertShardPlacementRow( InsertShardPlacementRow(
shardInterval->shardId, shardInterval->shardId,

View File

@ -193,7 +193,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++; candidateNodeIndex++;
} }
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue); InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, InvalidShardgroupID);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor); ShardReplicationFactor);

View File

@ -3,3 +3,4 @@
#include "udfs/citus_internal_database_command/12.2-1.sql" #include "udfs/citus_internal_database_command/12.2-1.sql"
#include "udfs/citus_add_rebalance_strategy/12.2-1.sql" #include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
#include "feature/shardgroup/upgrade.sql"

View File

@ -3,3 +3,4 @@
DROP FUNCTION pg_catalog.citus_internal_database_command(text); DROP FUNCTION pg_catalog.citus_internal_database_command(text);
#include "../udfs/citus_add_rebalance_strategy/10.1-1.sql" #include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
#include "../feature/shardgroup/downgrade.sql"

View File

@ -0,0 +1,9 @@
DROP TABLE pg_dist_shardgroup;
-- TODO probably needs to drop table and recreate original one to make sure we can upgrade
-- again later and _not_ have troubled with the internal ordering of the columns.
ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardgroupid;
DROP FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer);
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint);
#include "../../udfs/citus_internal_add_shard_metadata/10.2-1.sql"

View File

@ -0,0 +1,15 @@
CREATE TABLE citus.pg_dist_shardgroup (
shardgroupid bigint PRIMARY KEY,
colocationid integer NOT NULL
);
ALTER TABLE citus.pg_dist_shardgroup SET SCHEMA pg_catalog;
ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardgroupid bigint NOT NULL;
CREATE SEQUENCE citus.pg_dist_shardgroupid_seq NO CYCLE;
ALTER SEQUENCE citus.pg_dist_shardgroupid_seq SET SCHEMA pg_catalog;
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
#include "../../udfs/citus_internal_add_shard_metadata/12.2-1.sql"
#include "../../udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql"

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text, shardgoupdid bigint
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS
'Inserts into pg_dist_shard with user checks';

View File

@ -1,10 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint, relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text, storage_type "char", shard_min_value text,
shard_max_value text shard_max_value text, shardgoupdid bigint
) )
RETURNS void RETURNS void
LANGUAGE C LANGUAGE C
AS 'MODULE_PATHNAME'; AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS
'Inserts into pg_dist_shard with user checks'; 'Inserts into pg_dist_shard with user checks';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(
shardgroupid bigint, colocationid integer)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer) IS
'Inserts into pg_dist_shardgroup with user checks';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(
shardgroupid bigint, colocationid integer)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer) IS
'Inserts into pg_dist_shardgroup with user checks';

View File

@ -231,7 +231,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
text *maxInfoText = cstring_to_text(maxInfo->data); text *maxInfoText = cstring_to_text(maxInfo->data);
InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText, InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
maxInfoText); maxInfoText, InvalidShardgroupID);
PG_RETURN_INT64(newShardId); PG_RETURN_INT64(newShardId);
} }

View File

@ -23,6 +23,7 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/shardgroup.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
/* /*
@ -55,6 +56,7 @@
#define TRANSFER_MODE_BLOCK_WRITES 'b' #define TRANSFER_MODE_BLOCK_WRITES 'b'
#define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq" #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
#define SHARDGROUPID_SEQUENCE_NAME "pg_dist_shardgroupid_seq"
#define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq" #define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq"
/* Remote call definitions to help with data staging and deletion */ /* Remote call definitions to help with data staging and deletion */
@ -222,6 +224,7 @@ extern bool IsCoordinator(void);
/* Function declarations local to the distributed module */ /* Function declarations local to the distributed module */
extern uint64 GetNextShardId(void); extern uint64 GetNextShardId(void);
extern ShardgroupID GetNextShardgroupId(void);
extern uint64 GetNextPlacementId(void); extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName, bool missingOk); extern Oid ResolveRelationId(text *relationName, bool missingOk);
extern List * GetFullTableCreationCommands(Oid relationId, extern List * GetFullTableCreationCommands(Oid relationId,
@ -257,7 +260,9 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex, List *workerNodeList, int workerStartIndex,
int replicationFactor); int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId); extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId,
uint32 colocationId,
int32 shardCount,
int32 replicationFactor, int32 replicationFactor,
bool useExclusiveConnections); bool useExclusiveConnections);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,

View File

@ -240,6 +240,7 @@ extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void); extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistPartitionRelationId(void); extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void); extern Oid DistShardRelationId(void);
extern Oid DistShardgroupRelationId(void);
extern Oid DistPlacementRelationId(void); extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void); extern Oid DistNodeRelationId(void);
extern Oid DistBackgroundJobRelationId(void); extern Oid DistBackgroundJobRelationId(void);

View File

@ -138,6 +138,8 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
int replicationFactor, int replicationFactor,
Oid distributionColumType, Oid distributionColumType,
Oid distributionColumnCollation); Oid distributionColumnCollation);
extern void SyncNewShardgoupsToNodes(ShardgroupID *shardgroupIDs, int shardCount,
uint32 colocationId);
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId); extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId);
extern char * TenantSchemaDeleteCommand(char *schemaName); extern char * TenantSchemaDeleteCommand(char *schemaName);

View File

@ -28,6 +28,7 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/shardgroup.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -68,6 +69,7 @@ typedef struct ShardInterval
Datum maxValue; /* a shard's typed max value datum */ Datum maxValue; /* a shard's typed max value datum */
uint64 shardId; uint64 shardId;
int shardIndex; int shardIndex;
ShardgroupID shardgroupId;
} ShardInterval; } ShardInterval;
@ -350,7 +352,9 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */ /* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue); text *shardMinValue, text *shardMaxValue,
ShardgroupID shardgroupId);
extern void InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId);
extern void DeleteShardRow(uint64 shardId); extern void DeleteShardRow(uint64 shardId);
extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId, extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId,
uint64 placementId, uint64 placementId,

View File

@ -16,6 +16,8 @@
#ifndef PG_DIST_SHARD_H #ifndef PG_DIST_SHARD_H
#define PG_DIST_SHARD_H #define PG_DIST_SHARD_H
#include "distributed/shardgroup.h"
/* ---------------- /* ----------------
* pg_dist_shard definition. * pg_dist_shard definition.
* ---------------- * ----------------
@ -30,6 +32,7 @@ typedef struct FormData_pg_dist_shard
text shardminvalue; /* partition key's minimum value in shard */ text shardminvalue; /* partition key's minimum value in shard */
text shardmaxvalue; /* partition key's maximum value in shard */ text shardmaxvalue; /* partition key's maximum value in shard */
#endif #endif
ShardgroupID shardgroupid;
} FormData_pg_dist_shard; } FormData_pg_dist_shard;
/* ---------------- /* ----------------
@ -43,13 +46,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
* compiler constants for pg_dist_shards * compiler constants for pg_dist_shards
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_shard 6 #define Natts_pg_dist_shard 7
#define Anum_pg_dist_shard_logicalrelid 1 #define Anum_pg_dist_shard_logicalrelid 1
#define Anum_pg_dist_shard_shardid 2 #define Anum_pg_dist_shard_shardid 2
#define Anum_pg_dist_shard_shardstorage 3 #define Anum_pg_dist_shard_shardstorage 3
#define Anum_pg_dist_shard_shardalias_DROPPED 4 #define Anum_pg_dist_shard_shardalias_DROPPED 4
#define Anum_pg_dist_shard_shardminvalue 5 #define Anum_pg_dist_shard_shardminvalue 5
#define Anum_pg_dist_shard_shardmaxvalue 6 #define Anum_pg_dist_shard_shardmaxvalue 6
#define Anum_pg_dist_shard_shardgroupid 7
/* /*
* Valid values for shard storage types include foreign table, (standard) table * Valid values for shard storage types include foreign table, (standard) table

View File

@ -0,0 +1,41 @@
/*-------------------------------------------------------------------------
*
* pg_dist_shardgroup.h
* definition of the "shardgroup" relation (pg_dist_shardgroup).
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_SHARDGROUP_H
#define PG_DIST_SHARDGROUP_H
#include "distributed/shardgroup.h"
/* ----------------
* pg_dist_shardgroup definition.
* ----------------
*/
typedef struct FormData_pg_dist_shardgroup
{
ShardgroupID shardgroupid;
uint32 colocationid;
} FormData_pg_dist_shardgroup;
/* ----------------
* Form_pg_dist_shardgroup corresponds to a pointer to a tuple with
* the format of pg_dist_shardgroup relation.
* ----------------
*/
typedef FormData_pg_dist_shardgroup *Form_pg_dist_shardgroup;
/* ----------------
* compiler constants for pg_dist_shardgroup
* ----------------
*/
#define Natts_pg_dist_shardgroup 2
#define Anum_pg_dist_shardgroup_shardgroupid 1
#define Anum_pg_dist_shardgroup_colocationid 2
#endif /* PG_DIST_SHARDGROUP_H */

View File

@ -0,0 +1,34 @@
/*-------------------------------------------------------------------------
*
* shardgroup.h
* Shardgroups are a logical unit of colocated shards from different
* tables belonging to the same colocation group. When shards belong
* to the same shardgroup they move as one logical unit during
* shardmoves, which are better described as shardgroupmoves.
*
* This header defines functions operating on shardgroups as well as
* helpers to work with ShardgroupID's that identify shardgroups.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARDGROUP_H
#define SHARDGROUP_H
#include "postgres.h"
#include "fmgr.h"
typedef int64 ShardgroupID;
#define InvalidShardgroupID ((ShardgroupID) 0)
#define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID)
// helper functions to get a typed ShardgroupID to and from a Datum
#define DatumGetShardgroupID(datum) ((ShardgroupID) DatumGetInt64((datum)))
#define ShardgroupIDGetDatum(shardgroupID) Int64GetDatum(((int64)(shardgroupID)))
#define PG_GETARG_SHARDGROUPID(n) DatumGetShardgroupID(PG_GETARG_DATUM(n))
#endif /* SHARDGROUP_H */