diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index c3c320ef0..733f8cd8f 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1951,7 +1951,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, uint32 colocati */ CreateShardsWithRoundRobinPolicy(relationId, colocationId, - shardCount, + shardCount, ShardReplicationFactor, useExclusiveConnection); } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 16494c984..815b89607 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -5400,7 +5400,8 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray, char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]); Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1]; Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1]; - ShardgroupID shardgoupdId = DatumGetShardgroupID(datumArray[Anum_pg_dist_shard_shardgroupid - 1]); + ShardgroupID shardgoupdId = DatumGetShardgroupID( + datumArray[Anum_pg_dist_shard_shardgroupid - 1]); bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1]; bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1]; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 13c23953e..5e50e4709 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1222,9 +1222,9 @@ ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals) List * -ShardgroupListDeleteCommand(List *shardIntervalList) +ShardgroupListDeleteCommand(List *shardgroupList) { - if (list_length(shardIntervalList) == 0) + if (list_length(shardgroupList) == 0) { return NIL; } @@ -1234,17 +1234,20 @@ ShardgroupListDeleteCommand(List *shardIntervalList) appendStringInfo(deleteShardgroupsCommand, "WITH shardgroup_data(shardgroupid) AS (VALUES "); - ShardInterval *shardInterval = NULL; - foreach_ptr(shardInterval, shardIntervalList) - { - appendStringInfo(deleteShardgroupsCommand, - "(" SHARDGROUPID_FORMAT "::bigint)", - shardInterval->shardgroupId); + ShardgroupID *shardgroupID = NULL; - if (llast(shardIntervalList) != shardInterval) + bool first = true; + foreach_ptr(shardgroupID, shardgroupList) + { + if (!first) { appendStringInfo(deleteShardgroupsCommand, ", "); } + first = false; + + appendStringInfo(deleteShardgroupsCommand, + "(" SHARDGROUPID_SQL_FORMAT ")", + *shardgroupID); } appendStringInfo(deleteShardgroupsCommand, ") "); @@ -1256,6 +1259,7 @@ ShardgroupListDeleteCommand(List *shardIntervalList) return list_make1(deleteShardgroupsCommand->data); } + /* * ShardListInsertCommand generates a single command that can be * executed to replicate shard and shard placement metadata for the @@ -3437,7 +3441,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) shardMaxValue); } - InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue, shardgroupID); + InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue, + shardgroupID); PG_RETURN_VOID(); } @@ -4228,12 +4233,13 @@ SyncNewShardgoupsToNodes(ShardgroupID *shardgroupIDs, int shardCount, uint32 col static char * ShardgroupsCreateCommand(ShardgroupID *shardgroupIDs, int shardCount, uint32 colocationId) { - StringInfoData buf = {0}; + StringInfoData buf = { 0 }; initStringInfo(&buf); + /* now add shards to insertShardCommand */ - appendStringInfo(&buf, + appendStringInfo(&buf, "WITH shardgroup_data(shardgroupid, colocationid) AS (VALUES "); - for (int i=0; i 0) { diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index d48a8cbfb..faf995aa5 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1804,7 +1804,9 @@ InsertShardgroupRow(ShardgroupID shardgroupId, uint32 colocationId) { if (!IsShardgroupIDValid(shardgroupId)) { - elog(ERROR, "cannot insert invalid shardgroupid: " SHARDGROUPID_FORMAT, shardgroupId); + elog(ERROR, + "cannot insert invalid shardgroupid: " SHARDGROUPID_FORMAT, + shardgroupId); } Datum values[Natts_pg_dist_shardgroup]; diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 98f0d3399..ca964feef 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -201,7 +201,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, uint32 colocationId, replicationFactor); } - // TODO guess we should check if metadatasync is on + /* TODO guess we should check if metadatasync is on */ SyncNewShardgoupsToNodes(shardgroupIDs, shardCount, colocationId); /* diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 721b7f5d3..bb0aacfa0 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1071,7 +1071,7 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, ShardgroupID *shardgroupIDs = palloc0(sizeof(ShardgroupID) * shardcount); for (int i = 0; i < shardcount; i++) { - shardgroupIDs[i] = GetNextColocationId(); + shardgroupIDs[i] = GetNextShardgroupId(); shardgroupIdsList = lappend(shardgroupIdsList, &shardgroupIDs[i]); } } @@ -1438,7 +1438,8 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, static void DropShardgroupListMetadata(List *shardIntervalList) { - List *syncedShardIntervalList = NIL; + HTAB *uniqueShardgroups = CreateSimpleHashSet(ShardgroupID); + HTAB *uniqueSyncedShardgroups = CreateSimpleHashSet(ShardgroupID); ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) @@ -1454,19 +1455,43 @@ DropShardgroupListMetadata(List *shardIntervalList) continue; } - DeleteShardgroupRow(shardInterval->shardgroupId); + hash_search(uniqueShardgroups, + &shardInterval->shardgroupId, + HASH_ENTER, + NULL); - Oid relationId = shardInterval->relationId; - - /* delete metadata from synced nodes */ - if (ShouldSyncTableMetadata(relationId)) + /* if the relation is synced we want to drop the shardgroup from the workers */ + if (ShouldSyncTableMetadata(shardInterval->relationId)) { - syncedShardIntervalList = lappend(syncedShardIntervalList, shardInterval); + hash_search(uniqueSyncedShardgroups, + &shardInterval->shardgroupId, + HASH_ENTER, + NULL); } } + /* iterate over all entries in uniqueShardgroups to delete the shardgroup locally */ + HASH_SEQ_STATUS hashSeqStatus = { 0 }; + hash_seq_init(&hashSeqStatus, uniqueShardgroups); + ShardgroupID *shardgroupId = NULL; + while ((shardgroupId = hash_seq_search(&hashSeqStatus)) != NULL) + { + DeleteShardgroupRow(*shardgroupId); + } + + /* + * Iterate over all entries in uniqueSyncedShardgroups to turn into a list that can be + * used to propagate the deletion of the shardgroups + */ + List *syncedShardgroupList = NIL; + hash_seq_init(&hashSeqStatus, uniqueSyncedShardgroups); + while ((shardgroupId = hash_seq_search(&hashSeqStatus)) != NULL) + { + syncedShardgroupList = lappend(syncedShardgroupList, shardgroupId); + } + /* delete metadata from all workers with metadata available */ - List *commands = ShardgroupListDeleteCommand(syncedShardIntervalList); + List *commands = ShardgroupListDeleteCommand(syncedShardgroupList); char *command = NULL; foreach_ptr(command, commands) { diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index 8222fd290..38d774cea 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -193,7 +193,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS) candidateNodeIndex++; } - InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, InvalidShardgroupID); + InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, + InvalidShardgroupID); CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, ShardReplicationFactor); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index aa52ecfc7..07291a577 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -100,7 +100,7 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); char * NodeListIdempotentInsertCommand(List *workerNodeList); extern List * ShardgroupListInsertCommand(uint32 colocationId, List *shardIntervals); -extern List * ShardgroupListDeleteCommand(List *shardIntervalList); +extern List * ShardgroupListDeleteCommand(List *shardgroupList); extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); diff --git a/src/include/distributed/shardgroup.h b/src/include/distributed/shardgroup.h index 3ba24c546..df48c1bfb 100644 --- a/src/include/distributed/shardgroup.h +++ b/src/include/distributed/shardgroup.h @@ -5,7 +5,7 @@ * tables belonging to the same colocation group. When shards belong * to the same shardgroup they move as one logical unit during * shardmoves, which are better described as shardgroupmoves. - * + * * This header defines functions operating on shardgroups as well as * helpers to work with ShardgroupID's that identify shardgroups. * @@ -23,12 +23,13 @@ typedef int64 ShardgroupID; #define SHARDGROUPID_FORMAT INT64_FORMAT +#define SHARDGROUPID_SQL_FORMAT SHARDGROUPID_FORMAT "::bigint" #define InvalidShardgroupID ((ShardgroupID) 0) #define IsShardgroupIDValid(shardgroupID) ((shardgroupID) != InvalidShardgroupID) -// helper functions to get a typed ShardgroupID to and from a Datum +/* helper functions to get a typed ShardgroupID to and from a Datum */ #define DatumGetShardgroupID(datum) ((ShardgroupID) DatumGetInt64((datum))) -#define ShardgroupIDGetDatum(shardgroupID) Int64GetDatum(((int64)(shardgroupID))) +#define ShardgroupIDGetDatum(shardgroupID) Int64GetDatum(((int64) (shardgroupID))) #define PG_GETARG_SHARDGROUPID(n) DatumGetShardgroupID(PG_GETARG_DATUM(n))