Added DistributedTableSize, Removed warnings, executing ErrorOnConcurrentOperation

pull/7013/head
Shabnam Khan 2023-06-21 17:15:17 +05:30
parent 4a671f04b3
commit a596b1faf4
4 changed files with 40 additions and 21 deletions

View File

@@ -85,8 +85,7 @@ static uint64 * AllocateUint64(uint64 value);
 static void RecordDistributedRelationDependencies(Oid distributedRelationId);
 static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
 														 HeapTuple heapTuple);
-static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
-								 bool failOnError, uint64 *tableSize);
 static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
 										 SizeQueryType sizeQueryType, bool failOnError,
 										 uint64 *tableSize);
@@ -510,7 +509,7 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
  * It first checks whether the table is distributed and size query can be run on
  * it. Connection to each node has to be established to get the size of the table.
  */
-static bool
+bool
 DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError,
 					 uint64 *tableSize)
 {
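Since DistributedTableSize() is no longer static, other translation units can call it through the extern declaration added at the bottom of this commit. A minimal sketch of such a caller, using only what the diff shows (the wrapper name is illustrative, not part of the commit):

#include "postgres.h"
#include "distributed/metadata_utility.h"

/* Illustrative wrapper: fetch the total size (in bytes) of a distributed
 * table, erroring out on failure because failOnError is passed as true. */
static uint64
GetTotalRelationSize(Oid relationId)
{
	uint64 tableSize = 0;

	DistributedTableSize(relationId, TOTAL_RELATION_SIZE, true, &tableSize);

	return tableSize;
}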

View File

@@ -12,12 +12,12 @@
 #include "distributed/shard_split.h"
 #include "utils/lsyscache.h"
 #include "distributed/listutils.h"
+#include "distributed/metadata_utility.h"
 PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
 uint64 MaxShardSize = 102400;
-double TenantFrequency = 0.2;
+double TenantFrequency = 0.3;
 /*
  * Struct to store all the information related to
@@ -39,6 +39,14 @@ typedef struct ShardInfoData
 }ShardInfoData;
 typedef ShardInfoData *ShardInfo;
+void ErrorOnConcurrentOperation(void);
+StringInfo GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints,
+							  char *shardSplitMode);
+void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
+							   char *shardSplitMode);
+int64 ExecuteAvgHashQuery(ShardInfo shardinfo);
+List * FindShardSplitPoints(ShardInfo shardinfo);
+void ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode);
 /*
  * It throws an error if a concurrent automatic shard split or Rebalance operation is happening.
@@ -85,18 +93,22 @@ GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints, char *shardSplitMode)
 		appendStringInfo(splitQuery, "'%d'", splitpoint);
 		if (index < length - 1)
+		{
 			appendStringInfoString(splitQuery, ",");
+		}
 		index++;
 	}
+	/*All the shards after the split will be belonging to the same node */
 	appendStringInfo(splitQuery, "], ARRAY[");
 	for (int i = 0; i < length; i++)
 	{
 		appendStringInfo(splitQuery, "%d,", shardinfo->nodeId);
 	}
-	appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeId , quote_literal_cstr(shardSplitMode));
+	appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeId, quote_literal_cstr(
						 shardSplitMode));
 	return splitQuery;
 }
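For reference, the string assembled here ends up as a call like the one below. The leading "SELECT citus_split_shard_by_split_points(" part of the format string, the shard id, split points, and node id are outside this hunk, so they are illustrative placeholders:

-- Two split points produce three new shards, all placed on the same node (node id 2 here):
SELECT citus_split_shard_by_split_points(
	102008,
	ARRAY['-1073741824', '0'],
	ARRAY[2, 2, 2],
	'block_writes');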
@@ -111,7 +123,7 @@ ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
 {
 	StringInfo splitQuery = makeStringInfo();
 	splitQuery = GetShardSplitQuery(shardinfo, splitPoints, shardSplitMode);
-	// ereport(LOG, (errmsg(splitQuery->data)));
+	/* ereport(LOG, (errmsg(splitQuery->data))); */
 	int32 nodesInvolved[] = { shardinfo->nodeId };
 	Oid superUserId = CitusExtensionOwner();
 	BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, splitQuery->data, 0,
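Once the task is queued, the split can be followed through Citus' background-job catalogs; a hedged example, assuming the usual pg_dist_background_job / pg_dist_background_task column names (job id is a placeholder):

-- Most recent background job and the split command(s) scheduled for it
SELECT job_id, state, description
FROM pg_dist_background_job
ORDER BY job_id DESC
LIMIT 1;

SELECT task_id, status, command
FROM pg_dist_background_task
WHERE job_id = 1234;  -- placeholder job id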
@@ -127,11 +139,14 @@ int64
 ExecuteAvgHashQuery(ShardInfo shardinfo)
 {
 	StringInfo AvgHashQuery = makeStringInfo();
+	uint64 tableSize = 0;
+	bool check = DistributedTableSize(shardinfo->tableId, TOTAL_RELATION_SIZE, true,
+									  &tableSize);
 	appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
-					 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
+					 " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/%lu))"
 					 " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
 					 shardinfo->distributionColumn, shardinfo->tableName,
-					 quote_literal_cstr(shardinfo->tableName),
+					 tableSize,
 					 shardinfo->distributionColumn, shardinfo->shardMinValue,
 					 shardinfo->distributionColumn, shardinfo->shardMaxValue
 					 );
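With this change the sampling percentage is computed from the pre-fetched tableSize instead of calling citus_total_relation_size() inside the query. For a hypothetical 100 GB shard of dist_table distributed on tenant_id, the generated SQL would look roughly like:

SELECT avg(h)::int, count(*)
FROM (SELECT worker_hash(tenant_id) h
      FROM dist_table TABLESAMPLE SYSTEM(least(10, 100*10000000000/107374182400))
      WHERE worker_hash(tenant_id) >= -2147483648
        AND worker_hash(tenant_id) <= -1073741825) s;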
@@ -171,9 +186,9 @@ FindShardSplitPoints(ShardInfo shardinfo)
 	StringInfo CommonValueQuery = makeStringInfo();
 	/*
-	 * The inner query for extracting the tenant value having frequency > 0.3 is executed on
+	 * The inner query for extracting the tenant value having frequency > TenantFrequency is executed on
 	 * every shard of the table and outer query gives the shardid and tenant values as the
-	 * output.
+	 * output. Tenant Frequency is a GUC here.
 	 */
 	appendStringInfo(CommonValueQuery,
 					 "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
@@ -184,18 +199,21 @@ FindShardSplitPoints(ShardInfo shardinfo)
 					 shardinfo->dataType,
 					 quote_literal_cstr(shardinfo->shardName),
 					 quote_literal_cstr(shardinfo->distributionColumn),
-					 quote_literal_cstr(get_namespace_name(get_rel_namespace(shardinfo->tableId))),
+					 quote_literal_cstr(get_namespace_name(get_rel_namespace(
															   shardinfo->tableId))),
 					 TenantFrequency,
 					 shardinfo->shardId);
 	ereport(LOG, errmsg("%s", CommonValueQuery->data));
 	List *splitPoints = NULL;
 	/* Saving the current memory context*/
 	MemoryContext originalContext = CurrentMemoryContext;
 	SPI_connect();
 	SPI_exec(CommonValueQuery->data, 0);
 	/*Saving the SPI memory context for switching*/
 	MemoryContext spiContext = CurrentMemoryContext;
 	int64 rowCount = SPI_processed;
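The two saved contexts implement the usual SPI pattern: results that must outlive SPI_finish() are copied while switched back to the caller's context. A self-contained sketch of that pattern (the helper name, column number, and int conversion are illustrative, not part of the commit):

#include "postgres.h"
#include "executor/spi.h"
#include "nodes/pg_list.h"

/* Illustrative helper: run a query via SPI and collect one integer column
 * into a List that remains valid after SPI_finish(). */
static List *
CollectIntColumn(const char *query, int columnNumber)
{
	List *values = NIL;
	MemoryContext originalContext = CurrentMemoryContext;

	SPI_connect();
	SPI_exec(query, 0);

	/* SPI's own context; allocations made here are freed by SPI_finish() */
	MemoryContext spiContext = CurrentMemoryContext;

	for (uint64 rowIndex = 0; rowIndex < SPI_processed; rowIndex++)
	{
		char *value = SPI_getvalue(SPI_tuptable->vals[rowIndex],
								   SPI_tuptable->tupdesc, columnNumber);

		/* allocate the list cell in caller-owned memory */
		MemoryContextSwitchTo(originalContext);
		values = lappend_int(values, atoi(value));
		MemoryContextSwitchTo(spiContext);
	}

	SPI_finish();
	return values;
}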
@@ -252,7 +270,6 @@ FindShardSplitPoints(ShardInfo shardinfo)
 			{
 				splitPoints = lappend_int(splitPoints, average);
 			}
-		}
 	}
 	SPI_finish();
@@ -270,7 +287,6 @@ ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode)
 	List *splitPoints = FindShardSplitPoints(shardinfo);
 	if (list_length(splitPoints) > 0)
 	{
-		/* ErrorOnConcurrentOperation(); */
 		int64 jobId = CreateBackgroundJob("Automatic Shard Split",
 										  "Split using SplitPoints List");
 		ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints,
@@ -293,7 +309,9 @@ ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode)
 Datum
 citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 {
+	ErrorOnConcurrentOperation();
 	StringInfo query = makeStringInfo();
 	/* This query is written to group the shards on the basis of colocation id and shardminvalue and get the groups whose sum of shardsize
 	 * are greater than a threshold and than extract the shard in them which has the maximum size. So for that first pg_dist_shard and citus_shards are joined followed by the joining of pg_dist_node
 	 * and citus_shards and finally joined by the table obtained by the grouping of colocation id and shardminvalue and shardsize exceeding the threshold.*/
@@ -307,7 +325,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 					 " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
 					 " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %lu )"
 					 " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
-					 MaxShardSize*1024
+					 MaxShardSize * 1024
 					 );
 	ereport(LOG, errmsg("%s", query->data));
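The comment above describes the selection logic; a simplified sketch of that idea (not the literal query built here, which additionally joins pg_dist_node and citus_tables):

-- Group shards by colocation id and shardminvalue, keep groups whose combined
-- size exceeds the threshold, then pick the largest shard of each group.
SELECT shardid
FROM (
	SELECT cs.shardid,
	       cs.shard_size,
	       sum(cs.shard_size) OVER w AS total_sum,
	       max(cs.shard_size) OVER w AS max_size
	FROM citus_shards cs JOIN pg_dist_shard ps USING (shardid)
	WINDOW w AS (PARTITION BY cs.colocation_id, ps.shardminvalue)
) t
WHERE total_sum >= 102400 * 1024   -- MaxShardSize (assumed KB) * 1024
  AND shard_size = max_size;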
@@ -351,8 +369,8 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 			shardinfo.distributionColumn = SPI_getvalue(tuple, tupletable->tupdesc, 6);
 			shardinfo.tableName = SPI_getvalue(tuple, tupletable->tupdesc, 7);
-			shardinfo.shardName = SPI_getvalue(tuple,tupletable->tupdesc, 9);
-			AppendShardIdToName(&shardinfo.shardName,shardinfo.shardId);
+			shardinfo.shardName = SPI_getvalue(tuple, tupletable->tupdesc, 9);
+			AppendShardIdToName(&shardinfo.shardName, shardinfo.shardId);
 			Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7, &isnull);
 			shardinfo.tableId = DatumGetObjectId(tableIdDatum);

View File

@@ -2276,7 +2276,7 @@ RegisterCitusConfigVariables(void)
 		gettext_noop("Sets the threshold tenant frequency for a Shard"),
 		NULL,
 		&TenantFrequency,
-		0.2, 0, 1,
+		0.3, 0, 1,
 		PGC_USERSET,
 		GUC_STANDARD,
 		NULL, NULL, NULL);
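For context, the three changed numbers are the default, minimum, and maximum passed to DefineCustomRealVariable(); a sketch of the surrounding registration (the GUC's name is not visible in this hunk, so "citus.tenant_frequency" below is an assumption):

DefineCustomRealVariable(
	"citus.tenant_frequency",      /* assumed name; not shown in this hunk */
	gettext_noop("Sets the threshold tenant frequency for a Shard"),
	NULL,
	&TenantFrequency,
	0.3, 0, 1,                     /* new default, min, max */
	PGC_USERSET,
	GUC_STANDARD,
	NULL, NULL, NULL);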

View File

@@ -411,4 +411,6 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
 extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
 extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
 extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
+extern bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
+								 bool failOnError, uint64 *tableSize);
 #endif   /* METADATA_UTILITY_H */