Added custom schema, FindShardSplitPoints and ErrorOnConcurrentOperation

pull/7013/head
Shabnam Khan 2023-06-19 13:55:02 +05:30
parent f9a5be59b9
commit 8bac212941
9 changed files with 8065 additions and 1529 deletions

configure (vendored, 3052 lines changed)

File diff suppressed because it is too large

configure~ (executable file, 6164 lines changed)

File diff suppressed because it is too large


@@ -0,0 +1,329 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "executor/spi.h"
#include "distributed/lock_graph.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_rebalance_strategy.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h"
#include "distributed/worker_protocol.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
#include "distributed/distribution_column.h"
#include "utils/builtins.h"
#include "distributed/shard_split.h"
PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
int MaxShardSize = 104857600;
typedef struct ShardInfoData
{
int64 shardsize;
int64 shardminvalue;
int64 shardmaxvalue;
int64 shardid;
int64 nodeid;
char *tablename;
char *distributionColumn;
char *datatype;
char *shardname;
Oid tableId;
Oid distributionColumnId;
}ShardInfoData;
typedef ShardInfoData *ShardInfo;
/*
 * ErrorOnConcurrentOperation errors out if a rebalance or an automatic shard
 * split is already running as a background job.
 */
void
ErrorOnConcurrentOperation()
{
int64 jobId = 0;
if (HasNonTerminalJobOfType("rebalance", &jobId))
{
ereport(ERROR, (
errmsg("A rebalance is already running as job %ld", jobId),
errdetail("A rebalance was already scheduled as background job"),
errhint("To monitor progress, run: SELECT * FROM "
"citus_rebalance_status();")));
}
if (HasNonTerminalJobOfType("Automatic Shard Split", &jobId))
{
ereport(ERROR, (
errmsg("An automatic shard split is already running as job %ld", jobId),
errdetail("An automatic shard split was already scheduled as background job")));
}
}
/*
 * GetShardSplitQuery builds the citus_split_shard_by_split_points() command
 * for the given shard, listing the computed split points and placing every
 * resulting shard on the shard's current node.
 */
StringInfo
GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints)
{
StringInfo splitQuery = makeStringInfo();
int64 length = list_length(SplitPoints);
appendStringInfo(splitQuery,"SELECT citus_split_shard_by_split_points(%ld, ARRAY[",shardinfo->shardid);
for (int i =0; i < length-1; i++)
{
appendStringInfo(splitQuery,"'%ld',",DatumGetInt64(list_nth(SplitPoints,i)));
}
appendStringInfo(splitQuery,"'%ld'], ARRAY[",DatumGetInt64(list_nth(SplitPoints,length-1)));
for (int i =0; i < length; i++)
{
appendStringInfo(splitQuery,"%ld,",shardinfo->nodeid);
}
appendStringInfo(splitQuery,"%ld], 'block_writes')",shardinfo->nodeid);
return splitQuery;
}
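/*
 * For illustration only: for a hypothetical shard 102008 living on node 1
 * with split points 1000 and 2000, GetShardSplitQuery above produces a
 * command of the form
 * SELECT citus_split_shard_by_split_points(102008, ARRAY['1000','2000'], ARRAY[1,1,1], 'block_writes')
 */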
/*
 * ExecuteSplitBackgroundJob schedules the shard split command as a background
 * task under the given job, restricted to the node that currently holds the shard.
 */
void
ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints)
{
StringInfo splitQuery = GetShardSplitQuery(shardinfo, SplitPoints);
ereport(LOG, (errmsg("%s", splitQuery->data)));
int32 nodesInvolved[1];
nodesInvolved[0] = shardinfo->nodeid;
Oid superUserId = CitusExtensionOwner();
ErrorOnConcurrentOperation();
BackgroundTask *task = ScheduleBackgroundTask(jobid, superUserId, splitQuery->data, 0,
NULL, 1, nodesInvolved);
}
/*
 * ExecuteAvgHashQuery samples at most roughly 10GB of the shard's table and
 * returns the average hash value of the distribution column over the rows
 * that fall within the shard's hash range.
 */
int64
ExecuteAvgHashQuery(ShardInfo shardinfo)
{
StringInfo AvgHashQuery = makeStringInfo();
appendStringInfo(AvgHashQuery, "SELECT avg(h)::int,count(*)"
" FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size(%s)))"
" WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
shardinfo->distributionColumn, shardinfo->tablename,quote_literal_cstr(shardinfo->tablename),
shardinfo->distributionColumn, shardinfo->shardminvalue,
shardinfo->distributionColumn, shardinfo->shardmaxvalue
);
ereport(LOG, errmsg("%s", AvgHashQuery->data));
SPI_connect();
SPI_exec(AvgHashQuery->data, 0);
SPITupleTable *tupletable = SPI_tuptable;
HeapTuple tuple = tupletable->vals[0];
bool isnull;
Datum average = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
/* if the sample produced no rows, fall back to a value below the shard's range */
int64 averageHashValue = isnull ? shardinfo->shardminvalue - 1 : DatumGetInt64(average);
SPI_freetuptable(tupletable);
SPI_finish();
return averageHashValue;
}
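/*
 * For illustration only: for a hypothetical table dist_table distributed on
 * column tenant_id, with a shard covering the hash range [-2147483648, -1],
 * ExecuteAvgHashQuery above builds a query of the form
 * SELECT avg(h)::int,count(*)
 * FROM (SELECT worker_hash(tenant_id) h FROM dist_table
 * TABLESAMPLE SYSTEM(least(10, 100*10000000000/citus_total_relation_size('dist_table')))
 * WHERE worker_hash(tenant_id)>=-2147483648 AND worker_hash(tenant_id)<=-1) s
 */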
/*
 * FindShardSplitPoints decides whether a shard should be split to isolate a
 * tenant or split two ways at the average hash value. If a common value
 * (tenant) is found in the shard, split points that isolate it are returned;
 * otherwise a single split point at the average hash value is returned.
 */
List *
FindShardSplitPoints(ShardInfo shardinfo)
{
StringInfo CommonValueQuery = makeStringInfo();
appendStringInfo(CommonValueQuery,
"SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
" FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
" WHERE tablename = %s AND attname = %s AND freq > 0.2 $$)"
" WHERE result <> '' AND shardid = %ld;",
shardinfo->datatype, quote_literal_cstr(shardinfo->tablename), shardinfo->datatype,
quote_literal_cstr(shardinfo->shardname),
quote_literal_cstr(shardinfo->distributionColumn), shardinfo->shardid);
ereport(LOG, errmsg("%s", CommonValueQuery->data));
List *SplitPoints = NULL;
MemoryContext originalContext = CurrentMemoryContext;
SPI_connect();
SPI_exec(CommonValueQuery->data, 0);
MemoryContext spiContext = CurrentMemoryContext;
int64 rowCount = SPI_processed;
int64 average,hashedValue;
ereport(LOG, errmsg("%ld", rowCount));
if (rowCount > 0)
{
SPITupleTable *tupletable = SPI_tuptable;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardinfo->tableId);
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
HeapTuple tuple = tupletable->vals[rowIndex];
char *commonValue = SPI_getvalue(tuple, tupletable->tupdesc, 2);
ereport(LOG, errmsg("%s", commonValue));
Datum tenantIdDatum = StringToDatum(commonValue, shardinfo->distributionColumnId);
Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction,
cacheEntry->partitionColumn->varcollid,
tenantIdDatum);
hashedValue = DatumGetInt32(hashedValueDatum);
ereport(LOG,errmsg("%ld",hashedValue));
MemoryContextSwitchTo(originalContext);
if (hashedValue == shardinfo->shardminvalue)
{
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
}
else if (hashedValue == shardinfo->shardmaxvalue)
{
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
}
else
{
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
}
MemoryContextSwitchTo(spiContext);
}
SPI_freetuptable(tupletable);
}
else
{
average = ExecuteAvgHashQuery(shardinfo);
ereport(LOG, errmsg("%ld", average));
}
SPI_finish();
if (rowCount > 0)
{
list_sort(SplitPoints, list_int_cmp);
}
else if (shardinfo->shardminvalue <= average)
{
SplitPoints = lappend(SplitPoints, Int64GetDatum(average));
}
return SplitPoints;
}
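/*
 * For illustration only: for a hypothetical shard covering the hash range
 * [0, 1073741823] whose most common value hashes to 5000, FindShardSplitPoints
 * above returns [4999, 5000], so the shard is split three ways and the middle
 * shard contains only that tenant. When no common value is found, a single
 * split point at the sampled average hash value is returned, giving a two-way
 * split.
 */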
/*
 * ScheduleShardSplit computes the split points for the given shard and is
 * meant to schedule the split as a background job; for now it only logs the
 * split command it would run.
 */
void
ScheduleShardSplit(ShardInfo shardinfo)
{
List *SplitPoints = FindShardSplitPoints(shardinfo);
if (list_length(SplitPoints) > 0)
{
/* int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List"); */
ereport(LOG, (errmsg("%s", GetShardSplitQuery(shardinfo, SplitPoints)->data)));
}
else
{
ereport(LOG, (errmsg("No split points found for shard split")));
}
}
Datum
citus_auto_shard_split_start(PG_FUNCTION_ARGS)
{
StringInfo query = makeStringInfo();
/*
 * This query groups shards by colocation id and shardminvalue, keeps the
 * groups whose total shard size exceeds the threshold, and picks the largest
 * shard of each such group. To gather all the required attributes,
 * pg_dist_shard is joined with citus_shards, then with pg_dist_node, then
 * with the grouped size computation, and finally with citus_tables.
 */
appendStringInfo(
query,
" SELECT cs.shardid,pd.shardminvalue,pd.shardmaxvalue,cs.shard_size,pn.nodeid,ct.distribution_column,ct.table_name,cs.shard_name,(SELECT relname FROM pg_class WHERE oid = (ct.table_name::regclass)::oid) "
" FROM pg_catalog.pg_dist_shard pd JOIN pg_catalog.citus_shards cs ON pd.shardid = cs.shardid JOIN pg_catalog.pg_dist_node pn ON cs.nodename = pn.nodename AND cs.nodeport= pn.nodeport"
" JOIN"
" ( select shardid , max_size from (SELECT distinct first_value(shardid) OVER w as shardid, sum(shard_size) OVER (PARTITION BY colocation_id, shardminvalue) as total_sum, max(shard_size) OVER w as max_size"
" FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
" WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )"
" AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
0
);
ereport(LOG, errmsg("%s", query->data));
if (SPI_connect() != SPI_OK_CONNECT)
{
elog(ERROR, "SPI_connect to the query failed");
}
if (SPI_exec(query->data, 0) != SPI_OK_SELECT)
{
elog(ERROR, "SPI_exec for the execution failed");
}
SPITupleTable *tupletable = SPI_tuptable;
int rowCount = SPI_processed;
bool isnull;
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
ShardInfoData shardinfo;
HeapTuple tuple = tupletable->vals[rowIndex];
Datum shardId = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
shardinfo.shardid = DatumGetInt64(shardId);
Datum shardSize = SPI_getbinval(tuple, tupletable->tupdesc, 4, &isnull);
shardinfo.shardsize = DatumGetInt64(shardSize);
Datum nodeId = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull);
shardinfo.nodeid = DatumGetInt64(nodeId);
char *shardMinVal = SPI_getvalue(tuple, tupletable->tupdesc, 2);
shardinfo.shardminvalue = strtoi64(shardMinVal, NULL, 10);
char *shardMaxVal = SPI_getvalue(tuple, tupletable->tupdesc, 3);
shardinfo.shardmaxvalue = strtoi64(shardMaxVal, NULL, 10);
shardinfo.distributionColumn = SPI_getvalue(tuple, tupletable->tupdesc, 6);
shardinfo.tablename = SPI_getvalue(tuple, tupletable->tupdesc, 7);
StringInfo shardnameQuery = makeStringInfo();
appendStringInfo(shardnameQuery,"%s_%ld",SPI_getvalue(tuple,tupletable->tupdesc,9),shardinfo.shardid);
shardinfo.shardname = shardnameQuery->data;
Datum tableIdDatum = SPI_getbinval(tuple, tupletable->tupdesc, 7, &isnull);
shardinfo.tableId = DatumGetObjectId(tableIdDatum);
shardinfo.distributionColumnId = ColumnTypeIdForRelationColumnName(shardinfo.tableId,
shardinfo.
distributionColumn);
shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
char * shardSplitMode;
// Oid shardTransferModeOid = PG_GETARG_OID(0);
// Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
// char *enumLabel = DatumGetCString(enumLabelDatum);
// ereport(LOG,errmsg("%s",enumLabel));
ScheduleShardSplit(&shardinfo);
ereport(LOG, (errmsg(
"Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %ld",
shardinfo.shardid, shardinfo.shardminvalue,
shardinfo.shardmaxvalue,
shardinfo.shardsize, shardinfo.nodeid)));
}
SPI_freetuptable(tupletable);
SPI_finish();
PG_RETURN_VOID();
}
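
Once the CreateBackgroundJob call that is still commented out above is wired up, the scheduled work could be inspected through the existing background-job catalogs. A minimal sketch, assuming the pg_dist_background_job and pg_dist_background_task catalogs and the "Automatic Shard Split" job type used above:

SELECT j.job_id, j.state, t.status, t.command
FROM pg_dist_background_job j
JOIN pg_dist_background_task t USING (job_id)
WHERE j.job_type = 'Automatic Shard Split';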


@@ -2261,6 +2261,16 @@ RegisterCitusConfigVariables(void)
 GUC_STANDARD,
 NULL, NULL, NULL);
 
+DefineCustomIntVariable(
+"citus.max_shard_size",
+gettext_noop("Sets the max size of a Shard"),
+NULL,
+&MaxShardSize,
+104857600, 102400, INT32_MAX,
+PGC_USERSET,
+GUC_STANDARD,
+NULL, NULL, NULL);
+
 DefineCustomIntVariable(
 "citus.shard_replication_factor",
 gettext_noop("Sets the replication factor for shards."),


@@ -1,3 +1,4 @@
 -- citus--11.3-1--12.0-1
+#include "udfs/citus_auto_shard_split_start/12.0-1.sql"
 
 -- bump version to 12.0-1


@@ -0,0 +1,13 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start()
IS 'automatically split the necessary shards in the cluster in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start() TO PUBLIC;


@@ -0,0 +1,14 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
)
RETURNS VOID
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start()
IS 'automatically split the necessary shards in the cluster in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start() TO PUBLIC;
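
A minimal usage sketch, assuming the UDF above and the citus.max_shard_size GUC registered in this commit are installed (the threshold value below is hypothetical, and the C code still passes a hard-coded threshold of 0 to its shard-selection query, so the GUC is not consulted yet):

SET citus.max_shard_size TO 209715200;
SELECT citus_auto_shard_split_start();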


@@ -46,12 +46,12 @@
 /* Define to 1 if you have the `zstd' library (-lzstd). */
 #undef HAVE_LIBZSTD
 
-/* Define to 1 if you have the <memory.h> header file. */
-#undef HAVE_MEMORY_H
-
 /* Define to 1 if you have the <stdint.h> header file. */
 #undef HAVE_STDINT_H
 
+/* Define to 1 if you have the <stdio.h> header file. */
+#undef HAVE_STDIO_H
+
 /* Define to 1 if you have the <stdlib.h> header file. */
 #undef HAVE_STDLIB_H
 
@@ -94,5 +94,7 @@
 /* The size of `void *', as computed by sizeof. */
 #undef SIZEOF_VOID_P
 
-/* Define to 1 if you have the ANSI C header files. */
+/* Define to 1 if all of the C90 standard headers exist (not just the ones
+   required in a freestanding environment). This macro is provided for
+   backward compatibility; new code need not use it. */
 #undef STDC_HEADERS


@@ -214,6 +214,7 @@ extern int ShardCount;
 extern int ShardReplicationFactor;
 extern int NextShardId;
 extern int NextPlacementId;
+extern int MaxShardSize;
 
 extern bool IsCoordinator(void);