implement creation of new distributed tables

backup/feature/shardgroup
Nils Dijk 2022-11-24 16:11:30 +01:00
parent dfb0b31a84
commit d2c73bcbbf
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
5 changed files with 97 additions and 0 deletions

View File

@ -160,6 +160,7 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shardgroup_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
@ -3289,6 +3290,47 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
} }
/*
* citus_internal_add_shardgroup_metadata is an internal UDF to
* add a row to pg_dist_shardgroup.
*/
Datum
citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
PG_ENSURE_ARGNOTNULL(0, "shard group id");
int64 shardgroupId = PG_GETARG_INT64(0);
PG_ENSURE_ARGNOTNULL(1, "colocation id");
int32 colocationId = PG_GETARG_INT32(1);
text *shardMinValue = NULL;
if (!PG_ARGISNULL(2))
{
shardMinValue = PG_GETARG_TEXT_P(2);
}
text *shardMaxValue = NULL;
if (!PG_ARGISNULL(3))
{
shardMaxValue = PG_GETARG_TEXT_P(3);
}
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
/* TODO add some sanity checks */
}
InsertShardGroupRow(shardgroupId, colocationId, shardMinValue, shardMaxValue);
PG_RETURN_VOID();
}
/* /*
* EnsureCoordinatorInitiatedOperation is a helper function which ensures that * EnsureCoordinatorInitiatedOperation is a helper function which ensures that
* the execution is initiated by the coordinator on a worker node. * the execution is initiated by the coordinator on a worker node.

View File

@ -68,6 +68,17 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
} }
static char *
TextToSQLLiteral(text *value)
{
if (!value)
{
return "NULL";
}
return quote_literal_cstr(text_to_cstring(value));
}
/* /*
* CreateShardsWithRoundRobinPolicy creates empty shards for the given table * CreateShardsWithRoundRobinPolicy creates empty shards for the given table
* based on the specified number of initial shards. The function first updates * based on the specified number of initial shards. The function first updates
@ -168,6 +179,13 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* set shard storage type according to relation type */ /* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId); char shardStorageType = ShardStorageType(distributedTableId);
StringInfoData shardgroupQuery = { 0 };
initStringInfo(&shardgroupQuery);
appendStringInfoString(&shardgroupQuery,
"WITH shardgroup_data(shardgroupid, colocationid, "
"shardminvalue, shardmaxvalue) AS (VALUES ");
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
{ {
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount; uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
@ -190,8 +208,19 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken); text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken);
if (shardIndex > 0)
{
appendStringInfoString(&shardgroupQuery, ", ");
}
InsertShardGroupRow(shardGroupId, cacheEntry->colocationId, InsertShardGroupRow(shardGroupId, cacheEntry->colocationId,
minHashTokenText, maxHashTokenText); minHashTokenText, maxHashTokenText);
appendStringInfo(&shardgroupQuery, "(%ld, %d, %s, %s)",
shardGroupId,
cacheEntry->colocationId,
TextToSQLLiteral(minHashTokenText),
TextToSQLLiteral(maxHashTokenText));
InsertShardRow(distributedTableId, shardId, shardStorageType, InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText, &shardGroupId); minHashTokenText, maxHashTokenText, &shardGroupId);
@ -206,6 +235,14 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
currentInsertedShardPlacements); currentInsertedShardPlacements);
} }
/* create the shardgroups on workers with metadata */
appendStringInfoString(&shardgroupQuery, ") ");
appendStringInfoString(&shardgroupQuery,
"SELECT pg_catalog.citus_internal_add_shardgroup_metadata("
"shardgroupid, colocationid, shardminvalue, shardmaxvalue)"
"FROM shardgroup_data;");
SendCommandToWorkersWithMetadata(shardgroupQuery.data);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnections, colocatedShard); useExclusiveConnections, colocatedShard);
} }

View File

@ -59,3 +59,5 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(
storage_type "char", shard_min_value text, storage_type "char", shard_min_value text,
shard_max_value text shard_max_value text
); );
#include "udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql"

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(
shardgroupid bigint, colocationid integer,
shardminvalue text, shardmaxvalue text)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer, text, text) IS
'Inserts into pg_dist_shardgroup with user checks';

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(
shardgroupid bigint, colocationid integer,
shardminvalue text, shardmaxvalue text)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer, text, text) IS
'Inserts into pg_dist_shardgroup with user checks';