fix splitting colocated tables

feature/shardgroup
Nils Dijk 2023-12-01 13:33:05 +00:00
parent 3b777287aa
commit aa84309948
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
9 changed files with 67 additions and 31 deletions

View File

@ -1951,7 +1951,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, uint32 colocati
*/ */
CreateShardsWithRoundRobinPolicy(relationId, CreateShardsWithRoundRobinPolicy(relationId,
colocationId, colocationId,
shardCount, shardCount,
ShardReplicationFactor, ShardReplicationFactor,
useExclusiveConnection); useExclusiveConnection);
} }

View File

@ -5400,7 +5400,8 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1]; Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
ShardgroupID shardgoupdId = DatumGetShardgroupID(datumArray[Anum_pg_dist_shard_shardgroupid - 1]); ShardgroupID shardgoupdId = DatumGetShardgroupID(
datumArray[Anum_pg_dist_shard_shardgroupid - 1]);
bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1]; bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1]; bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];

View File

@ -1222,9 +1222,9 @@ ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals)
List * List *
ShardgroupListDeleteCommand(List *shardIntervalList) ShardgroupListDeleteCommand(List *shardgroupList)
{ {
if (list_length(shardIntervalList) == 0) if (list_length(shardgroupList) == 0)
{ {
return NIL; return NIL;
} }
@ -1234,17 +1234,20 @@ ShardgroupListDeleteCommand(List *shardIntervalList)
appendStringInfo(deleteShardgroupsCommand, appendStringInfo(deleteShardgroupsCommand,
"WITH shardgroup_data(shardgroupid) AS (VALUES "); "WITH shardgroup_data(shardgroupid) AS (VALUES ");
ShardInterval *shardInterval = NULL; ShardgroupID *shardgroupID = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
appendStringInfo(deleteShardgroupsCommand,
"(" SHARDGROUPID_FORMAT "::bigint)",
shardInterval->shardgroupId);
if (llast(shardIntervalList) != shardInterval) bool first = true;
foreach_ptr(shardgroupID, shardgroupList)
{
if (!first)
{ {
appendStringInfo(deleteShardgroupsCommand, ", "); appendStringInfo(deleteShardgroupsCommand, ", ");
} }
first = false;
appendStringInfo(deleteShardgroupsCommand,
"(" SHARDGROUPID_SQL_FORMAT ")",
*shardgroupID);
} }
appendStringInfo(deleteShardgroupsCommand, ") "); appendStringInfo(deleteShardgroupsCommand, ") ");
@ -1256,6 +1259,7 @@ ShardgroupListDeleteCommand(List *shardIntervalList)
return list_make1(deleteShardgroupsCommand->data); return list_make1(deleteShardgroupsCommand->data);
} }
/* /*
* ShardListInsertCommand generates a single command that can be * ShardListInsertCommand generates a single command that can be
* executed to replicate shard and shard placement metadata for the * executed to replicate shard and shard placement metadata for the
@ -3437,7 +3441,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue); shardMaxValue);
} }
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue, shardgroupID); InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue,
shardgroupID);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -4228,12 +4233,13 @@ SyncNewShardgoupsToNodes(ShardgroupID *shardgroupIDs, int shardCount, uint32 col
static char * static char *
ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId) ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId)
{ {
StringInfoData buf = {0}; StringInfoData buf = { 0 };
initStringInfo(&buf); initStringInfo(&buf);
/* now add shards to insertShardCommand */ /* now add shards to insertShardCommand */
appendStringInfo(&buf, appendStringInfo(&buf,
"WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES "); "WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES ");
for (int i=0; i<shardCount; i++) for (int i = 0; i < shardCount; i++)
{ {
if (i > 0) if (i > 0)
{ {

View File

@ -1804,7 +1804,9 @@ InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId)
{ {
if (!IsShardgroupIDValid(shardgroupId)) if (!IsShardgroupIDValid(shardgroupId))
{ {
elog(ERROR, "cannot insert invalid shardgroupid: " SHARDGROUPID_FORMAT, shardgroupId); elog(ERROR,
"cannot insert invalid shardgroupid: " SHARDGROUPID_FORMAT,
shardgroupId);
} }
Datum values[Natts_pg_dist_shardgroup]; Datum values[Natts_pg_dist_shardgroup];

View File

@ -201,7 +201,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, uint32 colocationId,
replicationFactor); replicationFactor);
} }
// TODO guess we should check if metadatasync is on /* TODO guess we should check if metadatasync is on */
SyncNewShardgoupsToNodes(shardgroupIDs, shardCount, colocationId); SyncNewShardgoupsToNodes(shardgroupIDs, shardCount, colocationId);
/* /*

View File

@ -1071,7 +1071,7 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
ShardgroupID *shardgroupIDs = palloc0(sizeof(ShardgroupID) * shardcount); ShardgroupID *shardgroupIDs = palloc0(sizeof(ShardgroupID) * shardcount);
for (int i = 0; i < shardcount; i++) for (int i = 0; i < shardcount; i++)
{ {
shardgroupIDs[i] = GetNextColocationId(); shardgroupIDs[i] = GetNextShardgroupId();
shardgroupIdsList = lappend(shardgroupIdsList, &shardgroupIDs[i]); shardgroupIdsList = lappend(shardgroupIdsList, &shardgroupIDs[i]);
} }
} }
@ -1438,7 +1438,8 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
static void static void
DropShardgroupListMetadata(List *shardIntervalList) DropShardgroupListMetadata(List *shardIntervalList)
{ {
List *syncedShardIntervalList = NIL; HTAB *uniqueShardgroups = CreateSimpleHashSet(ShardgroupID);
HTAB *uniqueSyncedShardgroups = CreateSimpleHashSet(ShardgroupID);
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList) foreach_ptr(shardInterval, shardIntervalList)
@ -1454,19 +1455,43 @@ DropShardgroupListMetadata(List *shardIntervalList)
continue; continue;
} }
DeleteShardgroupRow(shardInterval->shardgroupId); hash_search(uniqueShardgroups,
&shardInterval->shardgroupId,
HASH_ENTER,
NULL);
Oid relationId = shardInterval->relationId; /* if the relation is synced we want to drop the shardgroup from the workers */
if (ShouldSyncTableMetadata(shardInterval->relationId))
/* delete metadata from synced nodes */
if (ShouldSyncTableMetadata(relationId))
{ {
syncedShardIntervalList = lappend(syncedShardIntervalList, shardInterval); hash_search(uniqueSyncedShardgroups,
&shardInterval->shardgroupId,
HASH_ENTER,
NULL);
} }
} }
/* iterate over all entries in uniqueShardgroups to delete the shardgroup locally */
HASH_SEQ_STATUS hashSeqStatus = { 0 };
hash_seq_init(&hashSeqStatus, uniqueShardgroups);
ShardgroupID *shardgroupId = NULL;
while ((shardgroupId = hash_seq_search(&hashSeqStatus)) != NULL)
{
DeleteShardgroupRow(*shardgroupId);
}
/*
* Iterate over all entries in uniqueSyncedShardgroups to turn into a list that can be
* used to propagate the deletion of the shardgroups
*/
List *syncedShardgroupList = NIL;
hash_seq_init(&hashSeqStatus, uniqueSyncedShardgroups);
while ((shardgroupId = hash_seq_search(&hashSeqStatus)) != NULL)
{
syncedShardgroupList = lappend(syncedShardgroupList, shardgroupId);
}
/* delete metadata from all workers with metadata available */ /* delete metadata from all workers with metadata available */
List *commands = ShardgroupListDeleteCommand(syncedShardIntervalList); List *commands = ShardgroupListDeleteCommand(syncedShardgroupList);
char *command = NULL; char *command = NULL;
foreach_ptr(command, commands) foreach_ptr(command, commands)
{ {

View File

@ -193,7 +193,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++; candidateNodeIndex++;
} }
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, InvalidShardgroupID); InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue,
InvalidShardgroupID);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor); ShardReplicationFactor);

View File

@ -100,7 +100,7 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId);
extern char * NodeListInsertCommand(List *workerNodeList); extern char * NodeListInsertCommand(List *workerNodeList);
char * NodeListIdempotentInsertCommand(List *workerNodeList); char * NodeListIdempotentInsertCommand(List *workerNodeList);
extern List * ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals); extern List * ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals);
extern List * ShardgroupListDeleteCommand(List *shardIntervalList); extern List * ShardgroupListDeleteCommand(List *shardgroupList);
extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardListInsertCommand(List *shardIntervalList);
extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeDeleteCommand(uint32 nodeId);

View File

@ -5,7 +5,7 @@
* tables belonging to the same colocation group. When shards belong * tables belonging to the same colocation group. When shards belong
* to the same shardgroup they move as one logical unit during * to the same shardgroup they move as one logical unit during
* shardmoves, which are better described as shardgroupmoves. * shardmoves, which are better described as shardgroupmoves.
* *
* This header defines functions operating on shardgroups as well as * This header defines functions operating on shardgroups as well as
* helpers to work with ShardgroupID's that identify shardgroups. * helpers to work with ShardgroupID's that identify shardgroups.
* *
@ -23,12 +23,13 @@
typedef int64 ShardgroupID; typedef int64 ShardgroupID;
#define SHARDGROUPID_FORMAT INT64_FORMAT #define SHARDGROUPID_FORMAT INT64_FORMAT
#define SHARDGROUPID_SQL_FORMAT SHARDGROUPID_FORMAT "::bigint"
#define InvalidShardgroupID ((ShardgroupID) 0) #define InvalidShardgroupID ((ShardgroupID) 0)
#define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID) #define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID)
// helper functions to get a typed ShardgroupID to and from a Datum /* helper functions to get a typed ShardgroupID to and from a Datum */
#define DatumGetShardgroupID(datum) ((ShardgroupID) DatumGetInt64((datum))) #define DatumGetShardgroupID(datum) ((ShardgroupID) DatumGetInt64((datum)))
#define ShardgroupIDGetDatum(shardgroupID) Int64GetDatum(((int64)(shardgroupID))) #define ShardgroupIDGetDatum(shardgroupID) Int64GetDatum(((int64) (shardgroupID)))
#define PG_GETARG_SHARDGROUPID(n) DatumGetShardgroupID(PG_GETARG_DATUM(n)) #define PG_GETARG_SHARDGROUPID(n) DatumGetShardgroupID(PG_GETARG_DATUM(n))