Addressed all review comments

pull/7013/head
Shabnam Khan 2023-06-28 10:14:22 +05:30
parent de57bd1240
commit aef9865b9b
6 changed files with 48 additions and 67 deletions

View File

@@ -37,12 +37,12 @@ typedef struct ShardInfoData
 typedef ShardInfoData *ShardInfo;

 void ErrorOnConcurrentOperation(void);
-StringInfo GetShardSplitQuery(ShardInfo shardinfo, Datum datum,
+StringInfo GetShardSplitQuery(ShardInfo shardinfo, Datum splitPointArrayDatum,
                               char *shardSplitMode);
-void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum datum,
+void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum splitPointArrayDatum,
                                char *shardSplitMode);
 List * FindShardSplitPoints(int64 shardId);
-int64 ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId);
+bool ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId);

 /*
  * It throws an error if a concurrent automatic shard split or Rebalance operation is happening.
@@ -74,11 +74,11 @@ ErrorOnConcurrentOperation()
  * For a given SplitPoints, it creates the SQL query for the Shard Splitting
  */
 StringInfo
-GetShardSplitQuery(ShardInfo shardinfo, Datum datum, char *shardSplitMode)
+GetShardSplitQuery(ShardInfo shardinfo, Datum splitPointArrayDatum, char *shardSplitMode)
 {
     StringInfo splitQuery = makeStringInfo();
-    ArrayType *array = DatumGetArrayTypeP(datum);
+    ArrayType *array = DatumGetArrayTypeP(splitPointArrayDatum);
     Datum *values;
     int nelems;
     deconstruct_array(array,
@@ -98,7 +98,7 @@ GetShardSplitQuery(ShardInfo shardinfo, Datum datum, char *shardSplitMode)
         }
     }

     /* All the shards after the split will be belonging to the same node */
     appendStringInfo(splitQuery, "], ARRAY[");

     for (int i = 0; i < nelems; i++)
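Editor's note: the statement assembled here targets the existing citus_split_shard_by_split_points UDF, with every post-split shard kept on the shard's current node, as the comment above says. A hedged sketch of the generated query, with hypothetical shard id, node id, and split point:

    SELECT citus_split_shard_by_split_points(
        102008,                -- shard being split (illustrative id)
        ARRAY['-1073741824'],  -- computed split point(s)
        ARRAY[2, 2],           -- one entry per resulting shard, all the same node id
        'block_writes');       -- shardSplitMode passed through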
@@ -116,11 +116,11 @@ GetShardSplitQuery(ShardInfo shardinfo, Datum datum, char *shardSplitMode)
  * It creates a background job for citus_split_shard_by_split_points and executes it in background.
  */
 void
-ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum datum,
+ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, Datum splitPointArrayDatum,
                           char *shardSplitMode)
 {
     StringInfo splitQuery = makeStringInfo();
-    splitQuery = GetShardSplitQuery(shardinfo, datum, shardSplitMode);
+    splitQuery = GetShardSplitQuery(shardinfo, splitPointArrayDatum, shardSplitMode);

     /* ereport(LOG, (errmsg(splitQuery->data))); */
     int32 nodesInvolved[] = { shardinfo->nodeId };
@@ -138,11 +138,12 @@ Datum
 citus_find_shard_split_points(PG_FUNCTION_ARGS)
 {
     int64 shardId = PG_GETARG_INT64(0);
-    int64 shardGroupSize = PG_GETARG_INT64(1);
+    int64 shardSize = PG_GETARG_INT64(1);
+    int64 shardGroupSize = PG_GETARG_INT64(2);
     ereport(DEBUG4, errmsg("%ld", shardGroupSize));

     /* Filtering Shards with total GroupSize greater than MaxShardSize*1024, i.e. Size based Policy */
     if (shardGroupSize < MaxShardSize * 1024)
     {
         PG_RETURN_NULL();
     }
@@ -161,6 +162,7 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
     int64 shardMinValue = shardrange->minValue;
     int64 shardMaxValue = shardrange->maxValue;
     char *tableName = generate_qualified_relation_name(tableId);
+    char *schemaName = get_namespace_name(get_rel_namespace(tableId));
     StringInfo CommonValueQuery = makeStringInfo();

     /*
@@ -177,8 +179,7 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
                      dataType,
                      quote_literal_cstr(shardName),
                      quote_literal_cstr(distributionColumnName),
-                     quote_literal_cstr(get_namespace_name(get_rel_namespace(
-                                                               tableId))),
+                     quote_literal_cstr(schemaName),
                      TenantFrequency,
                      shardId);
@@ -246,10 +247,6 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
     else
     {
         StringInfo AvgHashQuery = makeStringInfo();
-        uint64 tableSize = 0;
-        bool check = DistributedTableSize(tableId, TOTAL_RELATION_SIZE, true,
-                                          &tableSize);

         /*
          * It executes a query to find the average hash value in a shard considering rows with a limit of 10 GB.
          * If there exists a hash value it is returned otherwise NULL is returned.
@@ -258,7 +255,7 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
                          " FROM (SELECT worker_hash(%s) h FROM %s TABLESAMPLE SYSTEM(least(10, 100*10000000000/%lu))"
                          " WHERE worker_hash(%s)>=%ld AND worker_hash(%s)<=%ld) s",
                          distributionColumnName, tableName,
-                         tableSize,
+                         shardSize,
                          distributionColumnName, shardMinValue,
                          distributionColumnName, shardMaxValue
                          );
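Editor's note: with shardSize substituted here, the sample percentage least(10, 100*10000000000/shardSize) caps the data actually scanned at roughly 10 GB, or 10% of the shard, whichever is smaller, matching the comment above. A hedged sketch of the instantiated query for a hypothetical ~200 GB shard of a table distributed on tenant_id; the outer SELECT list is not visible in this hunk and is assumed to average the sampled hash values:

    SELECT avg(h)::int8  -- assumed outer select list
    FROM (SELECT worker_hash(tenant_id) h
          FROM public.events_102008 TABLESAMPLE SYSTEM(least(10, 100*10000000000/214748364800))
          WHERE worker_hash(tenant_id) >= -2147483648
            AND worker_hash(tenant_id) <= -1) s;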
@@ -310,35 +307,36 @@ citus_find_shard_split_points(PG_FUNCTION_ARGS)
  * This function calculates the split points of the shard to
  * split and then executes the background job for the shard split.
  */
-int64
+bool
 ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode, int64 jobId)
 {
     SPI_connect();
     StringInfo findSplitPointsQuery = makeStringInfo();
     appendStringInfo(findSplitPointsQuery,
-                     "SELECT citus_find_shard_split_points(%ld , %ld)",
+                     "SELECT citus_find_shard_split_points(%ld , %ld , %ld)",
                      shardinfo->shardId,
+                     shardinfo->shardSize,
                      shardinfo->shardGroupSize);
     SPI_exec(findSplitPointsQuery->data, 0);

     SPITupleTable *tupletable = SPI_tuptable;
     HeapTuple tuple = tupletable->vals[0];
     bool isnull;
-    Datum resultDatum = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);
+    Datum splitPointArrayDatum = SPI_getbinval(tuple, tupletable->tupdesc, 1, &isnull);

     if (!isnull)
     {
-        ereport(DEBUG4, errmsg("%s", GetShardSplitQuery(shardinfo, resultDatum,
+        ereport(DEBUG4, errmsg("%s", GetShardSplitQuery(shardinfo, splitPointArrayDatum,
                                                         shardSplitMode)->data));
-        ExecuteSplitBackgroundJob(jobId, shardinfo, resultDatum, shardSplitMode);
+        ExecuteSplitBackgroundJob(jobId, shardinfo, splitPointArrayDatum, shardSplitMode);
         SPI_finish();
-        return 1;
+        return true;
     }
     else
     {
         ereport(LOG, errmsg("No Splitpoints for shard split"));
         SPI_finish();
-        return 0;
+        return false;
     }
 }
@@ -389,7 +387,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
     bool isnull;
     int64 jobId = CreateBackgroundJob("Automatic Shard Split",
                                       "Split using SplitPoints List");
-    int64 count = 0;
+    int64 scheduledSplitCount = 0;

     for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
     {
@@ -408,14 +406,16 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
         char *shardGroupSizeValue = SPI_getvalue(tuple, tupletable->tupdesc, 6);
         shardinfo.shardGroupSize = strtoi64(shardGroupSizeValue, NULL, 10);

-        count = count + ScheduleShardSplit(&shardinfo, shardSplitMode, jobId);
+        if (ScheduleShardSplit(&shardinfo, shardSplitMode, jobId))
+        {
+            scheduledSplitCount++;
+        }
     }

     SPI_freetuptable(tupletable);
     SPI_finish();

-    if (count == 0)
+    if (scheduledSplitCount == 0)
     {
-        DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobId));
+        CancelJob(jobId);
     }

     PG_RETURN_INT64(jobId);
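Editor's note: end to end, a single call now schedules one background split task per oversized shard group and cancels the job when nothing qualified. A hedged usage sketch, assuming the standard Citus background-job catalogs:

    -- Kick off the scan; returns the background job id.
    SELECT citus_auto_shard_split_start();

    -- Inspect the job and its scheduled split tasks.
    SELECT job_id, state, description FROM pg_dist_background_job;
    SELECT task_id, status, command FROM pg_dist_background_task;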

View File

@@ -2262,8 +2262,8 @@ RegisterCitusConfigVariables(void)
         NULL, NULL, NULL);

     DefineCustomIntVariable(
-        "citus.max_shard_size",
-        gettext_noop("Sets the max size of a Shard"),
+        "citus.split_shard_group_size_threshold",
+        gettext_noop("Sets the max colocation group size of a Shard"),
         NULL,
         &MaxShardSize,
         102400, 100, INT32_MAX,
@@ -2272,7 +2272,7 @@ RegisterCitusConfigVariables(void)
         NULL, NULL, NULL);

     DefineCustomRealVariable(
-        "citus.tenant_freq",
+        "citus.tenant_isolation_frequency_threshold",
         gettext_noop("Sets the threshold tenant frequency for a Shard"),
         NULL,
         &TenantFrequency,
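Editor's note: the renamed GUCs read exactly as before; only the names change. A minimal tuning sketch (the KB interpretation of the size threshold is an assumption inferred from the MaxShardSize * 1024 byte conversion in the first file; both values are illustrative, not defaults):

    -- Split shard groups larger than ~10 GB (threshold in KB under the assumption above).
    SET citus.split_shard_group_size_threshold = 10485760;

    -- Isolate a tenant once its frequency within a shard crosses 30%.
    SET citus.tenant_isolation_frequency_threshold = 0.3;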

View File

@@ -1,32 +1,21 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
     shard_transfer_mode citus.shard_transfer_mode default 'auto'
 )
 RETURNS bigint
 AS 'MODULE_PATHNAME'
 LANGUAGE C VOLATILE;
 COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode)
     IS 'automatically split the necessary shards in the cluster in the background';
 GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode) TO PUBLIC;

 CREATE OR REPLACE FUNCTION pg_catalog.citus_find_shard_split_points(
     shard_id bigint,
+    shard_size bigint,
     shard_group_size bigint
 )
 RETURNS SETOF bigint[]
 AS 'MODULE_PATHNAME'
 LANGUAGE C VOLATILE;
-COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint)
+COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint)
     IS 'creates split points for shards';
-GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint) TO PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint) TO PUBLIC;
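Editor's note: with the extra argument in place, the helper can also be probed directly for a single shard. A hedged example with hypothetical sizes (both size arguments taken to be bytes, per the MaxShardSize * 1024 comparison in the C code):

    SELECT citus_find_shard_split_points(
        102008,        -- shard_id (illustrative)
        161061273600,  -- shard_size, ~150 GB
        322122547200   -- shard_group_size, ~300 GB
    );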

View File

@@ -1,32 +1,21 @@
 CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
     shard_transfer_mode citus.shard_transfer_mode default 'auto'
 )
 RETURNS bigint
 AS 'MODULE_PATHNAME'
 LANGUAGE C VOLATILE;
 COMMENT ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode)
     IS 'automatically split the necessary shards in the cluster in the background';
 GRANT EXECUTE ON FUNCTION pg_catalog.citus_auto_shard_split_start(citus.shard_transfer_mode) TO PUBLIC;

 CREATE OR REPLACE FUNCTION pg_catalog.citus_find_shard_split_points(
     shard_id bigint,
+    shard_size bigint,
     shard_group_size bigint
 )
 RETURNS SETOF bigint[]
 AS 'MODULE_PATHNAME'
 LANGUAGE C VOLATILE;
-COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint)
+COMMENT ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint)
     IS 'creates split points for shards';
-GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_group_size bigint) TO PUBLIC;
+GRANT EXECUTE ON FUNCTION pg_catalog.citus_find_shard_split_points(shard_id bigint , shard_size bigint , shard_group_size bigint) TO PUBLIC;

View File

@@ -145,14 +145,11 @@ PG_FUNCTION_INFO_V1(citus_task_wait);
  * We apply the same policy checks as pg_cancel_backend to check if a user can cancel a
  * job.
  */
-Datum
-citus_job_cancel(PG_FUNCTION_ARGS)
+void
+CancelJob(int64 jobid)
 {
     CheckCitusVersion(ERROR);
     EnsureCoordinator();

-    int64 jobid = PG_GETARG_INT64(0);
-
     /* Cancel all tasks that were scheduled before */
     List *pids = CancelTasksForJob(jobid);
@@ -170,10 +167,15 @@ citus_job_cancel(PG_FUNCTION_ARGS)
     }

     UpdateBackgroundJob(jobid);
-
-    PG_RETURN_VOID();
 }

+
+Datum
+citus_job_cancel(PG_FUNCTION_ARGS)
+{
+    int64 jobid = PG_GETARG_INT64(0);
+    CancelJob(jobid);
+    PG_RETURN_VOID();
+}

 /*
  * pg_catalog.citus_job_wait(jobid bigint,
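Editor's note: extracting CancelJob() from the fmgr wrapper lets C callers such as citus_auto_shard_split_start cancel a job directly instead of going through DirectFunctionCall1, while the SQL-facing UDF keeps its behavior. From SQL nothing changes; for example (job id illustrative):

    SELECT citus_job_cancel(10000);
    SELECT citus_job_wait(10000);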

View File

@@ -111,5 +111,6 @@ extern Datum citus_task_wait(PG_FUNCTION_ARGS);
 extern void citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus);
 extern void citus_task_wait_internal(int64 taskid, BackgroundTaskStatus *desiredStatus);
 extern bool IncrementParallelTaskCountForNodesInvolved(BackgroundTask *task);
+extern void CancelJob(int64 jobid);

 #endif /* CITUS_BACKGROUND_JOBS_H */