From 4a671f04b3fb2bbe5327d0e9337524d1cb5a1bd2 Mon Sep 17 00:00:00 2001 From: Shabnam Khan Date: Wed, 21 Jun 2023 12:55:27 +0530 Subject: [PATCH] Added tenant frequency as GUC and addressed some more review comments --- .../distributed/operations/auto_shard_split.c | 181 +++++++++--------- src/backend/distributed/shared_library_init.c | 10 + .../distributed/coordinator_protocol.h | 3 +- 3 files changed, 102 insertions(+), 92 deletions(-) diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c index 4a9212ca5..a4cc5dd6d 100644 --- a/src/backend/distributed/operations/auto_shard_split.c +++ b/src/backend/distributed/operations/auto_shard_split.c @@ -11,11 +11,13 @@ #include "utils/builtins.h" #include "distributed/shard_split.h" #include "utils/lsyscache.h" +#include "distributed/listutils.h" PG_FUNCTION_INFO_V1(citus_auto_shard_split_start); -int MaxShardSize = 102400; +uint64 MaxShardSize = 102400; +double TenantFrequency = 0.2; /* * Struct to store all the information related to @@ -23,16 +25,15 @@ int MaxShardSize = 102400; */ typedef struct ShardInfoData { - int64 shardsize; - int64 shardminvalue; - int64 shardmaxvalue; - int64 shardid; - int32 nodeid; - char *tablename; + int64 shardSize; + int64 shardMinValue; + int64 shardMaxValue; + int64 shardId; + int32 nodeId; + char *tableName; char *distributionColumn; - char *datatype; - char *shardname; - char *schemaname; + char *dataType; + char *shardName; Oid tableId; Oid distributionColumnId; }ShardInfoData; @@ -69,27 +70,33 @@ ErrorOnConcurrentOperation() * For a given SplitPoints , it creates the SQL query for the Shard Splitting */ StringInfo -GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints, char *shardSplitMode) +GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints, char *shardSplitMode) { StringInfo splitQuery = makeStringInfo(); - int64 length = list_length(SplitPoints); + int64 length = list_length(splitPoints); appendStringInfo(splitQuery, "SELECT citus_split_shard_by_split_points(%ld, ARRAY[", - shardinfo->shardid); + shardinfo->shardId); - for (int i = 0; i < length - 1; i++) + int32 splitpoint = 0; + int index = 0; + foreach_int(splitpoint, splitPoints) { - appendStringInfo(splitQuery, "'%d',", list_nth_int(SplitPoints, i)); + appendStringInfo(splitQuery, "'%d'", splitpoint); + + if (index < length - 1) + appendStringInfoString(splitQuery, ","); + + index++; } - appendStringInfo(splitQuery, "'%d'], ARRAY[", list_nth_int(SplitPoints, - length - 1)); + + appendStringInfo(splitQuery, "], ARRAY["); for (int i = 0; i < length; i++) { - appendStringInfo(splitQuery, "%d,", shardinfo->nodeid); + appendStringInfo(splitQuery, "%d,", shardinfo->nodeId); } - appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr( - shardSplitMode)); + appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeId , quote_literal_cstr(shardSplitMode)); return splitQuery; } @@ -99,16 +106,15 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints, char *shardSplitMode) * It creates a background job for citus_split_shard_by_split_points and executes it in background. 
 */
void
-ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints,
+ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
 						  char *shardSplitMode)
 {
 	StringInfo splitQuery = makeStringInfo();
-	splitQuery = GetShardSplitQuery(shardinfo, SplitPoints, shardSplitMode);
-	ereport(LOG, (errmsg(splitQuery->data)));
-	int32 nodesInvolved[1];
-	nodesInvolved[0] = shardinfo->nodeid;
+	splitQuery = GetShardSplitQuery(shardinfo, splitPoints, shardSplitMode);
+	/* ereport(LOG, (errmsg(splitQuery->data))); */
+	int32 nodesInvolved[] = { shardinfo->nodeId };
 	Oid superUserId = CitusExtensionOwner();
-	BackgroundTask *task = ScheduleBackgroundTask(jobid, superUserId, splitQuery->data, 0,
+	BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, splitQuery->data, 0,
 												  NULL, 1, nodesInvolved);
 }
 
@@ -124,10 +130,10 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
 	appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
 					 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
 					 " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
-					 shardinfo->distributionColumn, shardinfo->tablename,
-					 quote_literal_cstr(shardinfo->tablename),
-					 shardinfo->distributionColumn, shardinfo->shardminvalue,
-					 shardinfo->distributionColumn, shardinfo->shardmaxvalue
+					 shardinfo->distributionColumn, shardinfo->tableName,
+					 quote_literal_cstr(shardinfo->tableName),
+					 shardinfo->distributionColumn, shardinfo->shardMinValue,
+					 shardinfo->distributionColumn, shardinfo->shardMaxValue
 					 );
 	ereport(LOG, errmsg("%s", AvgHashQuery->data));
 	SPI_connect();
@@ -136,21 +142,21 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
 	HeapTuple tuple = tupletable->vals[0];
 	bool isnull;
 	Datum average = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
-	int64 IsResultNull = 1;
+	int64 isResultNull = 1;
 	if (!isnull)
 	{
-		IsResultNull = 0;
+		isResultNull = 0;
 	}
 	SPI_freetuptable(tupletable);
 	SPI_finish();
 
-	if (IsResultNull == 0)
+	if (isResultNull == 0)
 	{
 		return DatumGetInt64(average);
 	}
 	else
 	{
-		return shardinfo->shardminvalue - 1;
+		return shardinfo->shardMinValue - 1;
 	}
 }
 
@@ -172,24 +178,30 @@ FindShardSplitPoints(ShardInfo shardinfo)
 	appendStringInfo(CommonValueQuery,
 					 "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
 					 " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
-					 " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > 0.3 $$)"
+					 " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > %f $$)"
 					 " WHERE result <> '' AND shardid = %ld;",
-					 shardinfo->datatype, quote_literal_cstr(shardinfo->tablename),
-					 shardinfo->datatype,
-					 quote_literal_cstr(shardinfo->shardname),
+					 shardinfo->dataType, quote_literal_cstr(shardinfo->tableName),
+					 shardinfo->dataType,
+					 quote_literal_cstr(shardinfo->shardName),
 					 quote_literal_cstr(shardinfo->distributionColumn),
-					 quote_literal_cstr(shardinfo->schemaname),
-					 shardinfo->shardid);
+					 quote_literal_cstr(get_namespace_name(get_rel_namespace(shardinfo->tableId))),
+					 TenantFrequency,
+					 shardinfo->shardId);
 
 	ereport(LOG, errmsg("%s", CommonValueQuery->data));
-	List *SplitPoints = NULL;
+	List *splitPoints = NULL;
+
+	/* Saving the current memory context */
 	MemoryContext originalContext = CurrentMemoryContext;
+
 	SPI_connect();
 	SPI_exec(CommonValueQuery->data, 0);
-	MemoryContext spiContext = CurrentMemoryContext;
 
+	/* Saving the SPI memory context for switching */
+	MemoryContext spiContext = CurrentMemoryContext;
+ int64 rowCount = SPI_processed; int64 average; int32 hashedValue; + ereport(LOG, errmsg("%ld", rowCount)); if (rowCount > 0) @@ -213,44 +225,38 @@ FindShardSplitPoints(ShardInfo shardinfo) /*Switching the memory context to store the unique SplitPoints in a list*/ MemoryContextSwitchTo(originalContext); - if (hashedValue == shardinfo->shardminvalue) + if (hashedValue == shardinfo->shardMinValue) { - SplitPoints = list_append_unique_int(SplitPoints, hashedValue); + splitPoints = list_append_unique_int(splitPoints, hashedValue); } - else if (hashedValue == shardinfo->shardmaxvalue) + else if (hashedValue == shardinfo->shardMaxValue) { - SplitPoints = list_append_unique_int(SplitPoints, hashedValue - 1); + splitPoints = list_append_unique_int(splitPoints, hashedValue - 1); } else { - SplitPoints = list_append_unique_int(SplitPoints, hashedValue - 1); - SplitPoints = list_append_unique_int(SplitPoints, hashedValue); + splitPoints = list_append_unique_int(splitPoints, hashedValue - 1); + splitPoints = list_append_unique_int(splitPoints, hashedValue); } MemoryContextSwitchTo(spiContext); } SPI_freetuptable(tupletable); + list_sort(splitPoints, list_int_cmp); } else { average = ExecuteAvgHashQuery(shardinfo); ereport(LOG, errmsg("%ld", average)); + MemoryContextSwitchTo(originalContext); + if (shardinfo->shardMinValue <= average) + { + splitPoints = lappend_int(splitPoints, average); + } + } SPI_finish(); - - if (rowCount > 0) - { - list_sort(SplitPoints, list_int_cmp); - } - else - { - if (shardinfo->shardminvalue <= average) - { - SplitPoints = lappend_int(SplitPoints, average); - } - } - - return SplitPoints; + return splitPoints; } @@ -261,15 +267,15 @@ FindShardSplitPoints(ShardInfo shardinfo) void ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode) { - List *SplitPoints = FindShardSplitPoints(shardinfo); - if (list_length(SplitPoints) > 0) + List *splitPoints = FindShardSplitPoints(shardinfo); + if (list_length(splitPoints) > 0) { /* ErrorOnConcurrentOperation(); */ int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List"); - ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, SplitPoints, + ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints, shardSplitMode)->data)); - ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints, shardSplitMode); + ExecuteSplitBackgroundJob(jobId, shardinfo, splitPoints, shardSplitMode); } else { @@ -288,21 +294,20 @@ Datum citus_auto_shard_split_start(PG_FUNCTION_ARGS) { StringInfo query = makeStringInfo(); - /* This query is written to group the shards on the basis of colocation id and shardminvalue and get the groups whose sum of shardsize * are greater than a threshold and than extract the shard in them which has the maximum size. 
 	   So for that first pg_dist_shard and citus_shards are joined followed by the joining of pg_dist_node
 	 * and citus_shards and finally joined by the table obtained by the grouping of colocation id and shardminvalue and shardsize exceeding the threshold.*/
 	appendStringInfo(
 		query,
-		" SELECT cs.shardid,pd.shardminvalue,pd.shardmaxvalue,cs.shard_size,pn.nodeid,ct.distribution_column,ct.table_name,cs.shard_name,(SELECT relname FROM pg_class WHERE oid = (ct.table_name::regclass)::oid) "
+		" SELECT cs.shardid,pd.shardminvalue,pd.shardmaxvalue,cs.shard_size,pn.nodeid,ct.distribution_column,ct.table_name,cs.shard_name,(SELECT relname FROM pg_class WHERE oid = ct.table_name)"
 		" FROM pg_catalog.pg_dist_shard pd JOIN pg_catalog.citus_shards cs ON pd.shardid = cs.shardid JOIN pg_catalog.pg_dist_node pn ON cs.nodename = pn.nodename AND cs.nodeport= pn.nodeport"
 		" JOIN"
 		" ( select shardid , max_size from (SELECT distinct first_value(shardid) OVER w as shardid, sum(shard_size) OVER (PARTITION BY colocation_id, shardminvalue) as total_sum, max(shard_size) OVER w as max_size"
 		" FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
-		" WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )"
+		" WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= " UINT64_FORMAT " )"
 		" AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
 		MaxShardSize * 1024
 		);
 
 	ereport(LOG, errmsg("%s", query->data));
@@ -328,29 +333,26 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 		ShardInfoData shardinfo;
 		HeapTuple tuple = tupletable->vals[rowIndex];
 
-		Datum shardId = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
-		shardinfo.shardid = DatumGetInt64(shardId);
+		Datum shardIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
+		shardinfo.shardId = DatumGetInt64(shardIdDatum);
 
-		Datum shardSize = SPI_getbinval(tuple, tupletable->tupdesc, 4, &isnull);
-		shardinfo.shardsize = DatumGetInt64(shardSize);
+		Datum shardSizeDatum = SPI_getbinval(tuple, tupletable->tupdesc, 4, &isnull);
+		shardinfo.shardSize = DatumGetInt64(shardSizeDatum);
 
-		Datum nodeId = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull);
-		shardinfo.nodeid = DatumGetInt32(nodeId);
+		Datum nodeIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull);
+		shardinfo.nodeId = DatumGetInt32(nodeIdDatum);
 
 		char *shardMinVal = SPI_getvalue(tuple, tupletable->tupdesc, 2);
-		shardinfo.shardminvalue = strtoi64(shardMinVal, NULL, 10);
+		shardinfo.shardMinValue = strtoi64(shardMinVal, NULL, 10);
 
 		char *shardMaxVal = SPI_getvalue(tuple, tupletable->tupdesc, 3);
-		shardinfo.shardmaxvalue = strtoi64(shardMaxVal, NULL, 10);
+		shardinfo.shardMaxValue = strtoi64(shardMaxVal, NULL, 10);
 
 		shardinfo.distributionColumn = SPI_getvalue(tuple, tupletable->tupdesc, 6);
-		shardinfo.tablename = SPI_getvalue(tuple, tupletable->tupdesc, 7);
+		shardinfo.tableName = SPI_getvalue(tuple, tupletable->tupdesc, 7);
 
-		StringInfo shardnameQuery = makeStringInfo();
-		appendStringInfo(shardnameQuery, "%s_%ld", SPI_getvalue(tuple,
-																tupletable->tupdesc, 9),
-						 shardinfo.shardid);
-		shardinfo.shardname = shardnameQuery->data;
+		shardinfo.shardName = SPI_getvalue(tuple, tupletable->tupdesc, 9);
+		AppendShardIdToName(&shardinfo.shardName, shardinfo.shardId);
 
 		Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7,
 														   &isnull);
 		shardinfo.tableId = DatumGetObjectId(tableIdDatum);
@@ -358,17 +360,14 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 															  shardinfo.tableId,
 															  shardinfo.
 															  distributionColumn);
-		shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
-
-		Oid schemaOid = get_rel_namespace(shardinfo.tableId);
-		shardinfo.schemaname = get_namespace_name(schemaOid);
+		shardinfo.dataType = format_type_be(shardinfo.distributionColumnId);
 
 		ScheduleShardSplit(&shardinfo, shardSplitMode);
 		ereport(LOG, (errmsg(
 						  "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d",
-						  shardinfo.shardid, shardinfo.shardminvalue,
-						  shardinfo.shardmaxvalue,
-						  shardinfo.shardsize, shardinfo.nodeid)));
+						  shardinfo.shardId, shardinfo.shardMinValue,
+						  shardinfo.shardMaxValue,
+						  shardinfo.shardSize, shardinfo.nodeId)));
 	}
 
 	SPI_freetuptable(tupletable);
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 9fb818530..42a3be8a3 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -2271,6 +2271,16 @@ RegisterCitusConfigVariables(void)
 		GUC_UNIT_KB | GUC_STANDARD,
 		NULL, NULL, NULL);
 
+	DefineCustomRealVariable(
+		"citus.tenant_freq",
+		gettext_noop("Sets the frequency threshold above which a distribution column value in a shard is treated as a tenant and used as a shard split point."),
+		NULL,
+		&TenantFrequency,
+		0.2, 0, 1,
+		PGC_USERSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.shard_replication_factor",
 		gettext_noop("Sets the replication factor for shards."),
diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h
index a895f105d..8813b7b32 100644
--- a/src/include/distributed/coordinator_protocol.h
+++ b/src/include/distributed/coordinator_protocol.h
@@ -214,7 +214,8 @@ extern int ShardCount;
 extern int ShardReplicationFactor;
 extern int NextShardId;
 extern int NextPlacementId;
-extern int MaxShardSize;
+extern uint64 MaxShardSize;
+extern double TenantFrequency;
 
 extern bool IsCoordinator(void);
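
Example usage (a sketch only, not part of the patch): citus.tenant_freq is the GUC registered above, with default 0.2 and range 0 to 1. The SQL-level signature of citus_auto_shard_split_start() is not shown in this diff, so calling it without arguments is an assumption; the C code indicates it also receives a shard split mode that is forwarded to citus_split_shard_by_split_points.

    -- treat distribution column values whose frequency in a shard exceeds 0.3 as tenants
    SET citus.tenant_freq = 0.3;

    -- schedule background split jobs for shard groups that exceed the size threshold
    SELECT citus_auto_shard_split_start();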