Added comments, removed values only used for testing

pull/7013/head
Shabnam Khan 2023-06-19 14:16:06 +05:30
parent 8bac212941
commit 35d65893ac
3 changed files with 130 additions and 93 deletions

@@ -39,12 +39,16 @@ typedef struct ShardInfoData
     char *shardname;
     Oid tableId;
     Oid distributionColumnId;
 }ShardInfoData;
 typedef ShardInfoData *ShardInfo;
 
-void ErrorOnConcurrentOperation(){
+/*
+ * Throws an error if a concurrent automatic shard split or rebalance
+ * operation is already running.
+ */
+void
+ErrorOnConcurrentOperation()
+{
     int64 jobId = 0;
     if (HasNonTerminalJobOfType("rebalance", &jobId))
     {
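For context: HasNonTerminalJobOfType consults the background-job catalog for a job of the given type that has not yet reached a terminal state. A rough SQL equivalent of the rebalance check above, assuming the standard pg_dist_background_job catalog and its usual state names, would be:

    -- a returned row is what makes ErrorOnConcurrentOperation() raise an ERROR
    SELECT job_id
    FROM pg_dist_background_job
    WHERE job_type = 'rebalance'
      AND state NOT IN ('finished', 'cancelled', 'failed');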
@@ -57,25 +61,32 @@ void ErrorOnConcurrentOperation(){
     if (HasNonTerminalJobOfType("Automatic Shard Split", &jobId))
     {
         ereport(ERROR, (
-            errmsg("An automatic shard split is already running as job %ld", jobId),
-            errdetail("An automatic shard split was already scheduled as background job")));
+            errmsg("An automatic shard split is already running as job %ld",
+                   jobId),
+            errdetail(
+                "An automatic shard split was already scheduled as a background job")));
     }
 }
 
+/*
+ * Builds the SQL query that calls citus_split_shard_by_split_points for the
+ * given shard at the given SplitPoints.
+ */
 StringInfo
 GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints)
 {
     StringInfo splitQuery = makeStringInfo();
     int64 length = list_length(SplitPoints);
-    appendStringInfo(splitQuery,"SELECT citus_split_shard_by_split_points(%ld, ARRAY[",shardinfo->shardid);
+    appendStringInfo(splitQuery, "SELECT citus_split_shard_by_split_points(%ld, ARRAY[",
+                     shardinfo->shardid);
     for (int i = 0; i < length - 1; i++)
     {
         appendStringInfo(splitQuery, "'%ld',", DatumGetInt64(list_nth(SplitPoints, i)));
     }
-    appendStringInfo(splitQuery,"'%ld'], ARRAY[",DatumGetInt64(list_nth(SplitPoints,length-1)));
+    appendStringInfo(splitQuery, "'%ld'], ARRAY[", DatumGetInt64(list_nth(SplitPoints,
+                                                                          length - 1)));
     for (int i = 0; i < length; i++)
     {
@@ -86,10 +97,13 @@ GetShardSplitQuery(ShardInfo shardinfo, List* SplitPoints)
     return splitQuery;
 }
 
+/*
+ * Creates a background task that runs citus_split_shard_by_split_points
+ * for the given background job.
+ */
 void
 ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints)
 {
     StringInfo splitQuery = makeStringInfo();
     splitQuery = GetShardSplitQuery(shardinfo, SplitPoints);
     ereport(LOG, (errmsg(splitQuery->data)));
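To illustrate what ends up scheduled: for a hypothetical shard 102008 with a single split point, and with the node array that the (truncated) second loop fills in from nodesInvolved, GetShardSplitQuery yields a command roughly like:

    -- shard id, split point, and node ids are made-up example values
    SELECT citus_split_shard_by_split_points(102008, ARRAY['1073741823'], ARRAY[2, 3]);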
@@ -100,9 +114,11 @@ ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List* SplitPoints)
     BackgroundTask *task = ScheduleBackgroundTask(jobid, superUserId, splitQuery->data, 0,
                                                   NULL, 1, nodesInvolved);
 }
 
 /*
  * Executes a query that finds the average hash value of the rows in a shard, sampling at most about 10GB.
- *
+ * If a sample exists, its average hash value is returned; otherwise shardminvalue - 1 is returned.
  */
 int64
 ExecuteAvgHashQuery(ShardInfo shardinfo)
@@ -111,7 +127,8 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
     appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
                      " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
                      " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
-                     shardinfo->distributionColumn, shardinfo->tablename,quote_literal_cstr(shardinfo->tablename),
+                     shardinfo->distributionColumn, shardinfo->tablename,
+                     quote_literal_cstr(shardinfo->tablename),
                      shardinfo->distributionColumn, shardinfo->shardminvalue,
                      shardinfo->distributionColumn, shardinfo->shardmaxvalue
                      );
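The sampling percentage is worth unpacking: 100 * 10000000000 / citus_total_relation_size(...) is the percentage of the relation that corresponds to roughly 10GB, and least(10, ...) caps it at 10%. For a 200GB shard this is least(10, 10^12 / (2 * 10^11)) = 5, so about 5% (~10GB) of rows are sampled; for a 50GB shard the cap applies and 10% (~5GB) is sampled. With a hypothetical distribution column user_id and shard table events_102008, the generated query reads:

    SELECT avg(h)::int, count(*)
    FROM (SELECT worker_hash(user_id) h
          FROM events_102008 TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size('events_102008')))
          WHERE worker_hash(user_id) >= -2147483648 AND worker_hash(user_id) <= 2147483647) s;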
@@ -139,6 +156,8 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
         return shardinfo->shardminvalue - 1;
     }
 }
 
 /*
  * This function executes a query and then decides whether the shard is subject to tenant isolation or to an average-hash two-way split.
  * If a tenant is found, split points for isolation are returned; otherwise the average hash value is used.
@@ -152,9 +171,11 @@ FindShardSplitPoints(ShardInfo shardinfo)
                      " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
                      " WHERE tablename = %s AND attname = %s AND freq > 0.2 $$)"
                      " WHERE result <> '' AND shardid = %ld;",
-                     shardinfo->datatype, quote_literal_cstr(shardinfo->tablename), shardinfo->datatype,
+                     shardinfo->datatype, quote_literal_cstr(shardinfo->tablename),
+                     shardinfo->datatype,
                      quote_literal_cstr(shardinfo->shardname),
-                     quote_literal_cstr(shardinfo->distributionColumn), shardinfo->shardid);
+                     quote_literal_cstr(shardinfo->distributionColumn),
+                     shardinfo->shardid);
     ereport(LOG, errmsg("%s", CommonValueQuery->data));
 
     List *SplitPoints = NULL;
@@ -175,9 +196,11 @@ FindShardSplitPoints(ShardInfo shardinfo)
             HeapTuple tuple = tupletable->vals[rowIndex];
             char *commonValue = SPI_getvalue(tuple, tupletable->tupdesc, 2);
             ereport(LOG, errmsg("%s", commonValue));
-            Datum tenantIdDatum = StringToDatum(commonValue, shardinfo->distributionColumnId);
+            Datum tenantIdDatum = StringToDatum(commonValue,
+                                                shardinfo->distributionColumnId);
             Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction,
-                                                       cacheEntry->partitionColumn->varcollid,
+                                                       cacheEntry->partitionColumn->
+                                                       varcollid,
                                                        tenantIdDatum);
             hashedValue = DatumGetInt32(hashedValueDatum);
             ereport(LOG, errmsg("%ld", hashedValue));
@@ -193,7 +216,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
             else
             {
                 SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
-                SplitPoints = lappend(SplitPoints,Int64GetDatum(hashedValue));;
+                SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
             }
             MemoryContextSwitchTo(spiContext);
         }
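These two appended values implement the isolation cut: split points act as inclusive upper bounds of the resulting ranges, so for a tenant hashing to h, the points h - 1 and h divide the shard's range [min, max] into [min, h - 1], [h, h], and [h + 1, max], leaving the hot tenant alone in the middle shard. For example, a shard covering [0, 2147483647] with a tenant hashing to 1000 is cut at 999 and 1000 into [0, 999], [1000, 1000], and [1001, 2147483647].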
@@ -203,41 +226,52 @@ FindShardSplitPoints(ShardInfo shardinfo)
     {
         average = ExecuteAvgHashQuery(shardinfo);
         ereport(LOG, errmsg("%ld", average));
     }
 
     SPI_finish();
 
-    if(rowCount>0){
+    if (rowCount > 0)
+    {
         list_sort(SplitPoints, list_int_cmp);
-    }else{
-        if(shardinfo->shardminvalue<=average){
+    }
+    else
+    {
+        if (shardinfo->shardminvalue <= average)
+        {
             SplitPoints = lappend(SplitPoints, Int64GetDatum(average));
         }
     }
 
     return SplitPoints;
 }
 
 /*
  * This function calculates the split points of the shard and then schedules the split as a background job.
  */
 void
-ScheduleShardSplit(ShardInfo shardinfo){
+ScheduleShardSplit(ShardInfo shardinfo)
+{
     List *SplitPoints = FindShardSplitPoints(shardinfo);
-    if(list_length(SplitPoints)>0){
-        // int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List");
-        ereport(LOG,errmsg("%s",GetShardSplitQuery(shardinfo,SplitPoints)->data));
-    }else{
+    if (list_length(SplitPoints) > 0)
+    {
+        int64 jobId = CreateBackgroundJob("Automatic Shard Split",
+                                          "Split using SplitPoints List");
+        ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints);
+    }
+    else
+    {
         ereport(LOG, errmsg("No Splitpoints for shard split"));
     }
 }
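Once the job is created this way it runs asynchronously under the background-task infrastructure; assuming the standard catalog and helper function, its progress could be watched with something like:

    SELECT job_id, state, description
    FROM pg_dist_background_job
    WHERE job_type = 'Automatic Shard Split';

    -- or block until a given job finishes (10 is a made-up job id):
    SELECT citus_job_wait(10);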
+/*
+ * citus_auto_shard_split_start finds shards whose colocation group exceeds the
+ * size threshold and, within each group, picks the shard with the maximum size.
+ * If that shard has common tenant values it is isolated on them; otherwise it
+ * is split two ways on the basis of the average hash value.
+ */
 Datum
 citus_auto_shard_split_start(PG_FUNCTION_ARGS)
 {
@@ -256,7 +290,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
                      " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
                      " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )"
                      " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
-                     0
+                     MaxShardSize
                      );
     ereport(LOG, errmsg("%s", query->data));
@@ -298,21 +332,24 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
         shardinfo.tablename = SPI_getvalue(tuple, tupletable->tupdesc, 7);
         StringInfo shardnameQuery = makeStringInfo();
-        appendStringInfo(shardnameQuery,"%s_%ld",SPI_getvalue(tuple,tupletable->tupdesc,9),shardinfo.shardid);
+        appendStringInfo(shardnameQuery, "%s_%ld", SPI_getvalue(tuple,
+                                                                tupletable->tupdesc, 9),
+                         shardinfo.shardid);
         shardinfo.shardname = shardnameQuery->data;
 
         Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7, &isnull);
         shardinfo.tableId = DatumGetObjectId(tableIdDatum);
-        shardinfo.distributionColumnId = ColumnTypeIdForRelationColumnName(shardinfo.tableId,
+        shardinfo.distributionColumnId = ColumnTypeIdForRelationColumnName(
+            shardinfo.tableId,
             shardinfo.
             distributionColumn);
         shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
 
         char *shardSplitMode;
-        // Oid shardTransferModeOid = PG_GETARG_OID(0);
-        // Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
-        // char *enumLabel = DatumGetCString(enumLabelDatum);
-        // ereport(LOG,errmsg("%s",enumLabel));
+        /* Oid shardTransferModeOid = PG_GETARG_OID(0); */
+        /* Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); */
+        /* char *enumLabel = DatumGetCString(enumLabelDatum); */
+        /* ereport(LOG,errmsg("%s",enumLabel)); */
         ScheduleShardSplit(&shardinfo);
         ereport(LOG, (errmsg(