From 474739ba31f1d7e95a2ca6b37f30f2dd85d2f91b Mon Sep 17 00:00:00 2001 From: Shabnam Khan Date: Tue, 20 Jun 2023 16:04:35 +0530 Subject: [PATCH] Added schemaname in Query --- .../distributed/operations/auto_shard_split.c | 50 +++++++++++-------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c index 17110ef52..4a9212ca5 100644 --- a/src/backend/distributed/operations/auto_shard_split.c +++ b/src/backend/distributed/operations/auto_shard_split.c @@ -10,6 +10,7 @@ #include "distributed/distribution_column.h" #include "utils/builtins.h" #include "distributed/shard_split.h" +#include "utils/lsyscache.h" PG_FUNCTION_INFO_V1(citus_auto_shard_split_start); @@ -31,6 +32,7 @@ typedef struct ShardInfoData char *distributionColumn; char *datatype; char *shardname; + char *schemaname; Oid tableId; Oid distributionColumnId; }ShardInfoData; @@ -64,10 +66,10 @@ ErrorOnConcurrentOperation() /* - * For a given SplitPoints , it creates the SQL query for the shard Splitting + * For a given SplitPoints , it creates the SQL query for the Shard Splitting */ StringInfo -GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode) +GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints, char *shardSplitMode) { StringInfo splitQuery = makeStringInfo(); @@ -80,13 +82,14 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode appendStringInfo(splitQuery, "'%d',", list_nth_int(SplitPoints, i)); } appendStringInfo(splitQuery, "'%d'], ARRAY[", list_nth_int(SplitPoints, - length - 1)); + length - 1)); for (int i = 0; i < length; i++) { appendStringInfo(splitQuery, "%d,", shardinfo->nodeid); } - appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr(shardSplitMode)); + appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr( + shardSplitMode)); return splitQuery; } @@ -96,10 +99,11 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode * It creates a background job for citus_split_shard_by_split_points and executes it in background. */ void -ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode) +ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints, + char *shardSplitMode) { StringInfo splitQuery = makeStringInfo(); - splitQuery = GetShardSplitQuery(shardinfo, SplitPoints , shardSplitMode); + splitQuery = GetShardSplitQuery(shardinfo, SplitPoints, shardSplitMode); ereport(LOG, (errmsg(splitQuery->data))); int32 nodesInvolved[1]; nodesInvolved[0] = shardinfo->nodeid; @@ -168,12 +172,13 @@ FindShardSplitPoints(ShardInfo shardinfo) appendStringInfo(CommonValueQuery, "SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)" " FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)" - " WHERE tablename = %s AND attname = %s AND freq > 0.2 $$)" + " WHERE tablename = %s AND attname = %s AND schemaname = %s AND freq > 0.3 $$)" " WHERE result <> '' AND shardid = %ld;", shardinfo->datatype, quote_literal_cstr(shardinfo->tablename), shardinfo->datatype, quote_literal_cstr(shardinfo->shardname), quote_literal_cstr(shardinfo->distributionColumn), + quote_literal_cstr(shardinfo->schemaname), shardinfo->shardid); ereport(LOG, errmsg("%s", CommonValueQuery->data)); @@ -204,12 +209,13 @@ FindShardSplitPoints(ShardInfo shardinfo) tenantIdDatum); hashedValue = DatumGetInt32(hashedValueDatum); ereport(LOG, errmsg("%d", hashedValue)); + /*Switching the memory context to store the unique SplitPoints in a list*/ MemoryContextSwitchTo(originalContext); if (hashedValue == shardinfo->shardminvalue) { - SplitPoints = list_append_unique_int(SplitPoints,hashedValue); + SplitPoints = list_append_unique_int(SplitPoints, hashedValue); } else if (hashedValue == shardinfo->shardmaxvalue) { @@ -253,16 +259,17 @@ FindShardSplitPoints(ShardInfo shardinfo) * split and then executes the background job for the shard split. */ void -ScheduleShardSplit(ShardInfo shardinfo , char* shardSplitMode) +ScheduleShardSplit(ShardInfo shardinfo, char *shardSplitMode) { List *SplitPoints = FindShardSplitPoints(shardinfo); if (list_length(SplitPoints) > 0) { - // ErrorOnConcurrentOperation(); + /* ErrorOnConcurrentOperation(); */ int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List"); - ereport(LOG,errmsg("%s",GetShardSplitQuery(shardinfo,SplitPoints,shardSplitMode)->data)); - ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints , shardSplitMode); + ereport(LOG, errmsg("%s", GetShardSplitQuery(shardinfo, SplitPoints, + shardSplitMode)->data)); + ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints, shardSplitMode); } else { @@ -295,15 +302,14 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) " FROM citus_shards cs JOIN pg_dist_shard ps USING(shardid)" " WINDOW w AS (PARTITION BY colocation_id, shardminvalue ORDER BY shard_size DESC) )as t where total_sum >= %ld )" " AS max_sizes ON cs.shardid=max_sizes.shardid AND cs.shard_size = max_sizes.max_size JOIN citus_tables ct ON cs.table_name = ct.table_name AND pd.shardminvalue <> pd.shardmaxvalue AND pd.shardminvalue <> ''", - MaxShardSize*1024 + MaxShardSize * 1024 ); - ereport(LOG, errmsg("%s", query->data)); - char *shardSplitMode; - Oid shardTransferModeOid = PG_GETARG_OID(0); - Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); - shardSplitMode = DatumGetCString(enumLabelDatum); - ereport(LOG,errmsg("%s",shardSplitMode)); + ereport(LOG, errmsg("%s", query->data)); + Oid shardTransferModeOid = PG_GETARG_OID(0); + Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); + char *shardSplitMode = DatumGetCString(enumLabelDatum); + ereport(LOG, errmsg("%s", shardSplitMode)); if (SPI_connect() != SPI_OK_CONNECT) { elog(ERROR, "SPI_connect to the query failed"); @@ -354,8 +360,10 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) distributionColumn); shardinfo.datatype = format_type_be(shardinfo.distributionColumnId); - - ScheduleShardSplit(&shardinfo , shardSplitMode); + Oid schemaOid = get_rel_namespace(shardinfo.tableId); + shardinfo.schemaname = get_namespace_name(schemaOid); + + ScheduleShardSplit(&shardinfo, shardSplitMode); ereport(LOG, (errmsg( "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d", shardinfo.shardid, shardinfo.shardminvalue,