From 27c44ef2169df75c69d326255306629c209209a5 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 12 Dec 2022 14:21:17 +0100 Subject: [PATCH] source shardgroup ids from their own sequence --- .../distributed/operations/create_shards.c | 6 +-- .../distributed/operations/node_protocol.c | 24 ++++++++++++ .../distributed/operations/shard_split.c | 38 ++++++++++++++++++- .../distributed/operations/stage_protocol.c | 2 +- .../distributed/sql/citus--11.2-1--11.3-1.sql | 5 +++ .../distributed/coordinator_protocol.h | 2 + 6 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 06e8f660a..ecccaa59d 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -194,9 +194,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); uint64 shardId = GetNextShardId(); - - /* we use shardid of the first shard in a shardgroup as the shardgroupid */ - int64 shardGroupId = (int64) shardId; + int64 shardGroupId = GetNextShardgroupId(); /* if we are at the last shard, make sure the max token value is INT_MAX */ if (shardIndex == (shardCount - 1)) @@ -399,7 +397,7 @@ CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId, /* get the next shard id */ uint64 shardId = GetNextShardId(); - int64 shardGroupId = (int64) shardId; + int64 shardGroupId = GetNextShardgroupId(); StringInfoData shardgroupQuery = { 0 }; initStringInfo(&shardgroupQuery); diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 172a2a303..c73440a36 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -252,6 +252,30 @@
GetNextShardId() } +int64 +GetNextShardgroupId() +{ + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + + text *sequenceName = cstring_to_text(SHARDGROUPID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName, false); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique shardgroupId from sequence */ + Datum shardgroupIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + int64 shardgroupId = DatumGetInt64(shardgroupIdDatum); + + return shardgroupId; +} + + /* * master_get_new_placementid is a user facing wrapper function around * GetNextPlacementId() which allocates and returns a unique placement id for the diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index e1345cf5c..03c0332de 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -157,6 +157,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId, ShardInterval *shardInterval); static uint64 GetNextShardIdForSplitChild(void); +static int64 GetNextShardgroupIdForSplitChild(void); static void AcquireNonblockingSplitLock(Oid relationId); static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList); static void DropShardgroupMetadata(int64 shardgroupId); @@ -1063,7 +1064,7 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, Datum splitPoint = (Datum) lfirst(splitPointCell); Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); - shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild(); + shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild(); shardgroup->colocationId = colocationId; shardgroup->minShardValue =
Int32GetDatum(currentSplitChildMinValue); shardgroup->maxShardValue = splitPoint; @@ -1080,7 +1081,7 @@ */ Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); - shardgroup->shardgroupId = (int64) GetNextShardIdForSplitChild(); + shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild(); shardgroup->colocationId = colocationId; shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); shardgroup->maxShardValue = splitParentMaxValue; @@ -2228,3 +2229,36 @@ GetNextShardIdForSplitChild() return shardId; } + + +static int64 +GetNextShardgroupIdForSplitChild() +{ + StringInfo nextValueCommand = makeStringInfo(); + appendStringInfo(nextValueCommand, "SELECT nextval(%s);", + quote_literal_cstr("pg_catalog.pg_dist_shardgroupid_seq")); + + MultiConnection *connection = + GetConnectionForLocalQueriesOutsideTransaction(CitusExtensionOwnerName()); + + PGresult *result = NULL; + int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, + &result); + if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1 || + PQnfields(result) != 1) + { + PQclear(result); + ForgetResults(connection); + CloseConnection(connection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("Could not generate next shardgroup id while executing " + "shard splits."))); + } + + int64 shardgroupId = SafeStringToInt64(PQgetvalue(result, 0, 0 /* shardgroupId column */)); + PQclear(result); + ForgetResults(connection); + + return shardgroupId; +} diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index a7cb61882..c687812db 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -186,7 +186,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); - int64
shardgroupId = (int64) shardId; + int64 shardgroupId = GetNextShardgroupId(); InsertShardGroupRow(shardgroupId, tableEntry->colocationId, nullMinValue, nullMaxValue); diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index b410ffd3f..b862833c3 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -10,6 +10,11 @@ CREATE TABLE citus.pg_dist_shardgroup ( ); ALTER TABLE citus.pg_dist_shardgroup SET SCHEMA pg_catalog; +CREATE SEQUENCE citus.pg_dist_shardgroupid_seq + MINVALUE 100000 -- TO BE DECIDED + NO CYCLE; +ALTER SEQUENCE citus.pg_dist_shardgroupid_seq SET SCHEMA pg_catalog; + INSERT INTO pg_catalog.pg_dist_shardgroup SELECT min(shardid) as shardgroupid, colocationid, diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 358303387..65c7958f7 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -53,6 +53,7 @@ #define TRANSFER_MODE_BLOCK_WRITES 'b' #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq" +#define SHARDGROUPID_SEQUENCE_NAME "pg_dist_shardgroupid_seq" #define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq" /* Remote call definitions to help with data staging and deletion */ @@ -221,6 +222,7 @@ extern bool IsCoordinator(void); /* Function declarations local to the distributed module */ extern uint64 GetNextShardId(void); +extern int64 GetNextShardgroupId(void); extern uint64 GetNextPlacementId(void); extern Oid ResolveRelationId(text *relationName, bool missingOk); extern List * GetFullTableCreationCommands(Oid relationId,