Use citus_shard_sizes in citus_tables (#7018)

Fixes #7019 

This PR updates citus_tables view to use citus_shard_sizes function,
instead of citus_total_relation_size to improve performance.
pull/7013/merge
Halil Ozan Akgül 2023-07-05 11:40:34 +03:00 committed by GitHub
parent 719d92c8b9
commit 613cced1ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 64 additions and 25 deletions

View File

@ -91,7 +91,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);
static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames,
char *sizeFunction);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
@ -104,7 +105,7 @@ static List * OpenConnectionToNodes(List *workerNodeList);
static void ReceiveShardIdAndSizeResults(List *connectionList,
Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval);
static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes);
@ -916,6 +917,12 @@ static char *
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
{
StringInfo allShardStatisticsQuery = makeStringInfo();
bool insertedValues = false;
appendStringInfoString(allShardStatisticsQuery, "SELECT shard_id, ");
appendStringInfo(allShardStatisticsQuery, PG_TOTAL_RELATION_SIZE_FUNCTION,
"table_name");
appendStringInfoString(allShardStatisticsQuery, " FROM (VALUES ");
Oid relationId = InvalidOid;
foreach_oid(relationId, citusTableIds)
@ -930,34 +937,49 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
{
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
relationId);
char *shardStatisticsQuery =
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
if (list_length(shardIntervalsOnNode) == 0)
{
relation_close(relation, AccessShareLock);
continue;
}
char *shardIdNameValues =
GenerateShardIdNameValuesForShardList(shardIntervalsOnNode,
!insertedValues);
insertedValues = true;
appendStringInfoString(allShardStatisticsQuery, shardIdNameValues);
relation_close(relation, AccessShareLock);
}
}
/* Add a dummy entry so that UNION ALL doesn't complain */
appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, 0::bigint;");
if (!insertedValues)
{
return "SELECT 0 AS shard_id, '' AS table_name LIMIT 0";
}
appendStringInfoString(allShardStatisticsQuery, ") t(shard_id, table_name) "
"WHERE to_regclass(table_name) IS NOT NULL");
return allShardStatisticsQuery->data;
}
/*
* GenerateShardStatisticsQueryForShardList generates a query that returns:
* SELECT shard_id, shard_name, shard_size for all shards in the list
* GenerateShardIdNameValuesForShardList generates a list of (shard_id, shard_name) values
* for all shards in the list
*/
static char *
GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
GenerateShardIdNameValuesForShardList(List *shardIntervalList, bool firstValue)
{
StringInfo selectQuery = makeStringInfo();
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
AppendShardSizeQuery(selectQuery, shardInterval);
appendStringInfo(selectQuery, " UNION ALL ");
if (!firstValue)
{
appendStringInfoString(selectQuery, ", ");
}
firstValue = false;
AppendShardIdNameValues(selectQuery, shardInterval);
}
return selectQuery->data;
@ -965,11 +987,10 @@ GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
/*
* AppendShardSizeQuery appends a query in the following form to selectQuery
* SELECT shard_id, shard_name, shard_size
* AppendShardIdNameValues appends (shard_id, shard_name) for shard
*/
static void
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)
{
uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
@ -981,8 +1002,7 @@ AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
appendStringInfo(selectQuery, PG_TOTAL_RELATION_SIZE_FUNCTION, quotedShardName);
appendStringInfo(selectQuery, "(" UINT64_FORMAT ", %s)", shardId, quotedShardName);
}

View File

@ -25,6 +25,8 @@ GRANT SELECT ON pg_catalog.pg_dist_schema TO public;
#include "udfs/citus_drop_trigger/12.0-1.sql"
DROP VIEW citus_shards;
DROP VIEW IF EXISTS pg_catalog.citus_tables;
DROP VIEW IF EXISTS public.citus_tables;
DROP FUNCTION citus_shard_sizes;
#include "udfs/citus_shard_sizes/12.0-1.sql"

View File

@ -47,6 +47,9 @@ DROP FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(Oid, t
DROP VIEW IF EXISTS public.citus_schemas;
DROP VIEW IF EXISTS pg_catalog.citus_schemas;
DROP VIEW IF EXISTS public.citus_tables;
DROP VIEW IF EXISTS pg_catalog.citus_tables;
DROP VIEW pg_catalog.citus_shards;
DROP FUNCTION pg_catalog.citus_shard_sizes;
#include "../udfs/citus_shard_sizes/10.0-1.sql"

View File

@ -14,7 +14,7 @@ citus_tables_create_query=$CTCQ$
END AS citus_table_type,
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS distribution_column,
colocationid AS colocation_id,
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS table_size,
pg_size_pretty(table_sizes.table_size) AS table_size,
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS shard_count,
pg_get_userbyid(relowner) AS table_owner,
amname AS access_method
@ -24,6 +24,13 @@ citus_tables_create_query=$CTCQ$
pg_class c ON (p.logicalrelid = c.oid)
LEFT JOIN
pg_am a ON (a.oid = c.relam)
JOIN
(
SELECT ds.logicalrelid AS table_id, SUM(css.size) AS table_size
FROM citus_shard_sizes() css, pg_dist_shard ds
WHERE css.shard_id = ds.shardid
GROUP BY ds.logicalrelid
) table_sizes ON (table_sizes.table_id = p.logicalrelid)
WHERE
-- filter out tables owned by extensions
logicalrelid NOT IN (

View File

@ -14,7 +14,7 @@ citus_tables_create_query=$CTCQ$
END AS citus_table_type,
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS distribution_column,
colocationid AS colocation_id,
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS table_size,
pg_size_pretty(table_sizes.table_size) AS table_size,
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS shard_count,
pg_get_userbyid(relowner) AS table_owner,
amname AS access_method
@ -24,6 +24,13 @@ citus_tables_create_query=$CTCQ$
pg_class c ON (p.logicalrelid = c.oid)
LEFT JOIN
pg_am a ON (a.oid = c.relam)
JOIN
(
SELECT ds.logicalrelid AS table_id, SUM(css.size) AS table_size
FROM citus_shard_sizes() css, pg_dist_shard ds
WHERE css.shard_id = ds.shardid
GROUP BY ds.logicalrelid
) table_sizes ON (table_sizes.table_id = p.logicalrelid)
WHERE
-- filter out tables owned by extensions
logicalrelid NOT IN (

View File

@ -64,15 +64,15 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_hash');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT 0 AS shard_id, '' AS table_name LIMIT 0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981000, 'public.test_table_statistics_hash_981000'), (981001, 'public.test_table_statistics_hash_981001'), (981002, 'public.test_table_statistics_hash_981002'), (981003, 'public.test_table_statistics_hash_981003'), (981004, 'public.test_table_statistics_hash_981004'), (981005, 'public.test_table_statistics_hash_981005'), (981006, 'public.test_table_statistics_hash_981006'), (981007, 'public.test_table_statistics_hash_981007')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981000, 'public.test_table_statistics_hash_981000'), (981001, 'public.test_table_statistics_hash_981001'), (981002, 'public.test_table_statistics_hash_981002'), (981003, 'public.test_table_statistics_hash_981003'), (981004, 'public.test_table_statistics_hash_981004'), (981005, 'public.test_table_statistics_hash_981005'), (981006, 'public.test_table_statistics_hash_981006'), (981007, 'public.test_table_statistics_hash_981007')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -158,15 +158,15 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_append');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT 0 AS shard_id, '' AS table_name LIMIT 0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981008, 'public.test_table_statistics_append_981008'), (981009, 'public.test_table_statistics_append_981009')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981008, 'public.test_table_statistics_append_981008'), (981009, 'public.test_table_statistics_append_981009')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx