diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
index d95cdd353..af90b21c5 100644
--- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
+++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c
@@ -1459,7 +1459,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
     text *shardMinValue = NULL;
     text *shardMaxValue = NULL;
     InsertShardRow(citusLocalTableId, shardId, shardStorageType,
-                   shardMinValue, shardMaxValue);
+                   shardMinValue, shardMaxValue, InvalidShardgroupID);
 
     List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());
diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 9f3975a1e..a3d5dcce2 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -146,7 +146,8 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
                                               DistributedTableParams *
                                               distributedTableParams);
 static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
-                                             Oid colocatedTableId, bool localTableEmpty);
+                                             uint32 colocationId, Oid colocatedTableId,
+                                             bool localTableEmpty);
 static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
                                         uint32 colocationId);
 static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
@@ -1289,7 +1290,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
     {
         /* create shards for hash distributed table */
         CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
-                                         colocatedTableId,
+                                         colocationId, colocatedTableId,
                                          localTableEmpty);
     }
     else if (tableType == REFERENCE_TABLE)
@@ -1880,7 +1881,7 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
  * CreateHashDistributedTableShards creates shards of given hash distributed table.
  */
 static void
-CreateHashDistributedTableShards(Oid relationId, int shardCount,
+CreateHashDistributedTableShards(Oid relationId, int shardCount, uint32 colocationId,
                                  Oid colocatedTableId, bool localTableEmpty)
 {
     bool useExclusiveConnection = false;
@@ -1919,7 +1920,10 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
      * tables which will not be part of an existing colocation group. Therefore,
      * we can directly use ShardReplicationFactor global variable here.
      */
-    CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
+    CreateShardsWithRoundRobinPolicy(relationId,
+                                     colocationId,
+                                     shardCount,
+                                     ShardReplicationFactor,
                                      useExclusiveConnection);
     }
 }
diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index 1b2fa229f..3a90bed97 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -155,6 +155,7 @@ typedef struct MetadataCacheData
 {
     ExtensionCreatedState extensionCreatedState;
     Oid distShardRelationId;
+    Oid distShardgroupRelationId;
     Oid distPlacementRelationId;
     Oid distBackgroundJobRelationId;
     Oid distBackgroundJobPKeyIndexId;
@@ -2594,6 +2595,17 @@ DistShardRelationId(void)
 }
 
 
+/* return oid of pg_dist_shardgroup relation */
+Oid
+DistShardgroupRelationId(void)
+{
+    CachedRelationLookup("pg_dist_shardgroup",
+                         &MetadataCache.distShardgroupRelationId);
+
+    return MetadataCache.distShardgroupRelationId;
+}
+
+
 /* return oid of pg_dist_placement relation */
 Oid
 DistPlacementRelationId(void)
@@ -5378,6 +5390,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
     char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
     Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
     Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
+    ShardgroupID shardgroupId = DatumGetShardgroupID(datumArray[Anum_pg_dist_shard_shardgroupid - 1]);
     bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
     bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];
@@ -5414,6 +5427,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
     shardInterval->minValue = minValue;
     shardInterval->maxValue = maxValue;
     shardInterval->shardId = shardId;
+    shardInterval->shardgroupId = shardgroupId;
 
     return shardInterval;
 }
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index f0be1995b..32be9b8af 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -84,6 +84,7 @@
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shardgroup.h"
 #include "distributed/tenant_schema_metadata.h"
 #include "distributed/utils/array_type.h"
 #include "distributed/utils/function.h"
@@ -149,6 +150,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
                                            int replicationFactor,
                                            Oid distributionColumnType,
                                            Oid distributionColumnCollation);
+static char * ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount,
+                                       uint32 colocationId);
 static char * ColocationGroupDeleteCommand(uint32 colocationId);
 static char * RemoteSchemaIdExpressionById(Oid schemaId);
 static char * RemoteSchemaIdExpressionByName(char *schemaName);
@@ -171,6 +174,7 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
 PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
+PG_FUNCTION_INFO_V1(citus_internal_add_shardgroup_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata);
 PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
@@ -1249,7 +1253,7 @@ ShardListInsertCommand(List *shardIntervalList)
     StringInfo insertShardCommand = makeStringInfo();
     appendStringInfo(insertShardCommand,
                      "WITH shard_data(relationname, shardid, storagetype, "
-                     "shardminvalue, shardmaxvalue) AS (VALUES ");
+                     "shardminvalue, shardmaxvalue, shardgroupid) AS (VALUES ");
 
     foreach_ptr(shardInterval, shardIntervalList)
     {
@@ -1281,12 +1285,13 @@ ShardListInsertCommand(List *shardIntervalList)
         }
 
         appendStringInfo(insertShardCommand,
-                         "(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
+                         "(%s::regclass, %ld, '%c'::\"char\", %s, %s, %ld::bigint)",
                          quote_literal_cstr(qualifiedRelationName),
                          shardId,
                          shardInterval->storageType,
                          minHashToken->data,
-                         maxHashToken->data);
+                         maxHashToken->data,
+                         shardInterval->shardgroupId);
 
         if (llast(shardIntervalList) != shardInterval)
         {
@@ -1298,7 +1303,7 @@ ShardListInsertCommand(List *shardIntervalList)
 
     appendStringInfo(insertShardCommand,
                      "SELECT citus_internal_add_shard_metadata(relationname, shardid, "
-                     "storagetype, shardminvalue, shardmaxvalue) "
+                     "storagetype, shardminvalue, shardmaxvalue, shardgroupid) "
                      "FROM shard_data;");
 
     /*
@@ -3341,6 +3346,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
         shardMaxValue = PG_GETARG_TEXT_P(4);
     }
 
+    PG_ENSURE_ARGNOTNULL(5, "shardgroup id");
+    ShardgroupID shardgroupID = PG_GETARG_SHARDGROUPID(5);
+
     /* only owner of the table (or superuser) is allowed to add the Citus metadata */
     EnsureTableOwner(relationId);
 
@@ -3361,7 +3369,31 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
                                        shardMaxValue);
     }
 
-    InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
+    InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue, shardgroupID);
+
+    PG_RETURN_VOID();
+}
+
+
+Datum
+citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
+{
+    CheckCitusVersion(ERROR);
+    EnsureSuperUser();
+
+    PG_ENSURE_ARGNOTNULL(0, "shardgroupid");
+    ShardgroupID shardgroupID = PG_GETARG_SHARDGROUPID(0);
+
+    PG_ENSURE_ARGNOTNULL(1, "colocationid");
+    uint32 colocationId = PG_GETARG_UINT32(1);
+
+    if (!ShouldSkipMetadataChecks())
+    {
+        /* this UDF is not allowed to be executed as a separate command */
+        EnsureCoordinatorInitiatedOperation();
+    }
+
+    InsertShardgroupRow(shardgroupID, colocationId);
 
     PG_RETURN_VOID();
 }
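Note: with the shardgroupid column threaded through, the command that ShardListInsertCommand assembles and that ends up calling citus_internal_add_shard_metadata takes roughly the following shape. The relation name, shard ID, hash bounds, and shardgroup ID below are illustrative values, not taken from a real cluster:

    WITH shard_data(relationname, shardid, storagetype,
                    shardminvalue, shardmaxvalue, shardgroupid)
      AS (VALUES ('public.events'::regclass, 102008, 't'::"char",
                  '-2147483648', '-1', 17::bigint))
    SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype,
                                             shardminvalue, shardmaxvalue, shardgroupid)
    FROM shard_data;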
@@ -4091,6 +4123,54 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicationFactor,
 }
 
 
+/*
+ * SyncNewShardgroupsToNodes propagates the newly created shardgroups to all
+ * nodes with metadata.
+ */
+void
+SyncNewShardgroupsToNodes(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId)
+{
+    char *command = ShardgroupsCreateCommand(shardgroupIDs, shardCount, colocationId);
+
+    /*
+     * We require superuser for all pg_dist_shardgroup operations because we have
+     * no reasonable way of restricting access.
+     */
+    SendCommandToWorkersWithMetadataViaSuperUser(command);
+}
+
+
+static char *
+ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId)
+{
+    StringInfoData buf = { 0 };
+    initStringInfo(&buf);
+
+    /* add all new shardgroups to the VALUES list of the insert command */
+    appendStringInfo(&buf,
+                     "WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES ");
+    for (int i = 0; i < shardCount; i++)
+    {
+        if (i > 0)
+        {
+            appendStringInfo(&buf, ", ");
+        }
+
+        ShardgroupID shardgroupId = shardgroupIDs[i];
+        appendStringInfo(&buf, "(%ld::bigint, %u)",
+                         shardgroupId,
+                         colocationId);
+    }
+
+    appendStringInfo(&buf, ") ");
+
+    appendStringInfo(&buf,
+                     "SELECT pg_catalog.citus_internal_add_shardgroup_metadata(shardgroupid, "
+                     "colocationid) FROM shardgroup_data;");
+
+    return buf.data;
+}
+
+
 /*
  * RemoteTypeIdExpression returns an expression in text form that can
  * be used to obtain the OID of a type on a different node when included
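Note: for a batch of three shardgroups in colocation group 42 (IDs made up for illustration), ShardgroupsCreateCommand above would produce a single command along these lines:

    WITH shardgroup_data(shardgroupid, colocationid)
      AS (VALUES (17::bigint, 42), (18::bigint, 42), (19::bigint, 42))
    SELECT pg_catalog.citus_internal_add_shardgroup_metadata(shardgroupid, colocationid)
    FROM shardgroup_data;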
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 15e167008..b486423a2 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -70,11 +70,13 @@
 #include "distributed/pg_dist_partition.h"
 #include "distributed/pg_dist_placement.h"
 #include "distributed/pg_dist_shard.h"
+#include "distributed/pg_dist_shardgroup.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/relay_utility.h"
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_rebalancer.h"
+#include "distributed/shardgroup.h"
 #include "distributed/tuplestore.h"
 #include "distributed/utils/array_type.h"
 #include "distributed/version_compat.h"
@@ -1350,6 +1352,7 @@ CopyShardInterval(ShardInterval *srcInterval)
     destInterval->maxValueExists = srcInterval->maxValueExists;
     destInterval->shardId = srcInterval->shardId;
     destInterval->shardIndex = srcInterval->shardIndex;
+    destInterval->shardgroupId = srcInterval->shardgroupId;
     destInterval->minValue = 0;
 
     if (destInterval->minValueExists)
@@ -1796,6 +1799,32 @@ IsDummyPlacement(ShardPlacement *taskPlacement)
 }
 
 
+void
+InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId)
+{
+    Datum values[Natts_pg_dist_shardgroup];
+    bool isNulls[Natts_pg_dist_shardgroup];
+
+    /* form new shardgroup tuple */
+    memset(values, 0, sizeof(values));
+    memset(isNulls, false, sizeof(isNulls));
+
+    values[Anum_pg_dist_shardgroup_shardgroupid - 1] = ShardgroupIDGetDatum(shardgroupId);
+    values[Anum_pg_dist_shardgroup_colocationid - 1] = UInt32GetDatum(colocationId);
+
+    /* open shardgroup relation and insert new tuple */
+    Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock);
+
+    TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardgroup);
+    HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
+
+    CatalogTupleInsert(pgDistShardgroup, heapTuple);
+
+    CommandCounterIncrement();
+    table_close(pgDistShardgroup, NoLock);
+}
+
+
 /*
  * InsertShardRow opens the shard system catalog, and inserts a new row with the
  * given values into that system catalog. Note that we allow the user to pass in
@@ -1803,7 +1832,7 @@ IsDummyPlacement(ShardPlacement *taskPlacement)
  */
 void
 InsertShardRow(Oid relationId, uint64 shardId, char storageType,
-               text *shardMinValue, text *shardMaxValue)
+               text *shardMinValue, text *shardMaxValue, ShardgroupID shardgroupId)
 {
     Datum values[Natts_pg_dist_shard];
     bool isNulls[Natts_pg_dist_shard];
@@ -1831,6 +1860,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
         isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
     }
 
+    values[Anum_pg_dist_shard_shardgroupid - 1] = ShardgroupIDGetDatum(shardgroupId);
+
     /* open shard relation and insert new tuple */
     Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index 962547051..98f0d3399 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -50,6 +50,7 @@
 #include "distributed/pg_dist_shard.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shardgroup.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/transaction_management.h"
 #include "distributed/worker_manager.h"
@@ -80,8 +81,9 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
  * worker nodes.
  */
 void
-CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
-                                 int32 replicationFactor, bool useExclusiveConnections)
+CreateShardsWithRoundRobinPolicy(Oid distributedTableId, uint32 colocationId,
+                                 int32 shardCount, int32 replicationFactor,
+                                 bool useExclusiveConnections)
 {
     CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
     List *insertedShardPlacements = NIL;
@@ -163,6 +165,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
     /* set shard storage type according to relation type */
     char shardStorageType = ShardStorageType(distributedTableId);
 
+    ShardgroupID *shardgroupIDs = palloc0(sizeof(ShardgroupID) * shardCount);
     for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
     {
         uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
@@ -184,8 +187,12 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
         text *minHashTokenText = IntegerToText(shardMinHashToken);
         text *maxHashTokenText = IntegerToText(shardMaxHashToken);
 
+        ShardgroupID shardgroupId = GetNextShardgroupId();
+        InsertShardgroupRow(shardgroupId, colocationId);
+        shardgroupIDs[shardIndex] = shardgroupId;
+
         InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
-                       minHashTokenText, maxHashTokenText);
+                       minHashTokenText, maxHashTokenText, shardgroupId);
 
         InsertShardPlacementRows(distributedTableId,
                                  *shardIdPtr,
@@ -194,6 +201,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
                                  replicationFactor);
     }
 
+    /* TODO: check whether metadata syncing is enabled before syncing shardgroups */
+    SyncNewShardgroupsToNodes(shardgroupIDs, shardCount, colocationId);
+
     /*
      * load shard placements for the shard at once after all placement insertions
      * finished. This prevents MetadataCache from rebuilding unnecessarily after
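Note: the net effect of the loop above is one freshly allocated shardgroup per new shard, all in the creating table's colocation group. A sanity check after creating a hash-distributed table could look like this (the table name events is hypothetical):

    SELECT s.shardid, s.shardgroupid, g.colocationid
    FROM pg_dist_shard s
    JOIN pg_dist_shardgroup g USING (shardgroupid)
    WHERE s.logicalrelid = 'events'::regclass
    ORDER BY s.shardid;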
@@ -283,7 +293,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
             sourceShardId);
 
         InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType,
-                       shardMinValueText, shardMaxValueText);
+                       shardMinValueText, shardMaxValueText,
+                       sourceShardInterval->shardgroupId);
 
         ShardPlacement *sourcePlacement = NULL;
         foreach_ptr(sourcePlacement, sourceShardPlacementList)
@@ -366,7 +377,7 @@ CreateReferenceTableShard(Oid distributedTableId)
     uint64 shardId = GetNextShardId();
 
     InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
-                   shardMaxValue);
+                   shardMaxValue, InvalidShardgroupID);
 
     InsertShardPlacementRows(distributedTableId,
                              shardId,
@@ -421,8 +432,11 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocationId,
     text *minHashTokenText = NULL;
     text *maxHashTokenText = NULL;
     uint64 shardId = GetNextShardId();
+
+    ShardgroupID shardgroupId = GetNextShardgroupId();
+    InsertShardgroupRow(shardgroupId, colocationId);
     InsertShardRow(relationId, shardId, shardStorageType,
-                   minHashTokenText, maxHashTokenText);
+                   minHashTokenText, maxHashTokenText, shardgroupId);
 
     int replicationFactor = 1;
     InsertShardPlacementRows(relationId,
diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c
index 52e44bea0..00e690b6f 100644
--- a/src/backend/distributed/operations/node_protocol.c
+++ b/src/backend/distributed/operations/node_protocol.c
@@ -253,6 +253,32 @@ GetNextShardId()
 }
 
 
+/*
+ * GetNextShardgroupId allocates and returns a unique shardgroupId for a new
+ * shardgroup, using the pg_dist_shardgroupid_seq sequence.
+ */
+ShardgroupID
+GetNextShardgroupId()
+{
+    text *sequenceName = cstring_to_text(SHARDGROUPID_SEQUENCE_NAME);
+    Oid sequenceId = ResolveRelationId(sequenceName, false);
+    Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
+
+    Oid savedUserId = InvalidOid;
+    int savedSecurityContext = 0;
+    GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
+    SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
+
+    /* generate a new and unique shardgroupId from the sequence */
+    Datum shardgroupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
+
+    SetUserIdAndSecContext(savedUserId, savedSecurityContext);
+
+    ShardgroupID shardgroupId = DatumGetShardgroupID(shardgroupIdDatum);
+    return shardgroupId;
+}
+
+
 /*
  * master_get_new_placementid is a user facing wrapper function around
  * GetNextPlacementId() which allocates and returns a unique placement id for the
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index cf9f301b7..417640293 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -1177,7 +1177,8 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
                             shardInterval->shardId,
                             shardInterval->storageType,
                             IntegerToText(DatumGetInt32(shardInterval->minValue)),
-                            IntegerToText(DatumGetInt32(shardInterval->maxValue)));
+                            IntegerToText(DatumGetInt32(shardInterval->maxValue)),
+                            InvalidShardgroupID);
 
         InsertShardPlacementRow(
             shardInterval->shardId,
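Note: GetNextShardgroupId draws from the dedicated sequence this patch introduces, mirroring how shard IDs are allocated; running as the extension owner, the call is equivalent to:

    SELECT nextval('pg_catalog.pg_dist_shardgroupid_seq');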
diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c
index 5770d648e..8222fd290 100644
--- a/src/backend/distributed/operations/stage_protocol.c
+++ b/src/backend/distributed/operations/stage_protocol.c
@@ -193,7 +193,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
         candidateNodeIndex++;
     }
 
-    InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
+    InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, InvalidShardgroupID);
 
     CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
                                            ShardReplicationFactor);
diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
index 63c4a527f..806440efc 100644
--- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
+++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
@@ -3,3 +3,4 @@
 
 #include "udfs/citus_internal_database_command/12.2-1.sql"
 #include "udfs/citus_add_rebalance_strategy/12.2-1.sql"
+#include "feature/shardgroup/upgrade.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql
index d18f7257b..c6c153eb6 100644
--- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql
@@ -3,3 +3,4 @@
 DROP FUNCTION pg_catalog.citus_internal_database_command(text);
 
 #include "../udfs/citus_add_rebalance_strategy/10.1-1.sql"
+#include "../feature/shardgroup/downgrade.sql"
diff --git a/src/backend/distributed/sql/feature/shardgroup/downgrade.sql b/src/backend/distributed/sql/feature/shardgroup/downgrade.sql
new file mode 100644
index 000000000..94782b12c
--- /dev/null
+++ b/src/backend/distributed/sql/feature/shardgroup/downgrade.sql
@@ -0,0 +1,10 @@
+DROP TABLE pg_catalog.pg_dist_shardgroup;
+DROP SEQUENCE pg_catalog.pg_dist_shardgroupid_seq;
+
+-- TODO: probably need to drop and recreate the original table to make sure we can
+-- upgrade again later and not run into trouble with the internal ordering of the columns.
+ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardgroupid;
+
+DROP FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer);
+DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint);
+#include "../../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
\ No newline at end of file
diff --git a/src/backend/distributed/sql/feature/shardgroup/upgrade.sql b/src/backend/distributed/sql/feature/shardgroup/upgrade.sql
new file mode 100644
index 000000000..1dff2e886
--- /dev/null
+++ b/src/backend/distributed/sql/feature/shardgroup/upgrade.sql
@@ -0,0 +1,16 @@
+CREATE TABLE citus.pg_dist_shardgroup (
+    shardgroupid bigint PRIMARY KEY,
+    colocationid integer NOT NULL
+);
+
+ALTER TABLE citus.pg_dist_shardgroup SET SCHEMA pg_catalog;
+-- default of 0 (InvalidShardgroupID) covers shards that predate shardgroups
+ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardgroupid bigint NOT NULL DEFAULT 0;
+
+CREATE SEQUENCE citus.pg_dist_shardgroupid_seq NO CYCLE;
+ALTER SEQUENCE citus.pg_dist_shardgroupid_seq SET SCHEMA pg_catalog;
+
+DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
+#include "../../udfs/citus_internal_add_shard_metadata/12.2-1.sql"
+#include "../../udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql"
+
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql
new file mode 100644
index 000000000..7c90607d7
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/12.2-1.sql
@@ -0,0 +1,10 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
+                            relation_id regclass, shard_id bigint,
+                            storage_type "char", shard_min_value text,
+                            shard_max_value text, shardgroupid bigint
+                            )
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS
+    'Inserts into pg_dist_shard with user checks';
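Note: once shards and shardgroups are both in the catalog, the intended invariant is that a shard's shardgroup belongs to the same colocation group as the shard's table. A hypothetical consistency probe, which should return zero rows:

    SELECT s.shardid
    FROM pg_dist_shard s
    JOIN pg_dist_shardgroup g USING (shardgroupid)
    JOIN pg_dist_partition p USING (logicalrelid)
    WHERE g.colocationid <> p.colocationid;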
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql
index 7411d9179..7c90607d7 100644
--- a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql
@@ -1,10 +1,10 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
                             relation_id regclass, shard_id bigint,
                             storage_type "char", shard_min_value text,
-                            shard_max_value text
+                            shard_max_value text, shardgroupid bigint
                             )
     RETURNS void
     LANGUAGE C
     AS 'MODULE_PATHNAME';
-COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS
 'Inserts into pg_dist_shard with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql
new file mode 100644
index 000000000..968aa8c8e
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/12.2-1.sql
@@ -0,0 +1,7 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(
+                            shardgroupid bigint, colocationid integer)
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer) IS
+    'Inserts into pg_dist_shardgroup with user checks';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql
new file mode 100644
index 000000000..968aa8c8e
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql
@@ -0,0 +1,7 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(
+                            shardgroupid bigint, colocationid integer)
+    RETURNS void
+    LANGUAGE C
+    AS 'MODULE_PATHNAME';
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer) IS
+    'Inserts into pg_dist_shardgroup with user checks';
diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c
index 01117922e..9bf8864c4 100644
--- a/src/backend/distributed/test/distribution_metadata.c
+++ b/src/backend/distributed/test/distribution_metadata.c
@@ -231,7 +231,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
     text *maxInfoText = cstring_to_text(maxInfo->data);
 
     InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
-                   maxInfoText);
+                   maxInfoText, InvalidShardgroupID);
 
     PG_RETURN_INT64(newShardId);
 }
diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h
index b2170fd2e..9f9650e74 100644
--- a/src/include/distributed/coordinator_protocol.h
+++ b/src/include/distributed/coordinator_protocol.h
@@ -23,6 +23,7 @@
 
 #include "distributed/connection_management.h"
 #include "distributed/metadata_utility.h"
+#include "distributed/shardgroup.h"
 #include "distributed/shardinterval_utils.h"
 
 /*
@@ -55,6 +56,7 @@
 #define TRANSFER_MODE_BLOCK_WRITES 'b'
 
 #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
+#define SHARDGROUPID_SEQUENCE_NAME "pg_dist_shardgroupid_seq"
"pg_dist_shardgroupid_seq" #define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq" /* Remote call definitions to help with data staging and deletion */ @@ -222,6 +224,7 @@ extern bool IsCoordinator(void); /* Function declarations local to the distributed module */ extern uint64 GetNextShardId(void); +extern ShardgroupID GetNextShardgroupId(void); extern uint64 GetNextPlacementId(void); extern Oid ResolveRelationId(text *relationName, bool missingOk); extern List * GetFullTableCreationCommands(Oid relationId, @@ -257,7 +260,9 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, int workerStartIndex, int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); -extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, +extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, + uint32 colocationId, + int32 shardCount, int32 replicationFactor, bool useExclusiveConnections); extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index f1120497b..6d4c32cdb 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -240,6 +240,7 @@ extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); +extern Oid DistShardgroupRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); extern Oid DistBackgroundJobRelationId(void); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e20c44535..7d3861715 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -138,6 +138,8 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicationFactor, Oid distributionColumType, Oid distributionColumnCollation); +extern void SyncNewShardgoupsToNodes(ShardgroupID *shardgroupIDs, int shardCount, + uint32 colocationId); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId); extern char * TenantSchemaDeleteCommand(char *schemaName); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 04a4b500b..c20dbd6cd 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -28,6 +28,7 @@ #include "distributed/connection_management.h" #include "distributed/errormessage.h" #include "distributed/relay_utility.h" +#include "distributed/shardgroup.h" #include "distributed/worker_manager.h" @@ -68,6 +69,7 @@ typedef struct ShardInterval Datum maxValue; /* a shard's typed max value datum */ uint64 shardId; int shardIndex; + ShardgroupID shardgroupId; } ShardInterval; @@ -350,7 +352,9 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, - text *shardMinValue, text *shardMaxValue); + text *shardMinValue, text *shardMaxValue, + ShardgroupID shardgroupId); +extern void InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId); extern void DeleteShardRow(uint64 shardId); extern ShardPlacement * 
diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h
index 5c98b755f..e6ed5124b 100644
--- a/src/include/distributed/pg_dist_shard.h
+++ b/src/include/distributed/pg_dist_shard.h
@@ -16,6 +16,8 @@
 #ifndef PG_DIST_SHARD_H
 #define PG_DIST_SHARD_H
 
+#include "distributed/shardgroup.h"
+
 /* ----------------
  *		pg_dist_shard definition.
  * ----------------
@@ -30,6 +32,7 @@ typedef struct FormData_pg_dist_shard
 	text shardminvalue;        /* partition key's minimum value in shard */
 	text shardmaxvalue;        /* partition key's maximum value in shard */
 #endif
+	ShardgroupID shardgroupid;
 } FormData_pg_dist_shard;
 
 /* ----------------
@@ -43,13 +46,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
 * compiler constants for pg_dist_shards
 * ----------------
 */
-#define Natts_pg_dist_shard 6
+#define Natts_pg_dist_shard 7
 #define Anum_pg_dist_shard_logicalrelid 1
 #define Anum_pg_dist_shard_shardid 2
 #define Anum_pg_dist_shard_shardstorage 3
 #define Anum_pg_dist_shard_shardalias_DROPPED 4
 #define Anum_pg_dist_shard_shardminvalue 5
 #define Anum_pg_dist_shard_shardmaxvalue 6
+#define Anum_pg_dist_shard_shardgroupid 7
 
 /*
  * Valid values for shard storage types include foreign table, (standard) table
diff --git a/src/include/distributed/pg_dist_shardgroup.h b/src/include/distributed/pg_dist_shardgroup.h
new file mode 100644
index 000000000..d966ef2c8
--- /dev/null
+++ b/src/include/distributed/pg_dist_shardgroup.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_dist_shardgroup.h
+ *	  definition of the "shardgroup" relation (pg_dist_shardgroup).
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_DIST_SHARDGROUP_H
+#define PG_DIST_SHARDGROUP_H
+
+#include "distributed/shardgroup.h"
+
+/* ----------------
+ *		pg_dist_shardgroup definition.
+ * ----------------
+ */
+typedef struct FormData_pg_dist_shardgroup
+{
+	ShardgroupID shardgroupid;
+	uint32 colocationid;
+} FormData_pg_dist_shardgroup;
+
+/* ----------------
+ *		Form_pg_dist_shardgroup corresponds to a pointer to a tuple with
+ *		the format of pg_dist_shardgroup relation.
+ * ----------------
+ */
+typedef FormData_pg_dist_shardgroup *Form_pg_dist_shardgroup;
+
+/* ----------------
+ *		compiler constants for pg_dist_shardgroup
+ * ----------------
+ */
+#define Natts_pg_dist_shardgroup 2
+#define Anum_pg_dist_shardgroup_shardgroupid 1
+#define Anum_pg_dist_shardgroup_colocationid 2
+
+#endif   /* PG_DIST_SHARDGROUP_H */
diff --git a/src/include/distributed/shardgroup.h b/src/include/distributed/shardgroup.h
new file mode 100644
index 000000000..652389479
--- /dev/null
+++ b/src/include/distributed/shardgroup.h
@@ -0,0 +1,34 @@
+/*-------------------------------------------------------------------------
+ *
+ * shardgroup.h
+ *	  Shardgroups are a logical unit of colocated shards from different
+ *	  tables belonging to the same colocation group. Shards in the same
+ *	  shardgroup move as one logical unit during shard moves, which are
+ *	  therefore better described as shardgroup moves.
+ *
+ *	  This header defines helpers to work with the ShardgroupIDs that
+ *	  identify shardgroups.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef SHARDGROUP_H
+#define SHARDGROUP_H
+
+#include "postgres.h"
+
+#include "fmgr.h"
+
+typedef int64 ShardgroupID;
+#define InvalidShardgroupID ((ShardgroupID) 0)
+#define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID)
+
+/* helper macros to convert a typed ShardgroupID to and from a Datum */
+#define DatumGetShardgroupID(datum) ((ShardgroupID) DatumGetInt64((datum)))
+#define ShardgroupIDGetDatum(shardgroupID) Int64GetDatum(((int64) (shardgroupID)))
+
+#define PG_GETARG_SHARDGROUPID(n) DatumGetShardgroupID(PG_GETARG_DATUM(n))
+
+#endif   /* SHARDGROUP_H */
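Note: code paths that do not yet allocate shardgroups (reference tables, split children, append-distributed shards, Citus local tables) record InvalidShardgroupID, which is stored as 0, so such shards are easy to spot:

    SELECT logicalrelid::regclass, shardid
    FROM pg_dist_shard
    WHERE shardgroupid = 0;  -- 0 = InvalidShardgroupID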