mirror of https://github.com/citusdata/citus.git
Added tenant frequency as GUC and addressed some more review comments
parent
96de8322b0
commit
4a671f04b3
|
@ -11,11 +11,13 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "distributed/shard_split.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "distributed/listutils.h"
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
|
||||
|
||||
int MaxShardSize = 102400;
|
||||
uint64 MaxShardSize = 102400;
|
||||
double TenantFrequency = 0.2;
|
||||
|
||||
/*
|
||||
* Struct to store all the information related to
|
||||
|
@ -23,16 +25,15 @@ int MaxShardSize = 102400;
|
|||
*/
|
||||
typedef struct ShardInfoData
|
||||
{
|
||||
int64 shardsize;
|
||||
int64 shardminvalue;
|
||||
int64 shardmaxvalue;
|
||||
int64 shardid;
|
||||
int32 nodeid;
|
||||
char *tablename;
|
||||
int64 shardSize;
|
||||
int64 shardMinValue;
|
||||
int64 shardMaxValue;
|
||||
int64 shardId;
|
||||
int32 nodeId;
|
||||
char *tableName;
|
||||
char *distributionColumn;
|
||||
char *datatype;
|
||||
char *shardname;
|
||||
char *schemaname;
|
||||
char *dataType;
|
||||
char *shardName;
|
||||
Oid tableId;
|
||||
Oid distributionColumnId;
|
||||
}ShardInfoData;
|
||||
|
@ -69,27 +70,33 @@ ErrorOnConcurrentOperation()
|
|||
* For a given SplitPoints , it creates the SQL query for the Shard Splitting
|
||||
*/
|
||||
StringInfo
|
||||
GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints, char *shardSplitMode)
|
||||
GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints, char *shardSplitMode)
|
||||
{
|
||||
StringInfo splitQuery = makeStringInfo();
|
||||
|
||||
int64 length = list_length(SplitPoints);
|
||||
int64 length = list_length(splitPoints);
|
||||
appendStringInfo(splitQuery, "SELECT citus_split_shard_by_split_points(%ld, ARRAY[",
|
||||
shardinfo->shardid);
|
||||
shardinfo->shardId);
|
||||
|
||||
for (int i = 0; i < length - 1; i++)
|
||||
int32 splitpoint = 0;
|
||||
int index = 0;
|
||||
foreach_int(splitpoint, splitPoints)
|
||||
{
|
||||
appendStringInfo(splitQuery, "'%d',", list_nth_int(SplitPoints, i));
|
||||
appendStringInfo(splitQuery, "'%d'", splitpoint);
|
||||
|
||||
if (index < length - 1)
|
||||
appendStringInfoString(splitQuery, ",");
|
||||
|
||||
index++;
|
||||
}
|
||||
appendStringInfo(splitQuery, "'%d'], ARRAY[", list_nth_int(SplitPoints,
|
||||
length - 1));
|
||||
|
||||
appendStringInfo(splitQuery, "], ARRAY[");
|
||||
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
appendStringInfo(splitQuery, "%d,", shardinfo->nodeid);
|
||||
appendStringInfo(splitQuery, "%d,", shardinfo->nodeId);
|
||||
}
|
||||
appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr(
|
||||
shardSplitMode));
|
||||
appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeId , quote_literal_cstr(shardSplitMode));
|
||||
|
||||
return splitQuery;
|
||||
}
|
||||
|
@ -99,16 +106,15 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints, char *shardSplitMode)
|
|||
* It creates a background job for citus_split_shard_by_split_points and executes it in background.
|
||||
*/
|
||||
void
|
||||
ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints,
|
||||
ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
|
||||
char *shardSplitMode)
|
||||
{
|
||||
StringInfo splitQuery = makeStringInfo();
|
||||
splitQuery = GetShardSplitQuery(shardinfo, SplitPoints, shardSplitMode);
|
||||
ereport(LOG, (errmsg(splitQuery->data)));
|
||||
int32 nodesInvolved[1];
|
||||
nodesInvolved[0] = shardinfo->nodeid;
|
||||
splitQuery = GetShardSplitQuery(shardinfo, splitPoints, shardSplitMode);
|
||||
// ereport(LOG, (errmsg(splitQuery->data)));
|
||||
int32 nodesInvolved[] = { shardinfo->nodeId };
|
||||
Oid superUserId = CitusExtensionOwner();
|
||||
BackgroundTask *task = ScheduleBackgroundTask(jobid, superUserId, splitQuery->data, 0,
|
||||
BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, splitQuery->data, 0,
|
||||
NULL, 1, nodesInvolved);
|
||||
}
|
||||
|
||||
|
@ -124,10 +130,10 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
|
|||
appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
|
||||
" FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
|
||||
" WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
|
||||
shardinfo->distributionColumn, shardinfo->tablename,
|
||||
quote_literal_cstr(shardinfo->tablename),
|
||||
shardinfo->distributionColumn, shardinfo->shardminvalue,
|
||||
shardinfo->distributionColumn, shardinfo->shardmaxvalue
|
||||
shardinfo->distributionColumn, shardinfo->tableName,
|
||||
quote_literal_cstr(shardinfo->tableName),
|
||||
shardinfo->distributionColumn, shardinfo->shardMinValue,
|
||||
shardinfo->distributionColumn, shardinfo->shardMaxValue
|
||||
);
|
||||
ereport(LOG, errmsg("%s", AvgHashQuery->data));
|
||||
SPI_connect();
|
||||
|
@ -136,21 +142,21 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
|
|||
HeapTuple tuple = tupletable->vals[0];
|
||||
bool isnull;
|
||||
Datum average = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
|
||||
int64 IsResultNull = 1;
|
||||
int64 isResultNull = 1;
|
||||
if (!isnull)
|
||||
{
|
||||
IsResultNull = 0;
|
||||
isResultNull = 0;
|
||||
}
|
||||
SPI_freetuptable(tupletable);
|
||||
SPI_finish();
|
||||
|
||||
if (IsResultNull == 0)
|
||||
if (isResultNull == 0)
|
||||
{
|
||||
return DatumGetInt64(average);
|
||||
}
|
||||
else
|
||||
{
|
||||
return shardinfo->shardminvalue - 1;
|
||||
return shardinfo->shardMinValue - 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -172,24 +178,30 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
appendStringInfo(CommonValueQuery,
|
||||
"SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
|
||||
" FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
|
||||
" WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > 0.3 $$)"
|
||||
" WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > %f $$)"
|
||||
" WHERE result <> '' AND shardid = %ld;",
|
||||
shardinfo->datatype, quote_literal_cstr(shardinfo->tablename),
|
||||
shardinfo->datatype,
|
||||
quote_literal_cstr(shardinfo->shardname),
|
||||
shardinfo->dataType, quote_literal_cstr(shardinfo->tableName),
|
||||
shardinfo->dataType,
|
||||
quote_literal_cstr(shardinfo->shardName),
|
||||
quote_literal_cstr(shardinfo->distributionColumn),
|
||||
quote_literal_cstr(shardinfo->schemaname),
|
||||
shardinfo->shardid);
|
||||
quote_literal_cstr(get_namespace_name(get_rel_namespace(shardinfo->tableId))),
|
||||
TenantFrequency,
|
||||
shardinfo->shardId);
|
||||
|
||||
ereport(LOG, errmsg("%s", CommonValueQuery->data));
|
||||
List *SplitPoints = NULL;
|
||||
List *splitPoints = NULL;
|
||||
/* Saving the current memory context*/
|
||||
MemoryContext originalContext = CurrentMemoryContext;
|
||||
|
||||
SPI_connect();
|
||||
SPI_exec(CommonValueQuery->data, 0);
|
||||
MemoryContext spiContext = CurrentMemoryContext;
|
||||
/*Saving the SPI memory context for switching*/
|
||||
MemoryContext spiContext = CurrentMemoryContext;
|
||||
|
||||
int64 rowCount = SPI_processed;
|
||||
int64 average;
|
||||
int32 hashedValue;
|
||||
|
||||
ereport(LOG, errmsg("%ld", rowCount));
|
||||
|
||||
if (rowCount > 0)
|
||||
|
@ -213,44 +225,38 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
/*Switching the memory context to store the unique SplitPoints in a list*/
|
||||
|
||||
MemoryContextSwitchTo(originalContext);
|
||||
if (hashedValue == shardinfo->shardminvalue)
|
||||
if (hashedValue == shardinfo->shardMinValue)
|
||||
{
|
||||
SplitPoints = list_append_unique_int(SplitPoints, hashedValue);
|
||||
splitPoints = list_append_unique_int(splitPoints, hashedValue);
|
||||
}
|
||||
else if (hashedValue == shardinfo->shardmaxvalue)
|
||||
else if (hashedValue == shardinfo->shardMaxValue)
|
||||
{
|
||||
SplitPoints = list_append_unique_int(SplitPoints, hashedValue - 1);
|
||||
splitPoints = list_append_unique_int(splitPoints, hashedValue - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
SplitPoints = list_append_unique_int(SplitPoints, hashedValue - 1);
|
||||
SplitPoints = list_append_unique_int(SplitPoints, hashedValue);
|
||||
splitPoints = list_append_unique_int(splitPoints, hashedValue - 1);
|
||||
splitPoints = list_append_unique_int(splitPoints, hashedValue);
|
||||
}
|
||||
MemoryContextSwitchTo(spiContext);
|
||||
}
|
||||
SPI_freetuptable(tupletable);
|
||||
list_sort(splitPoints, list_int_cmp);
|
||||
}
|
||||
else
|
||||
{
|
||||
average = ExecuteAvgHashQuery(shardinfo);
|
||||
ereport(LOG, errmsg("%ld", average));
|
||||
MemoryContextSwitchTo(originalContext);
|
||||
if (shardinfo->shardMinValue <= average)
|
||||
{
|
||||
splitPoints = lappend_int(splitPoints, average);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
SPI_finish();
|
||||
|
||||
if (rowCount > 0)
|
||||
{
|
||||
list_sort(SplitPoints, list_int_cmp);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (shardinfo->shardminvalue <= average)
|
||||
{
|
||||
SplitPoints = lappend_int(SplitPoints, average);
|
||||
}
|
||||
}
|
||||
|
||||
return SplitPoints;
|
||||
return splitPoints;
|
||||
}
|
||||
|
||||
|
||||
|
@ -261,15 +267,15 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
void
|
||||
ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode)
|
||||
{
|
||||
List *SplitPoints = FindShardSplitPoints(shardinfo);
|
||||
if (list_length(SplitPoints) > 0)
|
||||
List *splitPoints = FindShardSplitPoints(shardinfo);
|
||||
if (list_length(splitPoints) > 0)
|
||||
{
|
||||
/* ErrorOnConcurrentOperation(); */
|
||||
int64 jobId = CreateBackgroundJob("Automatic Shard Split",
|
||||
"Split using SplitPoints List");
|
||||
ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, SplitPoints,
|
||||
ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints,
|
||||
shardSplitMode)->data));
|
||||
ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints, shardSplitMode);
|
||||
ExecuteSplitBackgroundJob(jobId, shardinfo, splitPoints, shardSplitMode);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -288,21 +294,20 @@ Datum
|
|||
citus_auto_shard_split_start(PG_FUNCTION_ARGS)
|
||||
{
|
||||
StringInfo query = makeStringInfo();
|
||||
|
||||
/* This query is written to group the shards on the basis of colocation id and shardminvalue and get the groups whose sum of shardsize
|
||||
* are greater than a threshold and than extract the shard in them which has the maximum size. So for that first pg_dist_shard and citus_shards are joined followed by the joining of pg_dist_node
|
||||
* and citus_shards and finally joined by the table obtained by the grouping of colocation id and shardminvalue and shardsize exceeding the threshold.*/
|
||||
|
||||
appendStringInfo(
|
||||
query,
|
||||
" SELECT cs.shardid,pd.shardminvalue,pd.shardmaxvalue,cs.shard_size,pn.nodeid,ct.distribution_column,ct.table_name,cs.shard_name,(SELECT relname FROM pg_class WHERE oid = (ct.table_name::regclass)::oid) "
|
||||
" SELECT cs.shardid,pd.shardminvalue,pd.shardmaxvalue,cs.shard_size,pn.nodeid,ct.distribution_column,ct.table_name,cs.shard_name,(SELECT relname FROM pg_class WHERE oid = ct.table_name)"
|
||||
" FROM pg_catalog.pg_dist_shard pd JOIN pg_catalog.citus_shards cs ON pd.shardid = cs.shardid JOIN pg_catalog.pg_dist_node pn ON cs.nodename = pn.nodename AND cs.nodeport= pn.nodeport"
|
||||
" JOIN"
|
||||
" ( select shardid , max_size from (SELECT distinct first_value(shardid) OVER w as shardid, sum(shard_size) OVER (PARTITION BY colocation_id, shardminvalue) as total_sum, max(shard_size) OVER w as max_size"
|
||||
" FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
|
||||
" WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )"
|
||||
" WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %lu )"
|
||||
" AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
|
||||
MaxShardSize * 1024
|
||||
MaxShardSize*1024
|
||||
);
|
||||
|
||||
ereport(LOG, errmsg("%s", query->data));
|
||||
|
@ -328,29 +333,26 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
|
|||
ShardInfoData shardinfo;
|
||||
HeapTuple tuple = tupletable->vals[rowIndex];
|
||||
|
||||
Datum shardId = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
|
||||
shardinfo.shardid = DatumGetInt64(shardId);
|
||||
Datum shardIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
|
||||
shardinfo.shardId = DatumGetInt64(shardIdDatum);
|
||||
|
||||
Datum shardSize = SPI_getbinval(tuple, tupletable->tupdesc, 4, &isnull);
|
||||
shardinfo.shardsize = DatumGetInt64(shardSize);
|
||||
Datum shardSizeDatum = SPI_getbinval(tuple, tupletable->tupdesc, 4, &isnull);
|
||||
shardinfo.shardSize = DatumGetInt64(shardSizeDatum);
|
||||
|
||||
Datum nodeId = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull);
|
||||
shardinfo.nodeid = DatumGetInt32(nodeId);
|
||||
Datum nodeIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull);
|
||||
shardinfo.nodeId = DatumGetInt32(nodeIdDatum);
|
||||
|
||||
char *shardMinVal = SPI_getvalue(tuple, tupletable->tupdesc, 2);
|
||||
shardinfo.shardminvalue = strtoi64(shardMinVal, NULL, 10);
|
||||
shardinfo.shardMinValue = strtoi64(shardMinVal, NULL, 10);
|
||||
|
||||
char *shardMaxVal = SPI_getvalue(tuple, tupletable->tupdesc, 3);
|
||||
shardinfo.shardmaxvalue = strtoi64(shardMaxVal, NULL, 10);
|
||||
shardinfo.shardMaxValue = strtoi64(shardMaxVal, NULL, 10);
|
||||
|
||||
shardinfo.distributionColumn = SPI_getvalue(tuple, tupletable->tupdesc, 6);
|
||||
shardinfo.tablename = SPI_getvalue(tuple, tupletable->tupdesc, 7);
|
||||
shardinfo.tableName = SPI_getvalue(tuple, tupletable->tupdesc, 7);
|
||||
|
||||
StringInfo shardnameQuery = makeStringInfo();
|
||||
appendStringInfo(shardnameQuery, "%s_%ld", SPI_getvalue(tuple,
|
||||
tupletable->tupdesc, 9),
|
||||
shardinfo.shardid);
|
||||
shardinfo.shardname = shardnameQuery->data;
|
||||
shardinfo.shardName = SPI_getvalue(tuple,tupletable->tupdesc, 9);
|
||||
AppendShardIdToName(&shardinfo.shardName,shardinfo.shardId);
|
||||
|
||||
Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7, &isnull);
|
||||
shardinfo.tableId = DatumGetObjectId(tableIdDatum);
|
||||
|
@ -358,17 +360,14 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
|
|||
shardinfo.tableId,
|
||||
shardinfo.
|
||||
distributionColumn);
|
||||
shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
|
||||
|
||||
Oid schemaOid = get_rel_namespace(shardinfo.tableId);
|
||||
shardinfo.schemaname = get_namespace_name(schemaOid);
|
||||
shardinfo.dataType = format_type_be(shardinfo.distributionColumnId);
|
||||
|
||||
ScheduleShardSplit(&shardinfo, shardSplitMode);
|
||||
ereport(LOG, (errmsg(
|
||||
"Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d",
|
||||
shardinfo.shardid, shardinfo.shardminvalue,
|
||||
shardinfo.shardmaxvalue,
|
||||
shardinfo.shardsize, shardinfo.nodeid)));
|
||||
shardinfo.shardId, shardinfo.shardMinValue,
|
||||
shardinfo.shardMaxValue,
|
||||
shardinfo.shardSize, shardinfo.nodeId)));
|
||||
}
|
||||
|
||||
SPI_freetuptable(tupletable);
|
||||
|
|
|
@ -2271,6 +2271,16 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_UNIT_KB | GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomRealVariable(
|
||||
"citus.tenant_freq",
|
||||
gettext_noop("Sets the threshold tenant frequency for a Shard"),
|
||||
NULL,
|
||||
&TenantFrequency,
|
||||
0.2, 0, 1,
|
||||
PGC_USERSET,
|
||||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.shard_replication_factor",
|
||||
gettext_noop("Sets the replication factor for shards."),
|
||||
|
|
|
@ -214,7 +214,8 @@ extern int ShardCount;
|
|||
extern int ShardReplicationFactor;
|
||||
extern int NextShardId;
|
||||
extern int NextPlacementId;
|
||||
extern int MaxShardSize;
|
||||
extern uint64 MaxShardSize;
|
||||
extern double TenantFrequency;
|
||||
|
||||
|
||||
extern bool IsCoordinator(void);
|
||||
|
|
Loading…
Reference in New Issue