Refactor size queries with new enum SizeQueryType (#4898)

* Refactor size queries with new enum SizeQueryType

* Polish
pull/4869/head
Ahmet Gedemenli 2021-04-12 17:14:29 +03:00 committed by GitHub
parent b453563e88
commit d74d358a45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 24 deletions

View File

@ -71,14 +71,15 @@ static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId);
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static bool DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError,
uint64 *tableSize);
static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize);
static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery, bool failOnError,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
useShardMinMaxQuery);
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
List *citusTableIds, bool
useShardMinMaxQuery);
@ -143,18 +144,18 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
bool failOnError = PG_GETARG_BOOL(1);
char *tableSizeFunction = PG_TOTAL_RELATION_SIZE_FUNCTION;
SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE;
CheckCitusVersion(ERROR);
if (CStoreTable(relationId))
{
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
sizeQueryType = CSTORE_TABLE_SIZE;
}
uint64 tableSize = 0;
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize))
if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
@ -173,18 +174,18 @@ citus_table_size(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
char *tableSizeFunction = PG_TABLE_SIZE_FUNCTION;
SizeQueryType sizeQueryType = TABLE_SIZE;
CheckCitusVersion(ERROR);
if (CStoreTable(relationId))
{
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
sizeQueryType = CSTORE_TABLE_SIZE;
}
uint64 tableSize = 0;
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &tableSize))
if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
@ -203,18 +204,18 @@ citus_relation_size(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
char *tableSizeFunction = PG_RELATION_SIZE_FUNCTION;
SizeQueryType sizeQueryType = RELATION_SIZE;
CheckCitusVersion(ERROR);
if (CStoreTable(relationId))
{
tableSizeFunction = CSTORE_TABLE_SIZE_FUNCTION;
sizeQueryType = CSTORE_TABLE_SIZE;
}
uint64 relationSize = 0;
if (!DistributedTableSize(relationId, tableSizeFunction, failOnError, &relationSize))
if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
@ -389,7 +390,8 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore
* it. Connection to each node has to be established to get the size of the table.
*/
static bool
DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 *tableSize)
DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize)
{
int logLevel = WARNING;
@ -430,7 +432,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 *
{
uint64 relationSizeOnNode = 0;
bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQuery,
bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQueryType,
failOnError, &relationSizeOnNode);
if (!gotSize)
{
@ -452,7 +454,8 @@ DistributedTableSize(Oid relationId, char *sizeQuery, bool failOnError, uint64 *
* shard placement.
*/
static bool
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery,
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize)
{
int logLevel = WARNING;
@ -471,7 +474,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
shardIntervalsOnNode,
sizeQuery);
sizeQueryType);
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);
@ -591,12 +594,13 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
/*
* GenerateSizeQueryOnMultiplePlacements generates a select size query to get
* size of multiple tables. Note that, different size functions supported by PG
* are also supported by this function changing the size query given as the
* last parameter to function. Format of sizeQuery is pg_*_size(%s). Examples
* of it can be found in the coordinator_protocol.h
* are also supported by this function changing the size query type given as the
* last parameter to function. Depending on the sizeQueryType enum parameter, the
* generated query will be pg_relation_size or pg_total_relation_size.
*/
StringInfo
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
SizeQueryType sizeQueryType)
{
StringInfo selectQuery = makeStringInfo();
@ -614,7 +618,9 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
appendStringInfo(selectQuery, sizeQuery, quotedShardName);
appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType),
quotedShardName);
appendStringInfo(selectQuery, " + ");
}
@ -628,6 +634,43 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
}
/*
* GetSizeQueryBySizeQueryType returns the corresponding size query for given query type.
* Errors out for an invalid query type.
*/
static char *
GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType)
{
switch (sizeQueryType)
{
case RELATION_SIZE:
{
return PG_RELATION_SIZE_FUNCTION;
}
case TOTAL_RELATION_SIZE:
{
return PG_TOTAL_RELATION_SIZE_FUNCTION;
}
case CSTORE_TABLE_SIZE:
{
return CSTORE_TABLE_SIZE_FUNCTION;
}
case TABLE_SIZE:
{
return PG_TABLE_SIZE_FUNCTION;
}
default:
{
elog(ERROR, "Size query type couldn't be found.");
}
}
}
/*
* GenerateAllShardStatisticsQueryForNode generates a query that returns:
* - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false)

View File

@ -449,11 +449,10 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
uint32 connectionFlag = 0;
PGresult *result = NULL;
bool raiseErrors = true;
char *sizeQuery = PG_TOTAL_RELATION_SIZE_FUNCTION;
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList,
sizeQuery);
TOTAL_RELATION_SIZE);
MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);

View File

@ -177,6 +177,20 @@ typedef struct TableConversionReturn
}TableConversionReturn;
/*
* Size query types for PG and Citus
* For difference details, please see:
* https://www.postgresql.org/docs/13/functions-admin.html#FUNCTIONS-ADMIN-DBSIZE
*/
typedef enum SizeQueryType
{
RELATION_SIZE, /* pg_relation_size() */
TOTAL_RELATION_SIZE, /* pg_total_relation_size() */
TABLE_SIZE, /* pg_table_size() */
CSTORE_TABLE_SIZE /* cstore_table_size() */
} SizeQueryType;
/* Config variable managed via guc.c */
extern int ReplicationModel;
@ -203,7 +217,7 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
char *sizeQuery);
SizeQueryType sizeQueryType);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);