insert shardgroups during shardsplit

backup/feature/shardgroup
Nils Dijk 2022-12-12 13:59:01 +01:00
parent 8e9acf0add
commit 405cc896f7
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
5 changed files with 46 additions and 2 deletions

View File

@ -1386,6 +1386,44 @@ ShardListInsertCommand(List *shardIntervalList)
}
const char *
ShardgroupInsertCommand(List *shardgroups)
{
StringInfoData command = { 0 };
initStringInfo(&command);
appendStringInfoString(&command,
"WITH shardgroup_data(shardgroupid, colocationid, "
"shardminvalue, shardmaxvalue) AS (VALUES ");
Shardgroup *shardgroup = NULL;
bool firstShardGroup = true;
foreach_ptr(shardgroup, shardgroups)
{
if (!firstShardGroup)
{
appendStringInfoString(&command, ", ");
}
firstShardGroup = false;
appendStringInfo(&command, "(%ld, %d, %s, %s)",
shardgroup->shardgroupId,
shardgroup->colocationId,
TextToSQLLiteral(IntegerToText(
DatumGetInt32(shardgroup->minShardValue))),
TextToSQLLiteral(IntegerToText(
DatumGetInt32(shardgroup->maxShardValue))));
}
appendStringInfo(&command, ") ");
appendStringInfo(&command,
"SELECT citus_internal_add_shardgroup_metadata(shardgroupid, "
"colocationid, shardminvalue, shardmaxvalue) FROM shardgroup_data;");
return command.data;
}
/*
* ShardListDeleteCommand generates a command list that can be executed to delete
* shard and shard placement metadata for the given shard.

View File

@ -68,7 +68,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
}
static char *
char *
TextToSQLLiteral(text *value)
{
if (!value)

View File

@ -1207,7 +1207,11 @@ InsertSplitChildrenShardgroupMetadata(List *newShardgroups)
IntegerToText(DatumGetInt32(shardgroup->maxShardValue)));
}
/* TODO sync new shardgroups to workers */
if (/* TODO check if shardgroup needs to propagate */ true)
{
const char *command = ShardgroupInsertCommand(newShardgroups);
SendCommandToWorkersWithMetadata(command);
}
}

View File

@ -257,6 +257,7 @@ extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern char * TextToSQLLiteral(text *value);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,
bool useExclusiveConnections);

View File

@ -77,6 +77,7 @@ extern char * DistributionDeleteMetadataCommand(Oid relationId);
extern char * TableOwnerResetCommand(Oid distributedRelationId);
extern char * NodeListInsertCommand(List *workerNodeList);
extern List * ShardListInsertCommand(List *shardIntervalList);
extern const char * ShardgroupInsertCommand(List *shardgroups);
extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
extern char * NodeDeleteCommand(uint32 nodeId);
extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);