Added schemaname in Query

pull/7013/head
Shabnam Khan 2023-06-20 16:04:35 +05:30
parent 08d4a72867
commit 474739ba31
1 changed file with 29 additions and 21 deletions


@@ -10,6 +10,7 @@
 #include "distributed/distribution_column.h"
 #include "utils/builtins.h"
 #include "distributed/shard_split.h"
+#include "utils/lsyscache.h"
 PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
@@ -31,6 +32,7 @@ typedef struct ShardInfoData
     char *distributionColumn;
     char *datatype;
     char *shardname;
+    char *schemaname;
     Oid tableId;
     Oid distributionColumnId;
 }ShardInfoData;
@@ -64,10 +66,10 @@ ErrorOnConcurrentOperation()
 /*
- * For a given SplitPoints , it creates the SQL query for the shard Splitting
+ * For a given SplitPoints , it creates the SQL query for the Shard Splitting
  */
 StringInfo
-GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode)
+GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints, char *shardSplitMode)
 {
     StringInfo splitQuery = makeStringInfo();
@@ -80,13 +82,14 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode
         appendStringInfo(splitQuery, "'%d',", list_nth_int(SplitPoints, i));
     }
     appendStringInfo(splitQuery, "'%d'], ARRAY[", list_nth_int(SplitPoints,
                                                                length - 1));
     for (int i = 0; i < length; i++)
     {
         appendStringInfo(splitQuery, "%d,", shardinfo->nodeid);
     }
-    appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr(shardSplitMode));
+    appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr(
+                         shardSplitMode));
     return splitQuery;
 }
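For context, the string assembled by GetShardSplitQuery is a call to citus_split_shard_by_split_points (the SELECT prefix and shard id are appended earlier in the function, outside this hunk). With a single split point it would come out roughly as below; the shard id, split point and node id are illustrative values only:

    SELECT citus_split_shard_by_split_points(102008, ARRAY['1073741823'], ARRAY[5, 5], 'block_writes');

Every entry of the node array is shardinfo->nodeid, so the resulting shards stay on the same node, and the last argument is the quoted shardSplitMode passed into the function.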
@@ -96,10 +99,11 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode
  * It creates a background job for citus_split_shard_by_split_points and executes it in background.
  */
 void
-ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode)
+ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints,
+                          char *shardSplitMode)
 {
     StringInfo splitQuery = makeStringInfo();
-    splitQuery = GetShardSplitQuery(shardinfo, SplitPoints , shardSplitMode);
+    splitQuery = GetShardSplitQuery(shardinfo, SplitPoints, shardSplitMode);
     ereport(LOG, (errmsg(splitQuery->data)));
     int32 nodesInvolved[1];
     nodesInvolved[0] = shardinfo->nodeid;
@@ -168,12 +172,13 @@ FindShardSplitPoints(ShardInfo shardinfo)
     appendStringInfo(CommonValueQuery,
                      "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
                      " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
-                     " WHERE tablename = %s AND attname = %s AND freq > 0.2 $$)"
+                     " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > 0.3 $$)"
                      " WHERE result <> '' AND shardid = %ld;",
                      shardinfo->datatype, quote_literal_cstr(shardinfo->tablename),
                      shardinfo->datatype,
                      quote_literal_cstr(shardinfo->shardname),
                      quote_literal_cstr(shardinfo->distributionColumn),
+                     quote_literal_cstr(shardinfo->schemaname),
                      shardinfo->shardid);
     ereport(LOG, errmsg("%s", CommonValueQuery->data));
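With the schemaname filter and the raised 0.3 frequency cutoff in place, the CommonValueQuery logged right above would expand roughly as follows for a hypothetical integer distribution column (table orders, shard orders_102008, column customer_id, schema public; all names and the shard id are made up):

    SELECT shardid , unnest(result::integer[]) from run_command_on_shards('orders',
        $$SELECT array_agg(val) FROM pg_stats s , unnest(most_common_vals::text::integer[],most_common_freqs) as res(val,freq)
          WHERE tablename = 'orders_102008' AND attname = 'customer_id' AND schemaname = 'public' AND freq > 0.3 $$)
    WHERE result <> '' AND shardid = 102008;

The added schemaname predicate restricts the pg_stats lookup to the shard's own schema instead of matching any identically named relation in another schema.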
@@ -204,12 +209,13 @@ FindShardSplitPoints(ShardInfo shardinfo)
                                                    tenantIdDatum);
         hashedValue = DatumGetInt32(hashedValueDatum);
         ereport(LOG, errmsg("%d", hashedValue));
         /*Switching the memory context to store the unique SplitPoints in a list*/
         MemoryContextSwitchTo(originalContext);
         if (hashedValue == shardinfo->shardminvalue)
         {
-            SplitPoints = list_append_unique_int(SplitPoints,hashedValue);
+            SplitPoints = list_append_unique_int(SplitPoints, hashedValue);
         }
         else if (hashedValue == shardinfo->shardmaxvalue)
         {
@@ -253,16 +259,17 @@ FindShardSplitPoints(ShardInfo shardinfo)
  * split and then executes the background job for the shard split.
  */
 void
-ScheduleShardSplit(ShardInfo shardinfo , char* shardSplitMode)
+ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode)
 {
     List *SplitPoints = FindShardSplitPoints(shardinfo);
     if (list_length(SplitPoints) > 0)
     {
-        // ErrorOnConcurrentOperation();
+        /* ErrorOnConcurrentOperation(); */
         int64 jobId = CreateBackgroundJob("Automatic Shard Split",
                                           "Split using SplitPoints List");
-        ereport(LOG,errmsg("%s",GetShardSplitQuery(shardinfo,SplitPoints,shardSplitMode)->data));
-        ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints , shardSplitMode);
+        ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, SplitPoints,
+                                                     shardSplitMode)->data));
+        ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints, shardSplitMode);
     }
     else
     {
@@ -295,15 +302,14 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
                      " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)"
                      " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )"
                      " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''",
-                     MaxShardSize*1024
+                     MaxShardSize * 1024
                      );
     ereport(LOG, errmsg("%s", query->data));
-    char *shardSplitMode;
-    Oid shardTransferModeOid = PG_GETARG_OID(0);
-    Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
-    shardSplitMode = DatumGetCString(enumLabelDatum);
-    ereport(LOG,errmsg("%s",shardSplitMode));
+    Oid shardTransferModeOid = PG_GETARG_OID(0);
+    Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
+    char *shardSplitMode = DatumGetCString(enumLabelDatum);
+    ereport(LOG, errmsg("%s", shardSplitMode));
     if (SPI_connect() != SPI_OK_CONNECT)
     {
         elog(ERROR, "SPI_connect to the query failed");
@@ -354,8 +360,10 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
                                                    distributionColumn);
         shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
-        ScheduleShardSplit(&shardinfo , shardSplitMode);
+        Oid schemaOid = get_rel_namespace(shardinfo.tableId);
+        shardinfo.schemaname = get_namespace_name(schemaOid);
+        ScheduleShardSplit(&shardinfo, shardSplitMode);
         ereport(LOG, (errmsg(
                      "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d",
                      shardinfo.shardid, shardinfo.shardminvalue,
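Assuming the single argument read via PG_GETARG_OID(0) above is a citus.shard_transfer_mode enum value (its label is what enum_out decodes into shardSplitMode), a minimal invocation of the UDF would look like this sketch:

    SELECT citus_auto_shard_split_start('block_writes');

Under the same assumption, 'auto' or 'force_logical' would be passed the same way and end up as the last argument of the generated citus_split_shard_by_split_points call.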