Use SUM for calculating non partitioned table sizes (#6222)

We currently build a `pg_relation_total_size('t1') + pg_relation_total_size('t2') + ..` expression over shard lists, especially when rebalancing the shards. In some cases this query becomes huge. With this PR, we use a single SUM over all table sizes instead of chaining thousands of `+` operators.
pull/6261/head
Ahmet Gedemenli 2022-08-26 18:02:14 +03:00 committed by GitHub
parent 4df8eca77f
commit 0855a9d1d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 12 deletions

View File

@ -81,6 +81,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames,
char *sizeFunction);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
@ -720,7 +722,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{
StringInfo selectQuery = makeStringInfo();
appendStringInfo(selectQuery, "SELECT ");
List *partitionedShardNames = NIL;
List *nonPartitionedShardNames = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
@ -746,30 +749,76 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
/* for partitioned tables, we will call worker_partitioned_... size functions */
if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
{
appendStringInfo(selectQuery, GetWorkerPartitionedSizeUDFNameBySizeQueryType(
sizeQueryType), quotedShardName);
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
}
/* for non-partitioned tables, we will use Postgres' size functions */
else
{
appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType),
quotedShardName);
nonPartitionedShardNames = lappend(nonPartitionedShardNames, quotedShardName);
}
appendStringInfo(selectQuery, " + ");
}
/*
 * Add 0 as the last term: it handles the empty list case and makes the size
 * control checks that would otherwise have been needed unnecessary.
 */
appendStringInfo(selectQuery, "0;");
/* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */
char *subqueryForPartitionedShards =
GenerateSizeQueryForRelationNameList(partitionedShardNames,
GetWorkerPartitionedSizeUDFNameBySizeQueryType(
sizeQueryType));
/* SELECT SUM(pg_..._size) FROM VALUES (...) */
char *subqueryForNonPartitionedShards =
GenerateSizeQueryForRelationNameList(nonPartitionedShardNames,
GetSizeQueryBySizeQueryType(sizeQueryType));
appendStringInfo(selectQuery, "SELECT (%s) + (%s);",
subqueryForPartitionedShards, subqueryForNonPartitionedShards);
elog(DEBUG4, "Size Query: %s", selectQuery->data);
return selectQuery;
}
/*
 * GenerateSizeQueryForRelationNameList generates and returns a query that sums
 * the result of a size function over a list of relation names, with the template:
 * SELECT SUM( <sizeFunction>(relid) ) FROM (VALUES (<shardName>), (<shardName>), ...) as q(relid)
 *
 * quotedShardNames: the relation names; every entry is written as-is into its
 * own VALUES row, no escaping is done here, so callers are expected to pass
 * names that are already quoted.
 *
 * sizeFunction: used as the format string for the SUM argument, and is given
 * the column alias "relid" as its only argument.
 *
 * Returns the query text. For an empty list the result is the string literal
 * "SELECT 0", which the caller must not modify.
 */
static char *
GenerateSizeQueryForRelationNameList(List *quotedShardNames, char *sizeFunction)
{
	/* nothing to sum: a constant keeps the caller's "(a) + (b)" expression valid */
	if (list_length(quotedShardNames) == 0)
	{
		return "SELECT 0";
	}

	StringInfo sumQuery = makeStringInfo();

	/* SELECT SUM( <sizeFunction>(relid) ) FROM (VALUES */
	appendStringInfo(sumQuery, "SELECT SUM(");
	appendStringInfo(sumQuery, sizeFunction, "relid");
	appendStringInfo(sumQuery, ") FROM (VALUES ");

	/* one parenthesized row per relation name, comma separated */
	bool isFirstRow = true;
	char *relationName = NULL;
	foreach_ptr(relationName, quotedShardNames)
	{
		if (!isFirstRow)
		{
			appendStringInfoString(sumQuery, ", ");
		}
		isFirstRow = false;

		appendStringInfo(sumQuery, "(%s)", relationName);
	}

	/* name the single VALUES column so the size function can refer to it */
	appendStringInfoString(sumQuery, ") as q(relid)");

	return sumQuery->data;
}
/*
* GetWorkerPartitionedSizeUDFNameBySizeQueryType returns the corresponding worker
* partitioned size query for given query type.

View File

@ -2474,3 +2474,52 @@ SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_li
(1 row)
DROP TABLE test_with_all_shards_excluded;
SET citus.shard_count TO 2;
CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time");
CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
SELECT create_distributed_table('"events.Energy Added"', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00');
CREATE TABLE "Energy Added_17635" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-14 00:00:00+00') TO ('2018-04-15 00:00:00+00');
create table colocated_t1 (a int);
select create_distributed_table('colocated_t1','a',colocate_with=>'"events.Energy Added"');
create_distributed_table
---------------------------------------------------------------------
(1 row)
create table colocated_t2 (a int);
select create_distributed_table('colocated_t2','a',colocate_with=>'"events.Energy Added"');
create_distributed_table
---------------------------------------------------------------------
(1 row)
create table colocated_t3 (a int);
select create_distributed_table('colocated_t3','a',colocate_with=>'"events.Energy Added"');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO DEBUG4;
SELECT * FROM get_rebalance_table_shards_plan('colocated_t1', rebalance_strategy := 'by_disk_size');
DEBUG: skipping child tables for relation named: colocated_t1
DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433508"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433514'), ('public.colocated_t2_433516'), ('public.colocated_t3_433518')) as q(relid));
DEBUG: skipping child tables for relation named: colocated_t1
DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433508"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433514'), ('public.colocated_t2_433516'), ('public.colocated_t3_433518')) as q(relid));
DEBUG: skipping child tables for relation named: colocated_t1
DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433509"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433515'), ('public.colocated_t2_433517'), ('public.colocated_t3_433519')) as q(relid));
DEBUG: skipping child tables for relation named: colocated_t1
DEBUG: Size Query: SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid)) FROM (VALUES ('public."events.Energy Added_433509"')) as q(relid)) + (SELECT SUM(pg_total_relation_size(relid)) FROM (VALUES ('public.colocated_t1_433515'), ('public.colocated_t2_433517'), ('public.colocated_t3_433519')) as q(relid));
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
DROP TABLE "events.Energy Added", colocated_t1, colocated_t2, colocated_t3;
RESET citus.shard_count;

View File

@ -1417,3 +1417,27 @@ SELECT shardid FROM pg_dist_shard;
SELECT rebalance_table_shards('test_with_all_shards_excluded', excluded_shard_list:='{102073, 102074, 102075, 102076}');
DROP TABLE test_with_all_shards_excluded;
SET citus.shard_count TO 2;
CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time");
CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
SELECT create_distributed_table('"events.Energy Added"', 'user_id');
CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00');
CREATE TABLE "Energy Added_17635" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-14 00:00:00+00') TO ('2018-04-15 00:00:00+00');
create table colocated_t1 (a int);
select create_distributed_table('colocated_t1','a',colocate_with=>'"events.Energy Added"');
create table colocated_t2 (a int);
select create_distributed_table('colocated_t2','a',colocate_with=>'"events.Energy Added"');
create table colocated_t3 (a int);
select create_distributed_table('colocated_t3','a',colocate_with=>'"events.Energy Added"');
SET client_min_messages TO DEBUG4;
SELECT * FROM get_rebalance_table_shards_plan('colocated_t1', rebalance_strategy := 'by_disk_size');
RESET client_min_messages;
DROP TABLE "events.Energy Added", colocated_t1, colocated_t2, colocated_t3;
RESET citus.shard_count;