Returning JobId as result

pull/7013/head
Shabnam Khan 2023-06-26 12:56:25 +05:30
parent a596b1faf4
commit e9d807c117
3 changed files with 36 additions and 25 deletions

View File

@ -13,6 +13,7 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/background_jobs.h"
PG_FUNCTION_INFO_V1(citus_auto_shard_split_start); PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
@ -44,9 +45,9 @@ StringInfo GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints,
char *shardSplitMode); char *shardSplitMode);
void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints, void ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
char *shardSplitMode); char *shardSplitMode);
int64 ExecuteAvgHashQuery(ShardInfo shardinfo); int64 ExecuteAverageHashQuery(ShardInfo shardinfo);
List * FindShardSplitPoints(ShardInfo shardinfo); List * FindShardSplitPoints(ShardInfo shardinfo);
void ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode); int64 ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode , int64 jobId);
/* /*
* It throws an error if a concurrent automatic shard split or Rebalance operation is happening. * It throws an error if a concurrent automatic shard split or Rebalance operation is happening.
@ -87,7 +88,7 @@ GetShardSplitQuery(ShardInfo shardinfo, List *splitPoints, char *shardSplitMode)
shardinfo->shardId); shardinfo->shardId);
int32 splitpoint = 0; int32 splitpoint = 0;
int index = 0; uint64 index = 0;
foreach_int(splitpoint, splitPoints) foreach_int(splitpoint, splitPoints)
{ {
appendStringInfo(splitQuery, "'%d'", splitpoint); appendStringInfo(splitQuery, "'%d'", splitpoint);
@ -136,7 +137,7 @@ ExecuteSplitBackgroundJob(int64 jobId, ShardInfo shardinfo, List *splitPoints,
* If a hash value exists it is returned; otherwise shardminvalue-1 is returned. * If a hash value exists it is returned; otherwise shardminvalue-1 is returned.
*/ */
int64 int64
ExecuteAvgHashQuery(ShardInfo shardinfo) ExecuteAverageHashQuery(ShardInfo shardinfo)
{ {
StringInfo AvgHashQuery = makeStringInfo(); StringInfo AvgHashQuery = makeStringInfo();
uint64 tableSize = 0; uint64 tableSize = 0;
@ -150,7 +151,7 @@ ExecuteAvgHashQuery(ShardInfo shardinfo)
shardinfo->distributionColumn, shardinfo->shardMinValue, shardinfo->distributionColumn, shardinfo->shardMinValue,
shardinfo->distributionColumn, shardinfo->shardMaxValue shardinfo->distributionColumn, shardinfo->shardMaxValue
); );
ereport(LOG, errmsg("%s", AvgHashQuery->data)); ereport(DEBUG4, errmsg("%s", AvgHashQuery->data));
SPI_connect(); SPI_connect();
SPI_exec(AvgHashQuery->data, 0); SPI_exec(AvgHashQuery->data, 0);
SPITupleTable *tupletable = SPI_tuptable; SPITupleTable *tupletable = SPI_tuptable;
@ -193,7 +194,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
appendStringInfo(CommonValueQuery, appendStringInfo(CommonValueQuery,
"SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)" "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
" FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)" " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
" WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > %f $$)" " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > %lf $$)"
" WHERE result <> '' AND shardid = %ld;", " WHERE result <> '' AND shardid = %ld;",
shardinfo->dataType, quote_literal_cstr(shardinfo->tableName), shardinfo->dataType, quote_literal_cstr(shardinfo->tableName),
shardinfo->dataType, shardinfo->dataType,
@ -204,7 +205,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
TenantFrequency, TenantFrequency,
shardinfo->shardId); shardinfo->shardId);
ereport(LOG, errmsg("%s", CommonValueQuery->data)); ereport(DEBUG4, errmsg("%s", CommonValueQuery->data));
List *splitPoints = NULL; List *splitPoints = NULL;
/* Saving the current memory context*/ /* Saving the current memory context*/
@ -220,17 +221,21 @@ FindShardSplitPoints(ShardInfo shardinfo)
int64 average; int64 average;
int32 hashedValue; int32 hashedValue;
ereport(LOG, errmsg("%ld", rowCount)); ereport(DEBUG4, errmsg("%ld", rowCount));
if (rowCount > 0) if (rowCount > 0)
{ {
/* For every common tenant value, a split point is calculated on the basis of
* its hashed value; the unique split points are appended to the list,
* and the resulting list is then sorted and returned.
*/
SPITupleTable *tupletable = SPI_tuptable; SPITupleTable *tupletable = SPI_tuptable;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardinfo->tableId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardinfo->tableId);
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
HeapTuple tuple = tupletable->vals[rowIndex]; HeapTuple tuple = tupletable->vals[rowIndex];
char *commonValue = SPI_getvalue(tuple, tupletable->tupdesc, 2); char *commonValue = SPI_getvalue(tuple, tupletable->tupdesc, 2);
ereport(LOG, errmsg("%s", commonValue)); ereport(DEBUG4, errmsg("%s", commonValue));
Datum tenantIdDatum = StringToDatum(commonValue, Datum tenantIdDatum = StringToDatum(commonValue,
shardinfo->distributionColumnId); shardinfo->distributionColumnId);
Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction, Datum hashedValueDatum = FunctionCall1Coll(cacheEntry->hashFunction,
@ -238,7 +243,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
varcollid, varcollid,
tenantIdDatum); tenantIdDatum);
hashedValue = DatumGetInt32(hashedValueDatum); hashedValue = DatumGetInt32(hashedValueDatum);
ereport(LOG, errmsg("%d", hashedValue)); ereport(DEBUG4, errmsg("%d", hashedValue));
/*Switching the memory context to store the unique SplitPoints in a list*/ /*Switching the memory context to store the unique SplitPoints in a list*/
@ -263,8 +268,8 @@ FindShardSplitPoints(ShardInfo shardinfo)
} }
else else
{ {
average = ExecuteAvgHashQuery(shardinfo); average = ExecuteAverageHashQuery(shardinfo);
ereport(LOG, errmsg("%ld", average)); ereport(DEBUG4, errmsg("%ld", average));
MemoryContextSwitchTo(originalContext); MemoryContextSwitchTo(originalContext);
if (shardinfo->shardMinValue <= average) if (shardinfo->shardMinValue <= average)
{ {
@ -281,21 +286,21 @@ FindShardSplitPoints(ShardInfo shardinfo)
* This function calculates the split points of the shard to * This function calculates the split points of the shard to
* split and then executes the background job for the shard split. * split and then executes the background job for the shard split.
*/ */
void int64
ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode) ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode , int64 jobId)
{ {
List *splitPoints = FindShardSplitPoints(shardinfo); List *splitPoints = FindShardSplitPoints(shardinfo);
if (list_length(splitPoints) > 0) if (list_length(splitPoints) > 0)
{ {
int64 jobId = CreateBackgroundJob("Automatic Shard Split", ereport(DEBUG4, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints,
"Split using SplitPoints List");
ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, splitPoints,
shardSplitMode)->data)); shardSplitMode)->data));
ExecuteSplitBackgroundJob(jobId, shardinfo, splitPoints, shardSplitMode); ExecuteSplitBackgroundJob(jobId, shardinfo, splitPoints, shardSplitMode);
return 1;
} }
else else
{ {
ereport(LOG, errmsg("No Splitpoints for shard split")); ereport(LOG, errmsg("No Splitpoints for shard split"));
return 0;
} }
} }
@ -328,11 +333,11 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
MaxShardSize*1024 MaxShardSize*1024
); );
ereport(LOG, errmsg("%s", query->data)); ereport(DEBUG4 ,errmsg("%s", query->data));
Oid shardTransferModeOid = PG_GETARG_OID(0); Oid shardTransferModeOid = PG_GETARG_OID(0);
Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
char *shardSplitMode = DatumGetCString(enumLabelDatum); char *shardSplitMode = DatumGetCString(enumLabelDatum);
ereport(LOG, errmsg("%s", shardSplitMode)); ereport(DEBUG4, errmsg("%s", shardSplitMode));
if (SPI_connect() != SPI_OK_CONNECT) if (SPI_connect() != SPI_OK_CONNECT)
{ {
elog(ERROR, "SPI_connect to the query failed"); elog(ERROR, "SPI_connect to the query failed");
@ -345,6 +350,9 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
SPITupleTable *tupletable = SPI_tuptable; SPITupleTable *tupletable = SPI_tuptable;
int rowCount = SPI_processed; int rowCount = SPI_processed;
bool isnull; bool isnull;
int64 jobId = CreateBackgroundJob("Automatic Shard Split",
"Split using SplitPoints List");
int64 count = 0;
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
@ -380,8 +388,8 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
distributionColumn); distributionColumn);
shardinfo.dataType = format_type_be(shardinfo.distributionColumnId); shardinfo.dataType = format_type_be(shardinfo.distributionColumnId);
ScheduleShardSplit(&shardinfo, shardSplitMode); count = count + ScheduleShardSplit(&shardinfo, shardSplitMode , jobId);
ereport(LOG, (errmsg( ereport(DEBUG4, (errmsg(
"Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d", "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d",
shardinfo.shardId, shardinfo.shardMinValue, shardinfo.shardId, shardinfo.shardMinValue,
shardinfo.shardMaxValue, shardinfo.shardMaxValue,
@ -390,6 +398,9 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
SPI_freetuptable(tupletable); SPI_freetuptable(tupletable);
SPI_finish(); SPI_finish();
if(count==0){
PG_RETURN_VOID(); DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobId));
}
PG_RETURN_INT64(jobId);
} }

View File

@ -2,7 +2,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
shard_transfer_mode citus.shard_transfer_mode default 'auto' shard_transfer_mode citus.shard_transfer_mode default 'auto'
) )
RETURNS VOID RETURNS bigint
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'

View File

@ -2,7 +2,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_auto_shard_split_start(
shard_transfer_mode citus.shard_transfer_mode default 'auto' shard_transfer_mode citus.shard_transfer_mode default 'auto'
) )
RETURNS VOID RETURNS bigint
AS 'MODULE_PATHNAME' AS 'MODULE_PATHNAME'