diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index e65e211c6..3781eac50 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -81,6 +81,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, uint64 *tableSize); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList); +static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames, + char *sizeFunction); static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType); static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType); static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, @@ -720,7 +722,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, { StringInfo selectQuery = makeStringInfo(); - appendStringInfo(selectQuery, "SELECT "); + List *partitionedShardNames = NIL; + List *nonPartitionedShardNames = NIL; ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) @@ -746,30 +749,76 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); char *quotedShardName = quote_literal_cstr(shardQualifiedName); + /* for partitioned tables, we will call worker_partitioned_... 
size functions */ if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId)) { - appendStringInfo(selectQuery, GetWorkerPartitionedSizeUDFNameBySizeQueryType( - sizeQueryType), quotedShardName); + partitionedShardNames = lappend(partitionedShardNames, quotedShardName); } + + /* for non-partitioned tables, we will use Postgres' size functions */ else { - appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType), - quotedShardName); + nonPartitionedShardNames = lappend(nonPartitionedShardNames, quotedShardName); } - - appendStringInfo(selectQuery, " + "); } - /* - * Add 0 as a last size, it handles empty list case and makes size control checks - * unnecessary which would have implemented without this line. - */ - appendStringInfo(selectQuery, "0;"); + /* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */ + char *subqueryForPartitionedShards = + GenerateSizeQueryForRelationNameList(partitionedShardNames, + GetWorkerPartitionedSizeUDFNameBySizeQueryType( + sizeQueryType)); + + /* SELECT SUM(pg_..._size) FROM VALUES (...) */ + char *subqueryForNonPartitionedShards = + GenerateSizeQueryForRelationNameList(nonPartitionedShardNames, + GetSizeQueryBySizeQueryType(sizeQueryType)); + + appendStringInfo(selectQuery, "SELECT (%s) + (%s);", + subqueryForPartitionedShards, subqueryForNonPartitionedShards); + + elog(DEBUG4, "Size Query: %s", selectQuery->data); return selectQuery; } +/* + * GenerateSizeQueryForRelationNameList returns SELECT 0 for an empty list, otherwise a query with a template: + * SELECT SUM( sizeFunction(relid) ) FROM (VALUES (quotedShardName), (quotedShardName), ...) 
as q(relid) + */ +static char * +GenerateSizeQueryForRelationNameList(List *quotedShardNames, char *sizeFunction) +{ + if (list_length(quotedShardNames) == 0) + { + return "SELECT 0"; + } + + StringInfo selectQuery = makeStringInfo(); + + appendStringInfo(selectQuery, "SELECT SUM("); + appendStringInfo(selectQuery, sizeFunction, "relid"); + appendStringInfo(selectQuery, ") FROM (VALUES "); + + bool addComma = false; + char *quotedShardName = NULL; + foreach_ptr(quotedShardName, quotedShardNames) + { + if (addComma) + { + appendStringInfoString(selectQuery, ", "); + } + addComma = true; + + appendStringInfo(selectQuery, "(%s)", quotedShardName); + } + + appendStringInfoString(selectQuery, ") as q(relid)"); + + return selectQuery->data; +} + + /* * GetWorkerPartitionedSizeUDFNameBySizeQueryType returns the corresponding worker * partitioned size query for given query type. diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index aab94dad1..4f7fad246 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -2474,3 +2474,52 @@ SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_li (1 row) DROP TABLE test_with_all_shards_excluded; +SET citus.shard_count TO 2; +CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time"); +CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location')); +SELECT create_distributed_table('"events.Energy Added"', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00'); +CREATE TABLE "Energy Added_17635" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-14 00:00:00+00') 
TO ('2018-04-15 00:00:00+00'); +create table colocated_t1 (a int); +select create_distributed_table('colocated_t1','a',colocate_with=>'"events.Energy Added"'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create table colocated_t2 (a int); +select create_distributed_table('colocated_t2','a',colocate_with=>'"events.Energy Added"'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +create table colocated_t3 (a int); +select create_distributed_table('colocated_t3','a',colocate_with=>'"events.Energy Added"'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG4; +SELECT * FROM get_rebalance_table_shards_plan('colocated_t1', rebalance_strategy := 'by_disk_size'); +DEBUG: skipping child tables for relation named: colocated_t1 +DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433508"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433514'), ('public.colocated_t2_433516'), ('public.colocated_t3_433518')) as q(relid)); +DEBUG: skipping child tables for relation named: colocated_t1 +DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433508"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433514'), ('public.colocated_t2_433516'), ('public.colocated_t3_433518')) as q(relid)); +DEBUG: skipping child tables for relation named: colocated_t1 +DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433509"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433515'), ('public.colocated_t2_433517'), 
('public.colocated_t3_433519')) as q(relid)); +DEBUG: skipping child tables for relation named: colocated_t1 +DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433509"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433515'), ('public.colocated_t2_433517'), ('public.colocated_t3_433519')) as q(relid)); + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- +(0 rows) + +RESET client_min_messages; +DROP TABLE "events.Energy Added", colocated_t1, colocated_t2, colocated_t3; +RESET citus.shard_count; diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index 0d482998b..e5ef36b52 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -1417,3 +1417,27 @@ SELECT shardid FROM pg_dist_shard; SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}'); DROP TABLE test_with_all_shards_excluded; + +SET citus.shard_count TO 2; + +CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time"); +CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location')); +SELECT create_distributed_table('"events.Energy Added"', 'user_id'); +CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00'); +CREATE TABLE "Energy Added_17635" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-14 00:00:00+00') TO ('2018-04-15 00:00:00+00'); + +create table colocated_t1 (a int); +select create_distributed_table('colocated_t1','a',colocate_with=>'"events.Energy Added"'); + +create table colocated_t2 (a int); +select 
create_distributed_table('colocated_t2','a',colocate_with=>'"events.Energy Added"'); + +create table colocated_t3 (a int); +select create_distributed_table('colocated_t3','a',colocate_with=>'"events.Energy Added"'); + +SET client_min_messages TO DEBUG4; +SELECT * FROM get_rebalance_table_shards_plan('colocated_t1', rebalance_strategy := 'by_disk_size'); +RESET client_min_messages; + +DROP TABLE "events.Energy Added", colocated_t1, colocated_t2, colocated_t3; +RESET citus.shard_count;