From 35d65893aca00f89a0bf1e04c4d3b59427295395 Mon Sep 17 00:00:00 2001
From: Shabnam Khan
Date: Mon, 19 Jun 2023 14:16:06 +0530
Subject: [PATCH] Added comments, removed values only used for testing

---
 .../distributed/operations/auto_shard_split.c | 217 ++++++++++--------
 .../citus_auto_shard_split_start/12.0-1.sql   |   2 +-
 .../citus_auto_shard_split_start/latest.sql   |   4 +-
 3 files changed, 130 insertions(+), 93 deletions(-)

diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c
index ef96f39d2..c92f4a958 100644
--- a/src/backend/distributed/operations/auto_shard_split.c
+++ b/src/backend/distributed/operations/auto_shard_split.c
@@ -37,14 +37,18 @@ typedef struct ShardInfoData
 	char *distributionColumn;
 	char *datatype;
 	char *shardname;
-	Oid tableId;
-	Oid distributionColumnId;
-
+	Oid tableId;
+	Oid distributionColumnId;
 }ShardInfoData;
 typedef ShardInfoData *ShardInfo;
 
-void ErrorOnConcurrentOperation(){
+/*
+ * Throws an error if a concurrent automatic shard split or rebalance operation is already running.
+ */
+void
+ErrorOnConcurrentOperation()
+{
 	int64 jobId = 0;
 	if (HasNonTerminalJobOfType("rebalance", &jobId))
 	{
@@ -57,39 +61,49 @@ void ErrorOnConcurrentOperation(){
 	if (HasNonTerminalJobOfType("Automatic Shard Split", &jobId))
 	{
 		ereport(ERROR, (
-			errmsg("An automatic shard split is already running as job %ld", jobId),
-			errdetail("An automatic shard split was already scheduled as background job")));
+					errmsg("An automatic shard split is already running as job %ld",
+						   jobId),
+					errdetail(
+						"An automatic shard split was already scheduled as a background job")));
 	}
 }
 
+
+/*
+ * For the given SplitPoints, constructs the SQL query that splits the shard.
+ */
 StringInfo
-GetShardSplitQuery(ShardInfo shardinfo, List* SplitPoints)
+GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints)
 {
 	StringInfo splitQuery = makeStringInfo();
 
-	int64 length = list_length(SplitPoints);
-	appendStringInfo(splitQuery,"SELECT citus_split_shard_by_split_points(%ld, ARRAY[",shardinfo->shardid);
+	int64 length = list_length(SplitPoints);
+	appendStringInfo(splitQuery, "SELECT citus_split_shard_by_split_points(%ld, ARRAY[",
+					 shardinfo->shardid);
 
-	for (int i =0; i < length-1; i++)
-	{
-		appendStringInfo(splitQuery,"'%ld',",DatumGetInt64(list_nth(SplitPoints,i)));
+	for (int i = 0; i < length - 1; i++)
+	{
+		appendStringInfo(splitQuery, "'%ld',", DatumGetInt64(list_nth(SplitPoints, i)));
+	}
+	appendStringInfo(splitQuery, "'%ld'], ARRAY[", DatumGetInt64(list_nth(SplitPoints,
+																		  length - 1)));
+
+	for (int i = 0; i < length; i++)
+	{
+		appendStringInfo(splitQuery, "%ld,", shardinfo->nodeid);
+	}
+	appendStringInfo(splitQuery, "%ld], 'block_writes')", shardinfo->nodeid);
 
-	}
-	appendStringInfo(splitQuery,"'%ld'], ARRAY[",DatumGetInt64(list_nth(SplitPoints,length-1)));
-
-	for (int i =0; i < length; i++)
-	{
-		appendStringInfo(splitQuery,"%ld,",shardinfo->nodeid);
-	}
-	appendStringInfo(splitQuery,"%ld], 'block_writes')",shardinfo->nodeid);
-
 	return splitQuery;
 }
 
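For clarity, here is a minimal standalone sketch (not part of the patch) of the query shape GetShardSplitQuery builds; the shard id, split points, and node id below are hypothetical example values:

/* sketch: prints the query GetShardSplitQuery would build for sample inputs */
#include <stdio.h>

int
main(void)
{
	long shardId = 102008;               /* hypothetical shard id */
	long splitPoints[] = { 1000, 2000 }; /* n split points -> n + 1 fragments */
	long nodeId = 2;                     /* every fragment stays on this node */
	int length = 2;

	printf("SELECT citus_split_shard_by_split_points(%ld, ARRAY[", shardId);
	for (int i = 0; i < length - 1; i++)
	{
		printf("'%ld',", splitPoints[i]);
	}
	printf("'%ld'], ARRAY[", splitPoints[length - 1]);

	/* one node id per resulting fragment, i.e. length + 1 entries */
	for (int i = 0; i < length; i++)
	{
		printf("%ld,", nodeId);
	}
	printf("%ld], 'block_writes')\n", nodeId);
	return 0;
}

This prints: SELECT citus_split_shard_by_split_points(102008, ARRAY['1000','2000'], ARRAY[2,2,2], 'block_writes'). Two split points cut the shard into three hash ranges, so the node array carries length + 1 entries, all pointing at the shard's current node.
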
-void
-ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List* SplitPoints)
-{
+/*
+ * Creates a background task that runs citus_split_shard_by_split_points in the background.
+ */
+void
+ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints)
+{
 	StringInfo splitQuery = makeStringInfo();
 	splitQuery = GetShardSplitQuery(shardinfo, SplitPoints);
 	ereport(LOG, (errmsg(splitQuery->data)));
@@ -100,10 +114,12 @@ ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List* SplitPoints)
 	BackgroundTask *task = ScheduleBackgroundTask(jobid, superUserId, splitQuery->data,
 												  0, NULL, 1, nodesInvolved);
 }
+
+
 /*
-* It executes a query to find the average hash value in a shard considering rows with a limit of 10GB .
-*
-*/
+ * Executes a query to find the average hash value in a shard, sampling at most roughly 10GB of rows.
+ * If such a hash value exists it is returned; otherwise shardminvalue - 1 is returned.
+ */
 int64
 ExecuteAvgHashQuery(ShardInfo shardinfo)
 {
@@ -111,7 +127,8 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
 	appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
 					 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
 					 " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
-					 shardinfo->distributionColumn, shardinfo->tablename,quote_literal_cstr(shardinfo->tablename),
+					 shardinfo->distributionColumn, shardinfo->tablename,
+					 quote_literal_cstr(shardinfo->tablename),
 					 shardinfo->distributionColumn, shardinfo->shardminvalue,
 					 shardinfo->distributionColumn, shardinfo->shardmaxvalue
 					 );
@@ -139,10 +156,12 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
 		return shardinfo->shardminvalue - 1;
 	}
}
+
+
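The TABLESAMPLE percentage above, least(10, 100*10000000000/citus_total_relation_size(...)), never samples more than 10% of the relation, and for large relations the percentage shrinks so that roughly 10GB is read. A minimal standalone sketch of that arithmetic, with made-up relation sizes (the SQL version uses integer division; doubles are used here only for readability):

/* sketch: the sampling-percentage formula used in ExecuteAvgHashQuery */
#include <stdio.h>

/* percentage that reads at most ~10GB, capped at 10% of the relation */
static double
SamplePercentage(double relationSizeBytes)
{
	double pct = 100.0 * 10000000000.0 / relationSizeBytes;
	return pct < 10.0 ? pct : 10.0;
}

int
main(void)
{
	double sizes[] = { 5e9, 1e11, 1e12 }; /* 5GB, 100GB, 1TB */
	for (int i = 0; i < 3; i++)
	{
		double pct = SamplePercentage(sizes[i]);
		printf("size=%.0f bytes -> sample %.2f%% (~%.2f GB)\n",
			   sizes[i], pct, sizes[i] * pct / 100.0 / 1e9);
	}
	return 0;
}

A 100GB shard table samples 10% (about 10GB); a 1TB table samples only 1%, again about 10GB.
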
 /*
  * This function executes a query and then decides whether a shard is subject to tenant isolation or an average-hash two-way split.
  * If a common tenant is found, split points that isolate it are returned; otherwise the average hash value is used.
-*/
+ */
 List *
 FindShardSplitPoints(ShardInfo shardinfo)
 {
@@ -152,50 +171,54 @@ FindShardSplitPoints(ShardInfo shardinfo)
 					 " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
 					 " WHERE tablename = %s AND attname = %s AND freq > 0.2 $$)"
 					 " WHERE result <> '' AND shardid = %ld;",
-					 shardinfo->datatype, quote_literal_cstr(shardinfo->tablename), shardinfo->datatype,
+					 shardinfo->datatype, quote_literal_cstr(shardinfo->tablename),
+					 shardinfo->datatype,
 					 quote_literal_cstr(shardinfo->shardname),
-					 quote_literal_cstr(shardinfo->distributionColumn), shardinfo->shardid);
+					 quote_literal_cstr(shardinfo->distributionColumn),
+					 shardinfo->shardid);
 	ereport(LOG, errmsg("%s", CommonValueQuery->data));
 
-	List *SplitPoints = NULL;
-	MemoryContext originalContext = CurrentMemoryContext;
+	List *SplitPoints = NULL;
+	MemoryContext originalContext = CurrentMemoryContext;
 	SPI_connect();
 	SPI_exec(CommonValueQuery->data, 0);
-	MemoryContext spiContext = CurrentMemoryContext;
+	MemoryContext spiContext = CurrentMemoryContext;
 	int64 rowCount = SPI_processed;
-	int64 average,hashedValue;
+	int64 average, hashedValue;
 	ereport(LOG, errmsg("%ld", rowCount));
 
 	if (rowCount > 0)
 	{
 		SPITupleTable *tupletable = SPI_tuptable;
-		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardinfo->tableId);
+		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardinfo->tableId);
 		for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
 		{
 			HeapTuple tuple = tupletable->vals[rowIndex];
 			char *commonValue = SPI_getvalue(tuple, tupletable->tupdesc, 2);
 			ereport(LOG, errmsg("%s", commonValue));
-			Datum tenantIdDatum = StringToDatum(commonValue, shardinfo->distributionColumnId);
-			Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction,
-													   cacheEntry->partitionColumn->varcollid,
-													   tenantIdDatum);
-			hashedValue = DatumGetInt32(hashedValueDatum);
-			ereport(LOG,errmsg("%ld",hashedValue));
-			MemoryContextSwitchTo(originalContext);
-			if (hashedValue == shardinfo->shardminvalue)
-			{
-				SplitPoints = lappend(SplitPoints,Int64GetDatum(hashedValue));
-			}
-			else if (hashedValue == shardinfo->shardmaxvalue)
-			{
-				SplitPoints = lappend(SplitPoints,Int64GetDatum(hashedValue-1));
-			}
-			else
-			{
-				SplitPoints = lappend(SplitPoints,Int64GetDatum(hashedValue-1));
-				SplitPoints = lappend(SplitPoints,Int64GetDatum(hashedValue));;
-			}
-			MemoryContextSwitchTo(spiContext);
+			Datum tenantIdDatum = StringToDatum(commonValue,
+												shardinfo->distributionColumnId);
+			Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction,
+													   cacheEntry->partitionColumn->
+													   varcollid,
+													   tenantIdDatum);
+			hashedValue = DatumGetInt32(hashedValueDatum);
+			ereport(LOG, errmsg("%ld", hashedValue));
+			MemoryContextSwitchTo(originalContext);
+			if (hashedValue == shardinfo->shardminvalue)
+			{
+				SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
+			}
+			else if (hashedValue == shardinfo->shardmaxvalue)
+			{
+				SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
+			}
+			else
+			{
+				SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
+				SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
+			}
+			MemoryContextSwitchTo(spiContext);
 		}
 		SPI_freetuptable(tupletable);
 	}
@@ -203,41 +226,52 @@ FindShardSplitPoints(ShardInfo shardinfo)
 	{
 		average = ExecuteAvgHashQuery(shardinfo);
 		ereport(LOG, errmsg("%ld", average));
-
 	}
 	SPI_finish();
 
-	if(rowCount>0){
-		list_sort(SplitPoints, list_int_cmp);
-	}else{
-		if(shardinfo->shardminvalue<=average){
-			SplitPoints = lappend(SplitPoints,Int64GetDatum(average));
-		}
-	}
-
-	return SplitPoints;
-
+	if (rowCount > 0)
+	{
+		list_sort(SplitPoints, list_int_cmp);
+	}
+	else
+	{
+		if (shardinfo->shardminvalue <= average)
+		{
+			SplitPoints = lappend(SplitPoints, Int64GetDatum(average));
+		}
+	}
+	return SplitPoints;
 }
+
+
 /*
  * This function calculates the split points of the shard to split and then schedules the background job.
-*/
+ */
 void
-ScheduleShardSplit(ShardInfo shardinfo){
-
-	List* SplitPoints = FindShardSplitPoints(shardinfo);
-	if(list_length(SplitPoints)>0){
-		// int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List");
-		ereport(LOG,errmsg("%s",GetShardSplitQuery(shardinfo,SplitPoints)->data));
-	}else{
-		ereport(LOG,errmsg("No Splitpoints for shard split"));
-	}
-
-
+ScheduleShardSplit(ShardInfo shardinfo)
+{
+	List *SplitPoints = FindShardSplitPoints(shardinfo);
+	if (list_length(SplitPoints) > 0)
+	{
+		int64 jobId = CreateBackgroundJob("Automatic Shard Split",
+										  "Split using SplitPoints List");
+		ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints);
+	}
+	else
+	{
+		ereport(LOG, errmsg("No split points found for shard split"));
+	}
+}
 
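To make the tenant-isolation branch above concrete, here is a minimal standalone sketch of how one common value's hash maps to split points (each split point is the upper bound of a resulting range, so splitting at h - 1 and h leaves the tenant alone in (h - 1, h]); the shard bounds and hash values are hypothetical:

/* sketch: the split-point rules applied per common tenant in FindShardSplitPoints */
#include <stdio.h>

static void
PrintSplitPoints(long shardMin, long shardMax, long hashedValue)
{
	printf("shard [%ld, %ld], tenant hash %ld -> split points: ",
		   shardMin, shardMax, hashedValue);
	if (hashedValue == shardMin)
	{
		/* tenant sits at the lower bound: one split right after it */
		printf("{%ld}\n", hashedValue);
	}
	else if (hashedValue == shardMax)
	{
		/* tenant sits at the upper bound: one split right before it */
		printf("{%ld}\n", hashedValue - 1);
	}
	else
	{
		/* interior tenant: split on both sides to isolate it */
		printf("{%ld, %ld}\n", hashedValue - 1, hashedValue);
	}
}

int
main(void)
{
	PrintSplitPoints(-100, 100, -100); /* at shardminvalue */
	PrintSplitPoints(-100, 100, 100);  /* at shardmaxvalue */
	PrintSplitPoints(-100, 100, 42);   /* interior value   */
	return 0;
}
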
+/*
+ * citus_auto_shard_split_start finds the colocation groups whose total size
+ * exceeds the threshold and, from each of them, extracts the shard with the
+ * maximum size. It isolates that shard's common tenant values if any are
+ * found; otherwise it splits the shard two ways at the average hash value.
+ */
 Datum
 citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 {
@@ -256,7 +290,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 					 " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
 					 " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )"
 					 " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
-					 0
+					 MaxShardSize
 					 );
 
 	ereport(LOG, errmsg("%s", query->data));
@@ -298,21 +332,24 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 		shardinfo.tablename = SPI_getvalue(tuple, tupletable->tupdesc, 7);
 
 		StringInfo shardnameQuery = makeStringInfo();
-		appendStringInfo(shardnameQuery,"%s_%ld",SPI_getvalue(tuple,tupletable->tupdesc,9),shardinfo.shardid);
+		appendStringInfo(shardnameQuery, "%s_%ld", SPI_getvalue(tuple,
+																tupletable->tupdesc, 9),
+						 shardinfo.shardid);
 		shardinfo.shardname = shardnameQuery->data;
 
 		Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7, &isnull);
 		shardinfo.tableId = DatumGetObjectId(tableIdDatum);
-		shardinfo.distributionColumnId = ColumnTypeIdForRelationColumnName(shardinfo.tableId,
-																	shardinfo.
-																	distributionColumn);
+		shardinfo.distributionColumnId = ColumnTypeIdForRelationColumnName(
+			shardinfo.tableId,
+			shardinfo.
+			distributionColumn);
 		shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
 
-		char * shardSplitMode;
-		// Oid shardTransferModeOid = PG_GETARG_OID(0);
-		// Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
-		// char *enumLabel = DatumGetCString(enumLabelDatum);
-		// ereport(LOG,errmsg("%s",enumLabel));
+		char *shardSplitMode;
+		/* Oid shardTransferModeOid = PG_GETARG_OID(0); */
+		/* Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); */
+		/* char *enumLabel = DatumGetCString(enumLabelDatum); */
+		/* ereport(LOG,errmsg("%s",enumLabel)); */
 
 		ScheduleShardSplit(&shardinfo);
 		ereport(LOG, (errmsg(
diff --git a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
index ba526d9df..98dc6e01a 100644
--- a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
+++ b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/12.0-1.sql
@@ -10,4 +10,4 @@ COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start()
     IS 'automatically split the necessary shards in the cluster in the background';
 
-GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start() TO PUBLIC;
\ No newline at end of file
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start() TO PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
index 74e82ac84..0750c05ea 100644
--- a/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_auto_shard_split_start/latest.sql
@@ -1,5 +1,5 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
-
+
 )
 RETURNS VOID
@@ -11,4 +11,4 @@ COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start()
     IS 'automatically split the necessary shards in the cluster in the background';
 
-GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start() TO PUBLIC;
\ No newline at end of file
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start() TO PUBLIC;
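For reference, a minimal standalone sketch of the candidate selection the main query performs: within each colocation group whose total size reaches the threshold, the largest shard becomes the split candidate. The shard rows and threshold below are made-up example values, not real catalog data:

/* sketch: per-group candidate selection done by citus_auto_shard_split_start's query */
#include <stdio.h>

typedef struct
{
	int colocationId;
	long shardId;
	long shardSize;
} ShardRow;

int
main(void)
{
	long maxShardSize = 100; /* stand-in for the MaxShardSize threshold */
	ShardRow rows[] = {
		{ 1, 102008, 80 }, { 1, 102009, 40 }, /* group 1: total 120 */
		{ 2, 102010, 30 }, { 2, 102011, 20 }, /* group 2: total 50  */
	};
	int rowCount = 4;

	for (int group = 1; group <= 2; group++)
	{
		long total = 0;
		ShardRow *candidate = NULL;
		for (int i = 0; i < rowCount; i++)
		{
			if (rows[i].colocationId != group)
			{
				continue;
			}
			total += rows[i].shardSize;
			if (candidate == NULL || rows[i].shardSize > candidate->shardSize)
			{
				candidate = &rows[i];
			}
		}
		if (total >= maxShardSize)
		{
			printf("group %d (total %ld): split shard %ld\n",
				   group, total, candidate->shardId);
		}
	}
	return 0;
}

Only group 1 crosses the threshold, so only its largest shard (102008) is scheduled for a split; group 2 is left alone.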