From aef9865b9b0f8b8f7fce600ac3369b26fcf64c27 Mon Sep 17 00:00:00 2001
From: Shabnam Khan
Date: Wed, 28 Jun 2023 10:14:22 +0530
Subject: [PATCH] Addressed all review comments

---
 .../distributed/operations/auto_shard_split.c | 56 +++++++++----------
 src/backend/distributed/shared_library_init.c |  6 +-
 .../citus_auto_shard_split_start/12.0-1.sql   | 17 +-----
 .../citus_auto_shard_split_start/latest.sql   | 17 +-----
 .../distributed/utils/background_jobs.c       | 18 +++---
 src/include/distributed/background_jobs.h     |  1 +
 6 files changed, 48 insertions(+), 67 deletions(-)

diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c
index 406f19606..00ee371d5 100644
--- a/src/backend/distributed/operations/auto_shard_split.c
+++ b/src/backend/distributed/operations/auto_shard_split.c
@@ -37,12 +37,12 @@ typedef struct ShardInfoData
 typedef ShardInfoData *ShardInfo;
 
 void ErrorOnConcurrentOperation(void);
-StringInfo GetShardSplitQuery(ShardInfo shardinfo, Datum datum,
+StringInfo GetShardSplitQuery(ShardInfo shardinfo, Datum splitPointArrayDatum,
							  char *shardSplitMode);
-void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum datum,
+void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum splitPointArrayDatum,
							   char *shardSplitMode);
 List * FindShardSplitPoints(int64 shardId);
-int64 ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId);
+bool ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId);
 
 /*
 * It throws an error if a concurrent automatic shard split or Rebalance operation is happening.
@@ -74,11 +74,11 @@ ErrorOnConcurrentOperation()
 * For a given SplitPoints , it creates the SQL query for the Shard Splitting
 */
 StringInfo
-GetShardSplitQuery(ShardInfo shardinfo, Datum datum, char *shardSplitMode)
+GetShardSplitQuery(ShardInfo shardinfo, Datum splitPointArrayDatum, char *shardSplitMode)
 {
	StringInfo splitQuery = makeStringInfo();
 
-	ArrayType *array = DatumGetArrayTypeP(datum);
+	ArrayType *array = DatumGetArrayTypeP(splitPointArrayDatum);
	Datum *values;
	int nelems;
	deconstruct_array(array,
@@ -98,7 +98,7 @@ GetShardSplitQuery(ShardInfo shardinfo, Datum datum, char *shardSplitMode)
		}
	}
 
-/*All the shards after the split will be belonging to the same node */
+	/* All the shards after the split will belong to the same node */
	appendStringInfo(splitQuery, "], ARRAY[");
 
	for (int i = 0; i < nelems; i++)
@@ -116,11 +116,11 @@
 * It creates a background job for citus_split_shard_by_split_points and executes it in background.
 */
 void
-ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum datum,
+ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum splitPointArrayDatum,
						  char *shardSplitMode)
 {
	StringInfo splitQuery = makeStringInfo();
-	splitQuery = GetShardSplitQuery(shardinfo, datum, shardSplitMode);
+	splitQuery = GetShardSplitQuery(shardinfo, splitPointArrayDatum, shardSplitMode);
 
	/* ereport(LOG, (errmsg(splitQuery->data))); */
	int32 nodesInvolved[] = { shardinfo->nodeId };
@@ -138,11 +138,12 @@ Datum
 citus_find_shard_split_points(PG_FUNCTION_ARGS)
 {
	int64 shardId = PG_GETARG_INT64(0);
-	int64 shardGroupSize = PG_GETARG_INT64(1);
+	int64 shardSize = PG_GETARG_INT64(1);
+	int64 shardGroupSize = PG_GETARG_INT64(2);
	ereport(DEBUG4, errmsg("%ld", shardGroupSize));
 
	/*Filtering Shards with total GroupSize greater than MaxShardSize*1024 i.e Size based Policy*/
	if (shardGroupSize < MaxShardSize * 1024)
	{
		PG_RETURN_NULL();
	}
@@ -161,6 +162,7 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
	int64 shardMinValue = shardrange->minValue;
	int64 shardMaxValue = shardrange->maxValue;
	char *tableName = generate_qualified_relation_name(tableId);
+	char *schemaName = get_namespace_name(get_rel_namespace(tableId));
	StringInfo CommonValueQuery = makeStringInfo();
 
	/*
@@ -177,8 +179,7 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
					 dataType, quote_literal_cstr(shardName),
					 quote_literal_cstr(distributionColumnName),
-					 quote_literal_cstr(get_namespace_name(get_rel_namespace(
-															   tableId))),
+					 quote_literal_cstr(schemaName),
					 TenantFrequency,
					 shardId);
 
@@ -246,10 +247,6 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
	else
	{
		StringInfo AvgHashQuery = makeStringInfo();
-		uint64 tableSize = 0;
-		bool check = DistributedTableSize(tableId, TOTAL_RELATION_SIZE, true,
-										  &tableSize);
-
		/*
		 * It executes a query to find the average hash value in a shard considering rows with a limit of 10GB .
		 * If there exists a hash value it is returned otherwise NULL is returned.
@@ -258,7 +255,7 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
						 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/%lu))"
						 " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
						 distributionColumnName, tableName,
-						 tableSize,
+						 shardSize,
						 distributionColumnName, shardMinValue,
						 distributionColumnName, shardMaxValue
						 );
@@ -310,35 +307,36 @@
 * This function calculates the split points of the shard to
 * split and then executes the background job for the shard split.
 */
-int64
+bool
 ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId)
 {
	SPI_connect();
	StringInfo findSplitPointsQuery = makeStringInfo();
	appendStringInfo(findSplitPointsQuery,
-					 "SELECT citus_find_shard_split_points(%ld , %ld)",
+					 "SELECT citus_find_shard_split_points(%ld , %ld , %ld)",
					 shardinfo->shardId,
+					 shardinfo->shardSize,
					 shardinfo->shardGroupSize);
	SPI_exec(findSplitPointsQuery->data, 0);
 
	SPITupleTable *tupletable = SPI_tuptable;
	HeapTuple tuple = tupletable->vals[0];
	bool isnull;
-	Datum resultDatum = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
+	Datum splitPointArrayDatum = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
 
	if (!isnull)
	{
-		ereport(DEBUG4, errmsg("%s", GetShardSplitQuery(shardinfo, resultDatum,
+		ereport(DEBUG4, errmsg("%s", GetShardSplitQuery(shardinfo, splitPointArrayDatum,
														shardSplitMode)->data));
-		ExecuteSplitBackgroundJob(jobId, shardinfo, resultDatum, shardSplitMode);
+		ExecuteSplitBackgroundJob(jobId, shardinfo, splitPointArrayDatum, shardSplitMode);
		SPI_finish();
-		return 1;
+		return true;
	}
	else
	{
		ereport(LOG, errmsg("No Splitpoints for shard split"));
		SPI_finish();
-		return 0;
+		return false;
	}
 }
 
@@ -389,7 +387,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
	bool isnull;
 
	int64 jobId = CreateBackgroundJob("Automatic Shard Split",
									  "Split using SplitPoints List");
-	int64 count = 0;
+	int64 scheduledSplitCount = 0;
	for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
	{
@@ -408,14 +406,17 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
		char *shardGroupSizeValue = SPI_getvalue(tuple, tupletable->tupdesc, 6);
		shardinfo.shardGroupSize = strtoi64(shardGroupSizeValue, NULL, 10);
 
-		count = count + ScheduleShardSplit(&shardinfo, shardSplitMode, jobId);
+		if (ScheduleShardSplit(&shardinfo, shardSplitMode, jobId))
+		{
+			scheduledSplitCount++;
+		}
	}
	SPI_freetuptable(tupletable);
	SPI_finish();
 
-	if (count == 0)
	{
-		DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobId));
+	if (scheduledSplitCount == 0)
	{
+		CancelJob(jobId);
	}
 
	PG_RETURN_INT64(jobId);
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index c77a623d7..07b2aeaff 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -2262,8 +2262,8 @@ RegisterCitusConfigVariables(void)
		NULL, NULL, NULL);
 
	DefineCustomIntVariable(
-		"citus.max_shard_size",
-		gettext_noop("Sets the max size of a Shard"),
+		"citus.split_shard_group_size_threshold",
+		gettext_noop("Sets the max colocation group size of a Shard"),
		NULL,
		&MaxShardSize,
		102400, 100, INT32_MAX,
@@ -2272,7 +2272,7 @@ RegisterCitusConfigVariables(void)
		NULL, NULL, NULL);
 
	DefineCustomRealVariable(
-		"citus.tenant_freq",
+		"citus.tenant_isolation_frequency_threshold",
		gettext_noop("Sets the threshold tenant frequency for a Shard"),
		NULL,
		&TenantFrequency,
diff --git a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
index 0f3a7bb26..2c548eec8 100644
--- a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
+++ b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
@@ -1,32 +1,21 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
	shard_transfer_mode
 citus.shard_transfer_mode default 'auto'
-
 ) RETURNS bigint
-
 AS 'MODULE_PATHNAME'
-
 LANGUAGE C VOLATILE;
-
 COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode)
-
 IS 'automatically split the necessary shards in the cluster in the background';
-
 GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode) TO PUBLIC;
 
 CREATE OR REPLACE FUNCTION pg_catalog.citus_find_shard_split_points(
	shard_id bigint,
+	shard_size bigint,
	shard_group_size bigint
-
 ) RETURNS SETOF bigint[]
-
 AS 'MODULE_PATHNAME'
-
 LANGUAGE C VOLATILE;
-
-COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint)
-
+COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint)
 IS 'creates split points for shards';
-
-GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint) TO PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint) TO PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
index 0f3a7bb26..2c548eec8 100644
--- a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
@@ -1,32 +1,21 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
	shard_transfer_mode citus.shard_transfer_mode default 'auto'
-
 ) RETURNS bigint
-
 AS 'MODULE_PATHNAME'
-
 LANGUAGE C VOLATILE;
-
 COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode)
-
 IS 'automatically split the necessary shards in the cluster in the background';
-
 GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode) TO PUBLIC;
 
 CREATE OR REPLACE FUNCTION pg_catalog.citus_find_shard_split_points(
	shard_id bigint,
+	shard_size bigint,
	shard_group_size bigint
-
 ) RETURNS SETOF bigint[]
-
 AS 'MODULE_PATHNAME'
-
 LANGUAGE C VOLATILE;
-
-COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint)
-
+COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint)
 IS 'creates split points for shards';
-
-GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint) TO PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint) TO PUBLIC;
diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c
index 84ef4229f..712003403 100644
--- a/src/backend/distributed/utils/background_jobs.c
+++ b/src/backend/distributed/utils/background_jobs.c
@@ -145,14 +145,11 @@ PG_FUNCTION_INFO_V1(citus_task_wait);
 * We apply the same policy checks as pg_cancel_backend to check if a user can cancel a
 * job.
 */
-Datum
-citus_job_cancel(PG_FUNCTION_ARGS)
-{
+void
+CancelJob(int64 jobid)
+{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();
-
-	int64 jobid = PG_GETARG_INT64(0);
-
	/* Cancel all tasks that were scheduled before */
	List *pids = CancelTasksForJob(jobid);
@@ -170,10 +167,16 @@ citus_job_cancel(PG_FUNCTION_ARGS)
	}
 
	UpdateBackgroundJob(jobid);
-
-	PG_RETURN_VOID();
 }
+
+Datum
+citus_job_cancel(PG_FUNCTION_ARGS)
+{
+	int64 jobid = PG_GETARG_INT64(0);
+	CancelJob(jobid);
+	PG_RETURN_VOID();
+}
 
 /*
 * pg_catalog.citus_job_wait(jobid bigint,
diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h
index 35745c014..3e88bdc22 100644
--- a/src/include/distributed/background_jobs.h
+++ b/src/include/distributed/background_jobs.h
@@ -111,5 +111,6 @@ extern Datum citus_task_wait(PG_FUNCTION_ARGS);
 extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus);
 extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus);
 extern bool IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
+extern void CancelJob(int64 jobid);
 
 #endif /*CITUS_BACKGROUND_JOBS_H */