source shardgroup id's from its own sequence

backup/feature/shardgroup
Nils Dijk 2022-12-12 14:21:17 +01:00
parent 405cc896f7
commit 27c44ef216
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
6 changed files with 70 additions and 7 deletions

View File

@ -194,9 +194,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
/* we use shardid of the first shard in a shardgroup as the shardgroupid */
int64 shardGroupId = (int64) shardId;
int64 shardGroupId = GetNextShardgroupId();
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
@ -399,7 +397,7 @@ CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId,
/* get the next shard id */
uint64 shardId = GetNextShardId();
int64 shardGroupId = (int64) shardId;
int64 shardGroupId = GetNextShardgroupId();
StringInfoData shardgroupQuery = { 0 };
initStringInfo(&shardgroupQuery);

View File

@ -252,6 +252,30 @@ GetNextShardId()
}
int64
GetNextShardgroupId()
{
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
text *sequenceName = cstring_to_text(SHARDGROUPID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName, false);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique shardId from sequence */
Datum shardgroupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
int64 shardgroupId = DatumGetInt64(shardgroupIdDatum);
return shardgroupId;
}
/*
* master_get_new_placementid is a user facing wrapper function around
* GetNextPlacementId() which allocates and returns a unique placement id for the

View File

@ -157,6 +157,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32
targetNodeId,
ShardInterval *shardInterval);
static uint64 GetNextShardIdForSplitChild(void);
static int64 GetNextShardgroupIdForSplitChild(void);
static void AcquireNonblockingSplitLock(Oid relationId);
static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList);
static void DropShardgroupMetadata(int64 shardgroupId);
@ -1063,7 +1064,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
Datum splitPoint = (Datum) lfirst(splitPointCell);
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild();
shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitPoint;
@ -1080,7 +1081,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval,
*/
Shardgroup *shardgroup = palloc0(sizeof(Shardgroup));
shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild();
shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild();
shardgroup->colocationId = colocationId;
shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue);
shardgroup->maxShardValue = splitParentMaxValue;
@ -2228,3 +2229,36 @@ GetNextShardIdForSplitChild()
return shardId;
}
static int64
GetNextShardgroupIdForSplitChild()
{
StringInfo nextValueCommand = makeStringInfo();
appendStringInfo(nextValueCommand, "SELECT nextval(%s);",
quote_literal_cstr("pg_catalog.pg_dist_shardgroupid_seq"));
MultiConnection *connection =
GetConnectionForLocalQueriesOutsideTransaction(CitusExtensionOwnerName());
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
&result);
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1 ||
PQnfields(result) != 1)
{
PQclear(result);
ForgetResults(connection);
CloseConnection(connection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Could not generate next shardgroup id while executing "
"shard splits.")));
}
int64 shardgroupId = SafeStringToInt64(PQgetvalue(result, 0, 0 /* nodeId column*/));
PQclear(result);
ForgetResults(connection);
return shardgroupId;
}

View File

@ -186,7 +186,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId);
int64 shardgroupId = (int64) shardId;
int64 shardgroupId = GetNextShardgroupId();
InsertShardGroupRow(shardgroupId, tableEntry->colocationId,
nullMinValue, nullMaxValue);

View File

@ -10,6 +10,11 @@ CREATE TABLE citus.pg_dist_shardgroup (
);
ALTER TABLE citus.pg_dist_shardgroup SET SCHEMA pg_catalog;
CREATE SEQUENCE citus.pg_dist_shardgroupid_seq
MINVALUE 100000 -- TO BO DECIDED
NO CYCLE;
ALTER SEQUENCE citus.pg_dist_shardgroupid_seq SET SCHEMA pg_catalog;
INSERT INTO pg_catalog.pg_dist_shardgroup
SELECT min(shardid) as shardgroupid,
colocationid,

View File

@ -53,6 +53,7 @@
#define TRANSFER_MODE_BLOCK_WRITES 'b'
#define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
#define SHARDGROUPID_SEQUENCE_NAME "pg_dist_shardgroupid_seq"
#define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq"
/* Remote call definitions to help with data staging and deletion */
@ -221,6 +222,7 @@ extern bool IsCoordinator(void);
/* Function declarations local to the distributed module */
extern uint64 GetNextShardId(void);
extern int64 GetNextShardgroupId(void);
extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName, bool missingOk);
extern List * GetFullTableCreationCommands(Oid relationId,