Shard mode is to be implemented

pull/7013/head
Shabnam Khan 2023-06-19 18:02:38 +05:30
parent 27389e8f58
commit afefcf7461
1 changed files with 26 additions and 19 deletions

View File

@ -1,21 +1,21 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "executor/spi.h"
// #include "distributed/lock_graph.h"
#include "distributed/lock_graph.h"
#include "distributed/coordinator_protocol.h"
// #include "distributed/metadata_cache.h"
// #include "distributed/metadata_utility.h"
// #include "distributed/multi_logical_replication.h"
// #include "distributed/multi_server_executor.h"
// #include "distributed/pg_dist_rebalance_strategy.h"
// #include "distributed/pg_dist_shard.h"
// #include "distributed/reference_table_utils.h"
// #include "distributed/remote_commands.h"
// #include "distributed/resource_lock.h"
// #include "distributed/tuplestore.h"
// #include "distributed/utils/array_type.h"
// #include "distributed/worker_protocol.h"
// #include "nodes/pg_list.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_rebalance_strategy.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h"
#include "distributed/worker_protocol.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
#include "distributed/distribution_column.h"
#include "utils/builtins.h"
@ -26,6 +26,10 @@ PG_FUNCTION_INFO_V1(citus_auto_shard_split_start);
int MaxShardSize = 104857600;
/*
* Struct to store all the information related to
* a shard.
*/
typedef struct ShardInfoData
{
int64 shardsize;
@ -165,6 +169,12 @@ List *
FindShardSplitPoints(ShardInfo shardinfo)
{
StringInfo CommonValueQuery = makeStringInfo();
/*
* The inner query for extracting the tenant value having frequency > 0.3 is executed on
* every shard of the table and outer query gives the shardid and tenant values as the
* output.
*/
appendStringInfo(CommonValueQuery,
"SELECT shardid , unnest(result::%s[]) from run_command_on_shards(%s,$$SELECT array_agg(val)"
" FROM pg_stats s , unnest(most_common_vals::text::%s[],most_common_freqs) as res(val,freq)"
@ -246,7 +256,8 @@ FindShardSplitPoints(ShardInfo shardinfo)
/*
* This function calculates the split points of the shard to split and then executes the background job.
* This function calculates the split points of the shard to
* split and then executes the background job for the shard split.
*/
void
ScheduleShardSplit(ShardInfo shardinfo)
@ -346,10 +357,6 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
char *shardSplitMode;
/* Oid shardTransferModeOid = PG_GETARG_OID(0); */
/* Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); */
/* char *enumLabel = DatumGetCString(enumLabelDatum); */
/* ereport(LOG,errmsg("%s",enumLabel)); */
ScheduleShardSplit(&shardinfo);
ereport(LOG, (errmsg(