diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index bb4ab7473..b94b44cac 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1417,7 +1417,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId, text *shardMinValue = NULL; text *shardMaxValue = NULL; InsertShardRow(citusLocalTableId, shardId, shardStorageType, - shardMinValue, shardMaxValue); + shardMinValue, shardMaxValue, NULL); List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError()); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index b7753108c..c2302a0dd 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -5235,6 +5235,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray, Oid relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]); int64 shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]); + int64 shardGroupId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardgroupid - 1]); char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1]; @@ -5274,6 +5275,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray, shardInterval->minValue = minValue; shardInterval->maxValue = maxValue; shardInterval->shardId = shardId; + shardInterval->shardGroupId = shardGroupId; return shardInterval; } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index df9104efd..04a4f63e6 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1304,11 +1304,12 @@ ShardListInsertCommand(List *shardIntervalList) StringInfo insertShardCommand = makeStringInfo(); appendStringInfo(insertShardCommand, "WITH shard_data(relationname, shardid, storagetype, " - "shardminvalue, shardmaxvalue) AS (VALUES "); + "shardminvalue, shardmaxvalue, shardgroup_id) AS (VALUES "); foreach_ptr(shardInterval, shardIntervalList) { uint64 shardId = shardInterval->shardId; + uint64 shardGroupId = shardInterval->shardGroupId; Oid distributedRelationId = shardInterval->relationId; char *qualifiedRelationName = generate_qualified_relation_name( distributedRelationId); @@ -1336,12 +1337,13 @@ ShardListInsertCommand(List *shardIntervalList) } appendStringInfo(insertShardCommand, - "(%s::regclass, %ld, '%c'::\"char\", %s, %s)", + "(%s::regclass, %ld, '%c'::\"char\", %s, %s, %ld)", quote_literal_cstr(qualifiedRelationName), shardId, shardInterval->storageType, minHashToken->data, - maxHashToken->data); + maxHashToken->data, + shardGroupId); if (llast(shardIntervalList) != shardInterval) { @@ -1353,7 +1355,7 @@ ShardListInsertCommand(List *shardIntervalList) appendStringInfo(insertShardCommand, "SELECT citus_internal_add_shard_metadata(relationname, shardid, " - "storagetype, shardminvalue, shardmaxvalue) " + "storagetype, shardminvalue, shardmaxvalue, shardgroup_id) " "FROM shard_data;"); /* @@ -3257,6 +3259,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) shardMaxValue = PG_GETARG_TEXT_P(4); } + PG_ENSURE_ARGNOTNULL(5, "shard group id"); + uint64 shardGroupId = (uint64) PG_GETARG_INT64(5); + /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); @@ -3277,7 +3282,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) shardMaxValue); } - InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue); + InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue, + &shardGroupId); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 59c39a030..e867b0cf9 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1276,6 +1276,7 @@ CopyShardInterval(ShardInterval *srcInterval) destInterval->minValueExists = srcInterval->minValueExists; destInterval->maxValueExists = srcInterval->maxValueExists; destInterval->shardId = srcInterval->shardId; + destInterval->shardGroupId = srcInterval->shardGroupId; destInterval->shardIndex = srcInterval->shardIndex; destInterval->minValue = 0; @@ -1677,7 +1678,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) */ void InsertShardRow(Oid relationId, uint64 shardId, char storageType, - text *shardMinValue, text *shardMaxValue) + text *shardMinValue, text *shardMaxValue, uint64 *shardGroupId) { Datum values[Natts_pg_dist_shard]; bool isNulls[Natts_pg_dist_shard]; @@ -1705,7 +1706,14 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType, isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true; } - isNulls[Anum_pg_dist_shard_shardgroupid - 1] = true; + if (shardGroupId) + { + values[Anum_pg_dist_shard_shardgroupid - 1] = UInt64GetDatum(*shardGroupId); + } + else + { + isNulls[Anum_pg_dist_shard_shardgroupid - 1] = true; + } /* open shard relation and insert new tuple */ Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock); diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 3edab94e9..ce962a1d8 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -188,7 +188,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, text *maxHashTokenText = IntegerToText(shardMaxHashToken); InsertShardRow(distributedTableId, shardId, shardStorageType, - minHashTokenText, maxHashTokenText); + minHashTokenText, maxHashTokenText, NULL); List *currentInsertedShardPlacements = InsertShardPlacementRows( distributedTableId, @@ -258,6 +258,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool uint64 *newShardIdPtr = (uint64 *) palloc0(sizeof(uint64)); *newShardIdPtr = GetNextShardId(); insertedShardIds = lappend(insertedShardIds, newShardIdPtr); + uint64 sourceShardGroupId = sourceShardInterval->shardGroupId; int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); @@ -267,7 +268,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool sourceShardId); InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType, - shardMinValueText, shardMaxValueText); + shardMinValueText, shardMaxValueText, &sourceShardGroupId); ShardPlacement *sourcePlacement = NULL; foreach_ptr(sourcePlacement, sourceShardPlacementList) @@ -351,7 +352,7 @@ CreateReferenceTableShard(Oid distributedTableId) uint64 shardId = GetNextShardId(); InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, - shardMaxValue); + shardMaxValue, NULL); List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, nodeList, workerStartIndex, diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 564e9420d..6678fbdb1 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1074,6 +1074,8 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard, splitChildShardInterval->shardIndex = -1; splitChildShardInterval->shardId = GetNextShardIdForSplitChild(); + /* TODO find shardgroup id */ + splitChildShardInterval->minValueExists = true; splitChildShardInterval->minValue = currentSplitChildMinValue; splitChildShardInterval->maxValueExists = true; @@ -1175,7 +1177,8 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, shardInterval->shardId, shardInterval->storageType, IntegerToText(DatumGetInt32(shardInterval->minValue)), - IntegerToText(DatumGetInt32(shardInterval->maxValue))); + IntegerToText(DatumGetInt32(shardInterval->maxValue)), + NULL); InsertShardPlacementRow( shardInterval->shardId, diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index ce9fe3f31..2ac847471 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -184,7 +184,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) candidateNodeIndex++; } - InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue); + InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, NULL); CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, ShardReplicationFactor); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index be6caf0e2..0c48cfd35 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4494,6 +4494,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount) shardInterval->maxValue = Int32GetDatum(shardMaxHashToken); shardInterval->shardId = INVALID_SHARD_ID; + shardInterval->shardGroupId = INVALID_SHARD_ID; shardInterval->valueTypeId = INT4OID; shardIntervalArray[shardIndex] = shardInterval; diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 8ba6739ce..b02f0d65c 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -52,3 +52,10 @@ WHERE shard.logicalrelid = shardgroup.logicalrelid -- shardgroup we would not want the setup to use the new Citus version as it hard relies on the shardgroups being -- correctly associated. ALTER TABLE pg_catalog.pg_dist_shard ALTER COLUMN shardgroupid SET NOT NULL; + +#include "udfs/citus_internal_add_shard_metadata/11.3-1.sql" +DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata( + relation_id regclass, shard_id bigint, + storage_type "char", shard_min_value text, + shard_max_value text +); diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.3-1.sql new file mode 100644 index 000000000..cfb21f4d5 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/11.3-1.sql @@ -0,0 +1,10 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( + relation_id regclass, shard_id bigint, + storage_type "char", shard_min_value text, + shard_max_value text, shardgroup_id bigint + ) + RETURNS void + LANGUAGE C + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS + 'Inserts into pg_dist_shard with user checks'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql index 7411d9179..cfb21f4d5 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shard_metadata/latest.sql @@ -1,10 +1,10 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata( relation_id regclass, shard_id bigint, storage_type "char", shard_min_value text, - shard_max_value text + shard_max_value text, shardgroup_id bigint ) RETURNS void LANGUAGE C AS 'MODULE_PATHNAME'; -COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS +COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS 'Inserts into pg_dist_shard with user checks'; diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index cb378e9e3..e715bc9b7 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -225,7 +225,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS) text *maxInfoText = cstring_to_text(maxInfo->data); InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText, - maxInfoText); + maxInfoText, NULL); PG_RETURN_INT64(newShardId); } diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index acb4ae5da..9868d0392 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -66,6 +66,7 @@ typedef struct ShardInterval Datum minValue; /* a shard's typed min value datum */ Datum maxValue; /* a shard's typed max value datum */ uint64 shardId; + uint64 shardGroupId; int shardIndex; } ShardInterval; @@ -306,7 +307,8 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, - text *shardMinValue, text *shardMaxValue); + text *shardMinValue, text *shardMaxValue, + uint64 *shardGroupId); extern void DeleteShardRow(uint64 shardId); extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, uint64 shardLength, int32 groupId);