diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e672cb3e7..57c0da666 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1386,6 +1386,44 @@ ShardListInsertCommand(List *shardIntervalList) } +const char * +ShardgroupInsertCommand(List *shardgroups) +{ + StringInfoData command = { 0 }; + initStringInfo(&command); + + appendStringInfoString(&command, + "WITH shardgroup_data(shardgroupid, colocationid, " + "shardminvalue, shardmaxvalue) AS (VALUES "); + + Shardgroup *shardgroup = NULL; + bool firstShardGroup = true; + foreach_ptr(shardgroup, shardgroups) + { + if (!firstShardGroup) + { + appendStringInfoString(&command, ", "); + } + firstShardGroup = false; + + appendStringInfo(&command, "(%ld, %d, %s, %s)", + shardgroup->shardgroupId, + shardgroup->colocationId, + TextToSQLLiteral(IntegerToText( + DatumGetInt32(shardgroup->minShardValue))), + TextToSQLLiteral(IntegerToText( + DatumGetInt32(shardgroup->maxShardValue)))); + } + appendStringInfo(&command, ") "); + + appendStringInfo(&command, + "SELECT citus_internal_add_shardgroup_metadata(shardgroupid, " + "colocationid, shardminvalue, shardmaxvalue) FROM shardgroup_data;"); + + return command.data; +} + + /* * ShardListDeleteCommand generates a command list that can be executed to delete * shard and shard placement metadata for the given shard. diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 71a78ef5f..06e8f660a 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -68,7 +68,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) } -static char * +char * TextToSQLLiteral(text *value) { if (!value) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index facc1dcaa..e1345cf5c 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1207,7 +1207,11 @@ InsertSplitChildrenShardgroupMetadata(List *newShardgroups) IntegerToText(DatumGetInt32(shardgroup->maxShardValue))); } - /* TODO sync new shardgroups to workers */ + if (/* TODO check if shardgroup needs to propagate */ true) + { + const char *command = ShardgroupInsertCommand(newShardgroups); + SendCommandToWorkersWithMetadata(command); + } } diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 5aff81487..358303387 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -257,6 +257,7 @@ extern List * InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, int workerStartIndex, int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); +extern char * TextToSQLLiteral(text *value); extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, int32 replicationFactor, bool useExclusiveConnections); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 73fe7c8b3..9c7786326 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -77,6 +77,7 @@ extern char * DistributionDeleteMetadataCommand(Oid relationId); extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); +extern const char * ShardgroupInsertCommand(List *shardgroups); extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);