implement creating colocated tables

backup/feature/shardgroup
Nils Dijk 2022-11-23 13:37:08 +01:00
parent 2768b80fd5
commit c276ae9587
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
13 changed files with 57 additions and 17 deletions

View File

@ -1417,7 +1417,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,
text *shardMinValue = NULL;
text *shardMaxValue = NULL;
InsertShardRow(citusLocalTableId, shardId, shardStorageType,
shardMinValue, shardMaxValue);
shardMinValue, shardMaxValue, NULL);
List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());

View File

@ -5235,6 +5235,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
Oid relationId =
DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]);
int64 shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]);
int64 shardGroupId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardgroupid - 1]);
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
@ -5274,6 +5275,7 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
shardInterval->minValue = minValue;
shardInterval->maxValue = maxValue;
shardInterval->shardId = shardId;
shardInterval->shardGroupId = shardGroupId;
return shardInterval;
}

View File

@ -1304,11 +1304,12 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertShardCommand = makeStringInfo();
appendStringInfo(insertShardCommand,
"WITH shard_data(relationname, shardid, storagetype, "
"shardminvalue, shardmaxvalue) AS (VALUES ");
"shardminvalue, shardmaxvalue, shardgroup_id) AS (VALUES ");
foreach_ptr(shardInterval, shardIntervalList)
{
uint64 shardId = shardInterval->shardId;
uint64 shardGroupId = shardInterval->shardGroupId;
Oid distributedRelationId = shardInterval->relationId;
char *qualifiedRelationName = generate_qualified_relation_name(
distributedRelationId);
@ -1336,12 +1337,13 @@ ShardListInsertCommand(List *shardIntervalList)
}
appendStringInfo(insertShardCommand,
"(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
"(%s::regclass, %ld, '%c'::\"char\", %s, %s, %ld)",
quote_literal_cstr(qualifiedRelationName),
shardId,
shardInterval->storageType,
minHashToken->data,
maxHashToken->data);
maxHashToken->data,
shardGroupId);
if (llast(shardIntervalList) != shardInterval)
{
@ -1353,7 +1355,7 @@ ShardListInsertCommand(List *shardIntervalList)
appendStringInfo(insertShardCommand,
"SELECT citus_internal_add_shard_metadata(relationname, shardid, "
"storagetype, shardminvalue, shardmaxvalue) "
"storagetype, shardminvalue, shardmaxvalue, shardgroup_id) "
"FROM shard_data;");
/*
@ -3257,6 +3259,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue = PG_GETARG_TEXT_P(4);
}
PG_ENSURE_ARGNOTNULL(5, "shard group id");
uint64 shardGroupId = (uint64) PG_GETARG_INT64(5);
/* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId);
@ -3277,7 +3282,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue);
}
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue,
&shardGroupId);
PG_RETURN_VOID();
}

View File

@ -1276,6 +1276,7 @@ CopyShardInterval(ShardInterval *srcInterval)
destInterval->minValueExists = srcInterval->minValueExists;
destInterval->maxValueExists = srcInterval->maxValueExists;
destInterval->shardId = srcInterval->shardId;
destInterval->shardGroupId = srcInterval->shardGroupId;
destInterval->shardIndex = srcInterval->shardIndex;
destInterval->minValue = 0;
@ -1677,7 +1678,7 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
*/
void
InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue)
text *shardMinValue, text *shardMaxValue, uint64 *shardGroupId)
{
Datum values[Natts_pg_dist_shard];
bool isNulls[Natts_pg_dist_shard];
@ -1705,7 +1706,14 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
}
isNulls[Anum_pg_dist_shard_shardgroupid - 1] = true;
if (shardGroupId)
{
values[Anum_pg_dist_shard_shardgroupid - 1] = UInt64GetDatum(*shardGroupId);
}
else
{
isNulls[Anum_pg_dist_shard_shardgroupid - 1] = true;
}
/* open shard relation and insert new tuple */
Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);

View File

@ -188,7 +188,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *maxHashTokenText = IntegerToText(shardMaxHashToken);
InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);
minHashTokenText, maxHashTokenText, NULL);
List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
@ -258,6 +258,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
uint64 *newShardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*newShardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, newShardIdPtr);
uint64 sourceShardGroupId = sourceShardInterval->shardGroupId;
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
@ -267,7 +268,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
sourceShardId);
InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType,
shardMinValueText, shardMaxValueText);
shardMinValueText, shardMaxValueText, &sourceShardGroupId);
ShardPlacement *sourcePlacement = NULL;
foreach_ptr(sourcePlacement, sourceShardPlacementList)
@ -351,7 +352,7 @@ CreateReferenceTableShard(Oid distributedTableId)
uint64 shardId = GetNextShardId();
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);
shardMaxValue, NULL);
List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,

View File

@ -1074,6 +1074,8 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
splitChildShardInterval->shardIndex = -1;
splitChildShardInterval->shardId = GetNextShardIdForSplitChild();
/* TODO find shardgroup id */
splitChildShardInterval->minValueExists = true;
splitChildShardInterval->minValue = currentSplitChildMinValue;
splitChildShardInterval->maxValueExists = true;
@ -1175,7 +1177,8 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
shardInterval->shardId,
shardInterval->storageType,
IntegerToText(DatumGetInt32(shardInterval->minValue)),
IntegerToText(DatumGetInt32(shardInterval->maxValue)));
IntegerToText(DatumGetInt32(shardInterval->maxValue)),
NULL);
InsertShardPlacementRow(
shardInterval->shardId,

View File

@ -184,7 +184,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++;
}
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, NULL);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor);

View File

@ -4494,6 +4494,7 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
shardInterval->maxValue = Int32GetDatum(shardMaxHashToken);
shardInterval->shardId = INVALID_SHARD_ID;
shardInterval->shardGroupId = INVALID_SHARD_ID;
shardInterval->valueTypeId = INT4OID;
shardIntervalArray[shardIndex] = shardInterval;

View File

@ -52,3 +52,10 @@ WHERE shard.logicalrelid = shardgroup.logicalrelid
-- shardgroup we would not want the setup to use the new Citus version as it hard relies on the shardgroups being
-- correctly associated.
ALTER TABLE pg_catalog.pg_dist_shard ALTER COLUMN shardgroupid SET NOT NULL;
#include "udfs/citus_internal_add_shard_metadata/11.3-1.sql"
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text
);

View File

@ -0,0 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text, shardgroup_id bigint
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS
'Inserts into pg_dist_shard with user checks';

View File

@ -1,10 +1,10 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shard_metadata(
relation_id regclass, shard_id bigint,
storage_type "char", shard_min_value text,
shard_max_value text
shard_max_value text, shardgroup_id bigint
)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text) IS
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, bigint) IS
'Inserts into pg_dist_shard with user checks';

View File

@ -225,7 +225,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
text *maxInfoText = cstring_to_text(maxInfo->data);
InsertShardRow(distributedTableId, newShardId, SHARD_STORAGE_TABLE, minInfoText,
maxInfoText);
maxInfoText, NULL);
PG_RETURN_INT64(newShardId);
}

View File

@ -66,6 +66,7 @@ typedef struct ShardInterval
Datum minValue; /* a shard's typed min value datum */
Datum maxValue; /* a shard's typed max value datum */
uint64 shardId;
uint64 shardGroupId;
int shardIndex;
} ShardInterval;
@ -306,7 +307,8 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue);
text *shardMinValue, text *shardMaxValue,
uint64 *shardGroupId);
extern void DeleteShardRow(uint64 shardId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId);