diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c
index 0a090be67..41668801c 100644
--- a/src/backend/distributed/operations/auto_shard_split.c
+++ b/src/backend/distributed/operations/auto_shard_split.c
@@ -13,6 +13,7 @@
 #include "utils/lsyscache.h"
 #include "distributed/listutils.h"
 #include "distributed/metadata_utility.h"
+#include "distributed/background_jobs.h"

 PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);

@@ -44,9 +45,9 @@
 StringInfo GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints,
                              char *shardSplitMode);
 void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
                               char *shardSplitMode);
-int64 ExecuteAvgHashQuery(ShardInfo shardinfo);
+int64 ExecuteAverageHashQuery(ShardInfo shardinfo);
 List * FindShardSplitPoints(ShardInfo shardinfo);
-void ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode);
+int64 ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId);
 /*
  * It throws an error if a concurrent automatic shard split or Rebalance operation is happening.
@@ -87,7 +88,7 @@ GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints, char *shardSplitMode)
                     shardinfo->shardId);

    int32 splitpoint = 0;
-   int index = 0;
+   uint64 index = 0;
    foreach_int(splitpoint, splitPoints)
    {
        appendStringInfo(splitQuery, "'%d'", splitpoint);
@@ -136,7 +137,7 @@ ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
  * If there exists a hash value it is returned otherwise shardminvalue-1 is returned.
  */
 int64
-ExecuteAvgHashQuery(ShardInfo shardinfo)
+ExecuteAverageHashQuery(ShardInfo shardinfo)
 {
    StringInfo AvgHashQuery = makeStringInfo();
    uint64 tableSize = 0;
@@ -150,7 +151,7 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
                     shardinfo->distributionColumn, shardinfo->shardMinValue,
                     shardinfo->distributionColumn, shardinfo->shardMaxValue
                     );
-   ereport(LOG, errmsg("%s", AvgHashQuery->data));
+   ereport(DEBUG4, errmsg("%s", AvgHashQuery->data));
    SPI_connect();
    SPI_exec(AvgHashQuery->data, 0);
    SPITupleTable *tupletable = SPI_tuptable;
@@ -193,7 +194,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
    appendStringInfo(CommonValueQuery,
                     "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
                     " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
-                    " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > %f $$)"
+                    " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > %lf $$)"
                     " WHERE result <> '' AND shardid = %ld;",
                     shardinfo->dataType, quote_literal_cstr(shardinfo->tableName),
                     shardinfo->dataType,
@@ -204,7 +205,7 @@
                     TenantFrequency,
                     shardinfo->shardId);

-   ereport(LOG, errmsg("%s", CommonValueQuery->data));
+   ereport(DEBUG4, errmsg("%s", CommonValueQuery->data));
    List *splitPoints = NULL;

    /* Saving the current memory context*/
@@ -220,17 +221,21 @@
    int64 average;
    int32 hashedValue;

-   ereport(LOG, errmsg("%ld", rowCount));
+   ereport(DEBUG4, errmsg("%ld", rowCount));

    if (rowCount > 0)
    {
+       /* For every common tenant value, a split point is calculated from the
+        * hashed value; the unique split points are appended to the list,
+        * which is then sorted and returned.
+        */
        SPITupleTable *tupletable = SPI_tuptable;
        CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardinfo->tableId);
        for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
        {
            HeapTuple tuple = tupletable->vals[rowIndex];
            char *commonValue = SPI_getvalue(tuple, tupletable->tupdesc, 2);
-           ereport(LOG, errmsg("%s", commonValue));
+           ereport(DEBUG4, errmsg("%s", commonValue));
            Datum tenantIdDatum = StringToDatum(commonValue,
                                                shardinfo->distributionColumnId);
            Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction,
@@ -238,7 +243,7 @@
                                                       varcollid,
                                                       tenantIdDatum);
            hashedValue = DatumGetInt32(hashedValueDatum);
-           ereport(LOG, errmsg("%d", hashedValue));
+           ereport(DEBUG4, errmsg("%d", hashedValue));


            /*Switching the memory context to store the unique SplitPoints in a list*/
@@ -263,8 +268,8 @@
    }
    else
    {
-       average = ExecuteAvgHashQuery(shardinfo);
-       ereport(LOG, errmsg("%ld", average));
+       average = ExecuteAverageHashQuery(shardinfo);
+       ereport(DEBUG4, errmsg("%ld", average));
        MemoryContextSwitchTo(originalContext);
        if (shardinfo->shardMinValue <= average)
        {
@@ -281,21 +286,21 @@
  * This function calculates the split points of the shard to
  * split and then executes the background job for the shard split.
  */
-void
-ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode)
+int64
+ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId)
 {
    List *splitPoints = FindShardSplitPoints(shardinfo);

    if (list_length(splitPoints) > 0)
    {
-       int64 jobId = CreateBackgroundJob("Automatic Shard Split",
-                                         "Split using SplitPoints List");
-       ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints,
+       ereport(DEBUG4, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints,
                                                     shardSplitMode)->data));
        ExecuteSplitBackgroundJob(jobId, shardinfo, splitPoints, shardSplitMode);
+       return 1;
    }
    else
    {
        ereport(LOG, errmsg("No Splitpoints for shard split"));
+       return 0;
    }
 }
@@ -325,14 +330,14 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
                     " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
                     " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %lu )"
                     " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
                     MaxShardSize * 1024
                     );
-   ereport(LOG, errmsg("%s", query->data));
+   ereport(DEBUG4, errmsg("%s", query->data));
    Oid shardTransferModeOid = PG_GETARG_OID(0);
    Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
    char *shardSplitMode = DatumGetCString(enumLabelDatum);
-   ereport(LOG, errmsg("%s", shardSplitMode));
+   ereport(DEBUG4, errmsg("%s", shardSplitMode));
    if (SPI_connect() != SPI_OK_CONNECT)
    {
        elog(ERROR, "SPI_connect to the query failed");
    }
@@ -345,6 +350,9 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
    SPITupleTable *tupletable = SPI_tuptable;
    int rowCount = SPI_processed;
    bool isnull;
+   int64 jobId = CreateBackgroundJob("Automatic Shard Split",
+                                     "Split using SplitPoints List");
+   int64 count = 0;

    for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
    {
@@ -380,8 +388,8 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
                                                      distributionColumn);
        shardinfo.dataType = format_type_be(shardinfo.distributionColumnId);

-       ScheduleShardSplit(&shardinfo, shardSplitMode);
-       ereport(LOG, (errmsg(
+       count = count + ScheduleShardSplit(&shardinfo, shardSplitMode, jobId);
+       ereport(DEBUG4, (errmsg(
                   "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d",
                   shardinfo.shardId, shardinfo.shardMinValue,
                   shardinfo.shardMaxValue,
@@ -390,6 +398,10 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
    SPI_freetuptable(tupletable);
    SPI_finish();

-   PG_RETURN_VOID();
+   if (count == 0)
+   {
+       DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobId));
+   }
+   PG_RETURN_INT64(jobId);
 }

diff --git a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
index 876aaa7bc..aba25f34d 100644
--- a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
+++ b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
@@ -2,7 +2,7 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
     shard_transfer_mode citus.shard_transfer_mode default 'auto'
 )
-RETURNS VOID
+RETURNS bigint
 AS 'MODULE_PATHNAME'
diff --git a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
index 2a1aea49f..b300a2cc7 100644
--- a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
@@ -2,7 +2,7 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
     shard_transfer_mode citus.shard_transfer_mode default 'auto'
 )
-RETURNS VOID
+RETURNS bigint
 AS 'MODULE_PATHNAME'
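
Usage sketch (not part of the patch): with this change, citus_auto_shard_split_start returns the id of the background job it schedules instead of VOID, and cancels the job when no shard qualifies for a split. A caller can therefore track the job with existing Citus facilities, e.g. from psql (\gset stores the returned job id in a variable):

    -- Assumes the patch above is applied; 'block_writes' is one of the
    -- existing citus.shard_transfer_mode values.
    SELECT citus_auto_shard_split_start('block_writes') AS job_id \gset

    -- citus_job_wait() and pg_dist_background_job already ship with Citus.
    -- If no shard needed splitting, the job ends up in a cancelled state.
    SELECT citus_job_wait(:job_id);
    SELECT state FROM pg_dist_background_job WHERE job_id = :job_id;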