From d74d358a45c5cf40d3ae05fb10ced24f458e14c1 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Mon, 12 Apr 2021 17:14:29 +0300 Subject: [PATCH] Refactor size queries with new enum SizeQueryType (#4898) * Refactor size queries with new enum SizeQueryType * Polish --- .../distributed/metadata/metadata_utility.c | 85 ++++++++++++++----- .../distributed/operations/shard_rebalancer.c | 3 +- src/include/distributed/metadata_utility.h | 16 +++- 3 files changed, 80 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 3bf034ad9..95977ce7c 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -71,14 +71,15 @@ static uint64 * AllocateUint64(uint64 value); static void RecordDistributedRelationDependencies(Oid distributedRelationId); static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc, HeapTuple heapTuple); -static bool DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, - uint64 *tableSize); +static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, + bool failOnError, uint64 *tableSize); static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, - char *sizeQuery, bool failOnError, + SizeQueryType sizeQueryType, bool failOnError, uint64 *tableSize); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool useShardMinMaxQuery); +static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType); static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool useShardMinMaxQuery); @@ -143,18 +144,18 @@ citus_total_relation_size(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); bool failOnError = PG_GETARG_BOOL(1); - char *tableSizeFunction = 
PG_TOTAL_RELATION_SIZE_FUNCTION; + SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE; CheckCitusVersion(ERROR); if (CStoreTable(relationId)) { - tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION; + sizeQueryType = CSTORE_TABLE_SIZE; } uint64 tableSize = 0; - if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize)) + if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize)) { Assert(!failOnError); PG_RETURN_NULL(); @@ -173,18 +174,18 @@ citus_table_size(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); bool failOnError = true; - char *tableSizeFunction = PG_TABLE_SIZE_FUNCTION; + SizeQueryType sizeQueryType = TABLE_SIZE; CheckCitusVersion(ERROR); if (CStoreTable(relationId)) { - tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION; + sizeQueryType = CSTORE_TABLE_SIZE; } uint64 tableSize = 0; - if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize)) + if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize)) { Assert(!failOnError); PG_RETURN_NULL(); @@ -203,18 +204,18 @@ citus_relation_size(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); bool failOnError = true; - char *tableSizeFunction = PG_RELATION_SIZE_FUNCTION; + SizeQueryType sizeQueryType = RELATION_SIZE; CheckCitusVersion(ERROR); if (CStoreTable(relationId)) { - tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION; + sizeQueryType = CSTORE_TABLE_SIZE; } uint64 relationSize = 0; - if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &relationSize)) + if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &relationSize)) { Assert(!failOnError); PG_RETURN_NULL(); @@ -389,7 +390,8 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore * it. Connection to each node has to be established to get the size of the table. 
*/ static bool -DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 *tableSize) +DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError, + uint64 *tableSize) { int logLevel = WARNING; @@ -430,7 +432,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 * { uint64 relationSizeOnNode = 0; - bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQuery, + bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQueryType, failOnError, &relationSizeOnNode); if (!gotSize) { @@ -452,7 +454,8 @@ DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 * * shard placement. */ static bool -DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery, +DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, + SizeQueryType sizeQueryType, bool failOnError, uint64 *tableSize) { int logLevel = WARNING; @@ -471,7 +474,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements( shardIntervalsOnNode, - sizeQuery); + sizeQueryType); MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName, workerNodePort); @@ -591,12 +594,13 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId) /* * GenerateSizeQueryOnMultiplePlacements generates a select size query to get * size of multiple tables. Note that, different size functions supported by PG - * are also supported by this function changing the size query given as the - * last parameter to function. Format of sizeQuery is pg_*_size(%s). Examples - * of it can be found in the coordinator_protocol.h + * are also supported by this function by changing the size query type given as the + * last parameter to function. Depending on the sizeQueryType enum parameter, the + * generated query calls pg_[total_]relation_size, pg_table_size or cstore_table_size. 
*/ StringInfo -GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery) +GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, + SizeQueryType sizeQueryType) { StringInfo selectQuery = makeStringInfo(); @@ -614,7 +618,9 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery) char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); char *quotedShardName = quote_literal_cstr(shardQualifiedName); - appendStringInfo(selectQuery, sizeQuery, quotedShardName); + appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType), + quotedShardName); + appendStringInfo(selectQuery, " + "); } @@ -628,6 +634,43 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery) } +/* + * GetSizeQueryBySizeQueryType returns the corresponding size query for given query type. + * Errors out for an invalid query type. + */ +static char * +GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType) +{ + switch (sizeQueryType) + { + case RELATION_SIZE: + { + return PG_RELATION_SIZE_FUNCTION; + } + + case TOTAL_RELATION_SIZE: + { + return PG_TOTAL_RELATION_SIZE_FUNCTION; + } + + case CSTORE_TABLE_SIZE: + { + return CSTORE_TABLE_SIZE_FUNCTION; + } + + case TABLE_SIZE: + { + return PG_TABLE_SIZE_FUNCTION; + } + + default: + { + elog(ERROR, "Size query type couldn't be found."); + } + } +} + + /* * GenerateAllShardStatisticsQueryForNode generates a query that returns: * - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 977467237..8c966c82a 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -449,11 +449,10 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) uint32 connectionFlag = 0; PGresult *result = NULL; bool raiseErrors = true; - char *sizeQuery 
= PG_TOTAL_RELATION_SIZE_FUNCTION; ShardInterval *shardInterval = LoadShardInterval(shardId); List *colocatedShardList = ColocatedShardIntervalList(shardInterval); StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList, - sizeQuery); + TOTAL_RELATION_SIZE); MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName, workerNodePort); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index e86dc2f73..384838e97 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -177,6 +177,20 @@ typedef struct TableConversionReturn }TableConversionReturn; +/* + * Size query types for PG and Citus + * For difference details, please see: + * https://www.postgresql.org/docs/13/functions-admin.html#FUNCTIONS-ADMIN-DBSIZE + */ +typedef enum SizeQueryType +{ + RELATION_SIZE, /* pg_relation_size() */ + TOTAL_RELATION_SIZE, /* pg_total_relation_size() */ + TABLE_SIZE, /* pg_table_size() */ + CSTORE_TABLE_SIZE /* cstore_table_size() */ +} SizeQueryType; + + /* Config variable managed via guc.c */ extern int ReplicationModel; @@ -203,7 +217,7 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, - char *sizeQuery); + SizeQueryType sizeQueryType); extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);