From d2c73bcbbfadda4796309e5d206e0cb80f122618 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Thu, 24 Nov 2022 16:11:30 +0100 Subject: [PATCH] implement creation of new distributed tables --- .../distributed/metadata/metadata_sync.c | 42 +++++++++++++++++++ .../distributed/operations/create_shards.c | 37 ++++++++++++++++ .../distributed/sql/citus--11.2-1--11.3-1.sql | 2 + .../11.3-1.sql | 8 ++++ .../latest.sql | 8 ++++ 5 files changed, 97 insertions(+) create mode 100644 src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 04a4f63e6..f9f1fd609 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -160,6 +160,7 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency); PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); +PG_FUNCTION_INFO_V1(citus_internal_add_shardgroup_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy); PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); @@ -3289,6 +3290,47 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) } +/* + * citus_internal_add_shardgroup_metadata is an internal UDF to + * add a row to pg_dist_shardgroup. + */ +Datum +citus_internal_add_shardgroup_metadata(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + PG_ENSURE_ARGNOTNULL(0, "shard group id"); + int64 shardgroupId = PG_GETARG_INT64(0); + + PG_ENSURE_ARGNOTNULL(1, "colocation id"); + int32 colocationId = PG_GETARG_INT32(1); + + text *shardMinValue = NULL; + if (!PG_ARGISNULL(2)) + { + shardMinValue = PG_GETARG_TEXT_P(2); + } + + text *shardMaxValue = NULL; + if (!PG_ARGISNULL(3)) + { + shardMaxValue = PG_GETARG_TEXT_P(3); + } + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + + /* TODO add some sanity checks */ + } + + InsertShardGroupRow(shardgroupId, colocationId, shardMinValue, shardMaxValue); + + PG_RETURN_VOID(); +} + + /* * EnsureCoordinatorInitiatedOperation is a helper function which ensures that * the execution is initiated by the coordinator on a worker node. diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index ec9e41cbf..18978c671 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -68,6 +68,17 @@ master_create_worker_shards(PG_FUNCTION_ARGS) } +static char * +TextToSQLLiteral(text *value) +{ + if (!value) + { + return "NULL"; + } + return quote_literal_cstr(text_to_cstring(value)); +} + + /* * CreateShardsWithRoundRobinPolicy creates empty shards for the given table * based on the specified number of initial shards. The function first updates @@ -168,6 +179,13 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, /* set shard storage type according to relation type */ char shardStorageType = ShardStorageType(distributedTableId); + StringInfoData shardgroupQuery = { 0 }; + initStringInfo(&shardgroupQuery); + + appendStringInfoString(&shardgroupQuery, + "WITH shardgroup_data(shardgroupid, colocationid, " + "shardminvalue, shardmaxvalue) AS (VALUES "); + for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) { uint32 roundRobinNodeIndex = shardIndex % workerNodeCount; @@ -190,8 +208,19 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, text *minHashTokenText = IntegerToText(shardMinHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken); + if (shardIndex > 0) + { + appendStringInfoString(&shardgroupQuery, ", "); + } + InsertShardGroupRow(shardGroupId, cacheEntry->colocationId, minHashTokenText, maxHashTokenText); + appendStringInfo(&shardgroupQuery, "(%ld, %d, %s, %s)", + shardGroupId, + cacheEntry->colocationId, + TextToSQLLiteral(minHashTokenText), + TextToSQLLiteral(maxHashTokenText)); + InsertShardRow(distributedTableId, shardId, shardStorageType, minHashTokenText, maxHashTokenText, &shardGroupId); @@ -206,6 +235,14 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, currentInsertedShardPlacements); } + /* create the shardgroups on workers with metadata */ + appendStringInfoString(&shardgroupQuery, ") "); + appendStringInfoString(&shardgroupQuery, + "SELECT pg_catalog.citus_internal_add_shardgroup_metadata(" + "shardgroupid, colocationid, shardminvalue, shardmaxvalue)" + "FROM shardgroup_data;"); + SendCommandToWorkersWithMetadata(shardgroupQuery.data); + CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, useExclusiveConnections, colocatedShard); } diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index b02f0d65c..75f75efd1 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -59,3 +59,5 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata( storage_type "char", shard_min_value text, shard_max_value text ); + +#include "udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql new file mode 100644 index 000000000..af1602166 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata( + shardgroupid bigint, colocationid integer, + shardminvalue text, shardmaxvalue text) + RETURNS void + LANGUAGE C + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer, text, text) IS + 'Inserts into pg_dist_shardgroup with user checks'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql new file mode 100644 index 000000000..af1602166 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_add_shardgroup_metadata/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata( + shardgroupid bigint, colocationid integer, + shardminvalue text, shardmaxvalue text) + RETURNS void + LANGUAGE C + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer, text, text) IS + 'Inserts into pg_dist_shardgroup with user checks';