From a596b1faf4556f7a360351ee77780092c8a058d9 Mon Sep 17 00:00:00 2001
From: Shabnam Khan
Date: Wed, 21 Jun 2023 17:15:17 +0530
Subject: [PATCH] Expose DistributedTableSize, remove warnings, execute
 ErrorOnConcurrentOperation

---
 .../distributed/metadata/metadata_utility.c   |  5 +-
 .../distributed/operations/auto_shard_split.c | 52 +++++++++++++------
 src/backend/distributed/shared_library_init.c |  2 +-
 src/include/distributed/metadata_utility.h    |  2 +
 4 files changed, 40 insertions(+), 21 deletions(-)

diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 9fd4290ba..0c46e5b82 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -85,8 +85,7 @@ static uint64 * AllocateUint64(uint64 value);
 static void RecordDistributedRelationDependencies(Oid distributedRelationId);
 static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
 														 HeapTuple heapTuple);
-static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
-								 bool failOnError, uint64 *tableSize);
+
 static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
 										 SizeQueryType sizeQueryType, bool failOnError,
 										 uint64 *tableSize);
@@ -510,7 +509,7 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
  * It first checks whether the table is distributed and size query can be run on
  * it. Connection to each node has to be established to get the size of the table.
  */
-static bool
+bool
 DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError,
 					 uint64 *tableSize)
 {
diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c
index a4cc5dd6d..0a090be67 100644
--- a/src/backend/distributed/operations/auto_shard_split.c
+++ b/src/backend/distributed/operations/auto_shard_split.c
@@ -12,12 +12,12 @@
 #include "distributed/shard_split.h"
 #include "utils/lsyscache.h"
 #include "distributed/listutils.h"
-
+#include "distributed/metadata_utility.h"
 
 PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
 
 uint64 MaxShardSize = 102400;
-double TenantFrequency = 0.2;
+double TenantFrequency = 0.3;
 
 /*
  * Struct to store all the information related to
@@ -39,6 +39,14 @@ typedef struct ShardInfoData
 }ShardInfoData;
 typedef ShardInfoData *ShardInfo;
 
+void ErrorOnConcurrentOperation(void);
+StringInfo GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints,
+							  char *shardSplitMode);
+void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
+							   char *shardSplitMode);
+int64 ExecuteAvgHashQuery(ShardInfo shardinfo);
+List * FindShardSplitPoints(ShardInfo shardinfo);
+void ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode);
 
 /*
  * It throws an error if a concurrent automatic shard split or Rebalance operation is happening.
@@ -85,18 +93,22 @@ GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints, char *shardSplitMode)
 		appendStringInfo(splitQuery, "'%d'", splitpoint);
 
 		if (index < length - 1)
+		{
 			appendStringInfoString(splitQuery, ",");
-
+		}
+
 		index++;
 	}
 
+	/* All the shards after the split will belong to the same node. */
 	appendStringInfo(splitQuery, "], ARRAY[");
 	for (int i = 0; i < length; i++)
 	{
 		appendStringInfo(splitQuery, "%d,", shardinfo->nodeId);
 	}
-	appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeId , quote_literal_cstr(shardSplitMode));
+	appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeId, quote_literal_cstr(
+						 shardSplitMode));
 
 	return splitQuery;
 }
@@ -111,7 +123,7 @@ ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
 {
 	StringInfo splitQuery = makeStringInfo();
 	splitQuery = GetShardSplitQuery(shardinfo, splitPoints, shardSplitMode);
-	// ereport(LOG, (errmsg(splitQuery->data)));
+	/* ereport(LOG, (errmsg(splitQuery->data))); */
 	int32 nodesInvolved[] = { shardinfo->nodeId };
 	Oid superUserId = CitusExtensionOwner();
 	BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, splitQuery->data, 0,
@@ -127,11 +139,14 @@ int64
 ExecuteAvgHashQuery(ShardInfo shardinfo)
 {
 	StringInfo AvgHashQuery = makeStringInfo();
+	uint64 tableSize = 0;
+	bool check = DistributedTableSize(shardinfo->tableId, TOTAL_RELATION_SIZE, true,
+									  &tableSize);
 	appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
-					 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
+					 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/%lu))"
 					 " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
 					 shardinfo->distributionColumn, shardinfo->tableName,
-					 quote_literal_cstr(shardinfo->tableName),
+					 tableSize,
 					 shardinfo->distributionColumn, shardinfo->shardMinValue,
 					 shardinfo->distributionColumn, shardinfo->shardMaxValue
 					 );
@@ -171,9 +186,9 @@ FindShardSplitPoints(ShardInfo shardinfo)
 	StringInfo CommonValueQuery = makeStringInfo();
 
 	/*
-	 * The inner query for extracting the tenant value having frequency > 0.3 is executed on
+	 * The inner query for extracting the tenant value having frequency > TenantFrequency is executed on
 	 * every shard of the table and outer query gives the shardid and tenant values as the
-	 * output.
+	 * output. TenantFrequency is a GUC.
*/ appendStringInfo(CommonValueQuery, "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)" @@ -184,19 +199,22 @@ FindShardSplitPoints(ShardInfo shardinfo) shardinfo->dataType, quote_literal_cstr(shardinfo->shardName), quote_literal_cstr(shardinfo->distributionColumn), - quote_literal_cstr(get_namespace_name(get_rel_namespace(shardinfo->tableId))), + quote_literal_cstr(get_namespace_name(get_rel_namespace( + shardinfo->tableId))), TenantFrequency, shardinfo->shardId); ereport(LOG, errmsg("%s", CommonValueQuery->data)); List *splitPoints = NULL; + /* Saving the current memory context*/ MemoryContext originalContext = CurrentMemoryContext; SPI_connect(); SPI_exec(CommonValueQuery->data, 0); - /*Saving the SPI memory context for switching*/ - MemoryContext spiContext = CurrentMemoryContext; + + /*Saving the SPI memory context for switching*/ + MemoryContext spiContext = CurrentMemoryContext; int64 rowCount = SPI_processed; int64 average; @@ -252,7 +270,6 @@ FindShardSplitPoints(ShardInfo shardinfo) { splitPoints = lappend_int(splitPoints, average); } - } SPI_finish(); @@ -270,7 +287,6 @@ ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode) List *splitPoints = FindShardSplitPoints(shardinfo); if (list_length(splitPoints) > 0) { - /* ErrorOnConcurrentOperation(); */ int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List"); ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints, @@ -293,7 +309,9 @@ ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode) Datum citus_auto_shard_split_start(PG_FUNCTION_ARGS) { + ErrorOnConcurrentOperation(); StringInfo query = makeStringInfo(); + /* This query is written to group the shards on the basis of colocation id and shardminvalue and get the groups whose sum of shardsize * are greater than a threshold and than extract the shard in them which has the maximum size. 
So for that first pg_dist_shard and citus_shards are joined followed by the joining of pg_dist_node * and citus_shards and finally joined by the table obtained by the grouping of colocation id and shardminvalue and shardsize exceeding the threshold.*/ @@ -307,7 +325,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)" " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %lu )" " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''", - MaxShardSize*1024 + MaxShardSize * 1024 ); ereport(LOG, errmsg("%s", query->data)); @@ -351,8 +369,8 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) shardinfo.distributionColumn = SPI_getvalue(tuple, tupletable->tupdesc, 6); shardinfo.tableName = SPI_getvalue(tuple, tupletable->tupdesc, 7); - shardinfo.shardName = SPI_getvalue(tuple,tupletable->tupdesc, 9); - AppendShardIdToName(&shardinfo.shardName,shardinfo.shardId); + shardinfo.shardName = SPI_getvalue(tuple, tupletable->tupdesc, 9); + AppendShardIdToName(&shardinfo.shardName, shardinfo.shardId); Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7, &isnull); shardinfo.tableId = DatumGetObjectId(tableIdDatum); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 42a3be8a3..c77a623d7 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2276,7 +2276,7 @@ RegisterCitusConfigVariables(void) gettext_noop("Sets the threshold tenant frequency for a Shard"), NULL, &TenantFrequency, - 0.2, 0, 1, + 0.3, 0, 1, PGC_USERSET, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 08d4896c1..710b04340 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -411,4 +411,6 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status); extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); +extern bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, + bool failOnError, uint64 *tableSize); #endif /* METADATA_UTILITY_H */
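
A minimal sketch of how the newly exported DistributedTableSize() can be consumed from another translation unit, mirroring the ExecuteAvgHashQuery() change above. Only DistributedTableSize(), TOTAL_RELATION_SIZE, and the least(10, 100*10000000000/size) sampling expression come from the patch; the helper name ComputeSamplePercentage and its double return type are hypothetical. For a shard of roughly 100 GB (107374182400 bytes) the expression evaluates to about 9.3, just under the 10 percent cap, and larger shards are sampled proportionally less.

#include "postgres.h"

#include "distributed/metadata_utility.h" /* DistributedTableSize(), TOTAL_RELATION_SIZE */

/*
 * ComputeSamplePercentage is a hypothetical helper: it computes the
 * TABLESAMPLE SYSTEM percentage that ExecuteAvgHashQuery() embeds in its
 * query, i.e. least(10, 100 * 10000000000 / total_relation_size).
 */
static double
ComputeSamplePercentage(Oid relationId)
{
	uint64 tableSize = 0;

	/* failOnError = true, so a failed size lookup raises an ERROR */
	DistributedTableSize(relationId, TOTAL_RELATION_SIZE, true, &tableSize);

	double samplePercentage = 100.0 * 10000000000.0 / (double) tableSize;

	return Min(samplePercentage, 10.0);
}

With failOnError set to false instead of true, the boolean return value of DistributedTableSize() would have to be checked before tableSize is used.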