Fix #6496 about citus index size

This change extends the existing code to check whether the relation is
an index belonging to a distributed table.
If so, the shardId is appended to the relation (index) name and the
*_size functions are executed as before.

The signature of an extern function changes (it gains an `Oid indexId`
parameter):
  `extern StringInfo GenerateSizeQueryOnMultiplePlacements(...)`
If compatibility is an issue, a new function could be added instead and
this one deprecated later.
remotes/c2main/clean-6496
Cédric Villemain 2023-10-21 23:05:51 +02:00
parent 2c92cc08d7
commit 7d2eb738f0
3 changed files with 46 additions and 8 deletions

View File

@ -24,6 +24,7 @@
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
@ -91,7 +92,7 @@ static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType, static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize); bool failOnError, uint64 *relationSize);
static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode, static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode,
Oid relationId, Oid relationId, Oid indexId,
SizeQueryType sizeQueryType, SizeQueryType sizeQueryType,
bool failOnError, bool failOnError,
uint64 *relationSize); uint64 *relationSize);
@ -521,6 +522,11 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
{ {
int logLevel = WARNING; int logLevel = WARNING;
/*
* By default we suppose we examine a table, not an index.
*/
Oid tableId = relationId;
if (failOnError) if (failOnError)
{ {
logLevel = ERROR; logLevel = ERROR;
@ -548,7 +554,16 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
return false; return false;
} }
ErrorIfNotSuitableToGetSize(relationId); /*
* If the relation is an index, update tableId and define indexId
*/
if (relation->rd_rel->relkind == RELKIND_INDEX)
{
indexId = relationId;
tableId = IndexGetRelation(relation->rd_id, false);
}
ErrorIfNotSuitableToGetSize(tableId);
table_close(relation, AccessShareLock); table_close(relation, AccessShareLock);
@ -558,8 +573,8 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
{ {
uint64 relationSizeOnNode = 0; uint64 relationSizeOnNode = 0;
bool gotSize = DistributedRelationSizeOnWorker(workerNode, relationId, bool gotSize = DistributedRelationSizeOnWorker(workerNode, tableId,
sizeQueryType, indexId, sizeQueryType,
failOnError, &relationSizeOnNode); failOnError, &relationSizeOnNode);
if (!gotSize) if (!gotSize)
{ {
@ -578,11 +593,11 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
/* /*
* DistributedRelationSizeOnWorker gets the workerNode and relationId to calculate * DistributedRelationSizeOnWorker gets the workerNode and relationId to calculate
* size of that relation on the given workerNode by summing up the size of each * size of that relation on the given workerNode by summing up the size of each
* shard placement. * shard placement. If indexId is defined then the relation is an index.
*/ */
static bool static bool
DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId, DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, Oid indexId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize) bool failOnError, uint64 *relationSize)
{ {
int logLevel = WARNING; int logLevel = WARNING;
@ -606,6 +621,7 @@ DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
bool optimizePartitionCalculations = false; bool optimizePartitionCalculations = false;
StringInfo relationSizeQuery = GenerateSizeQueryOnMultiplePlacements( StringInfo relationSizeQuery = GenerateSizeQueryOnMultiplePlacements(
shardIntervalsOnNode, shardIntervalsOnNode,
indexId,
sizeQueryType, sizeQueryType,
optimizePartitionCalculations); optimizePartitionCalculations);
@ -614,6 +630,15 @@ DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data, int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data,
&result); &result);
/*
* After this point, we don't need to keep relationId pointing to the table.
* So we swap with indexId if relevant, only for error log...
*/
if (indexId != InvalidOid)
{
relationId = indexId;
}
if (queryResult != 0) if (queryResult != 0)
{ {
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
@ -749,6 +774,7 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
*/ */
StringInfo StringInfo
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType, SizeQueryType sizeQueryType,
bool optimizePartitionCalculations) bool optimizePartitionCalculations)
{ {
@ -772,10 +798,20 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
*/ */
continue; continue;
} }
/*
* We need to build the shard relation name, being an index or table ...
*/
Oid objectId = shardInterval->relationId;
if (indexId != InvalidOid)
{
objectId = indexId;
}
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId); Oid schemaId = get_rel_namespace(objectId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId); char *shardName = get_rel_name(objectId);
AppendShardIdToName(&shardName, shardId); AppendShardIdToName(&shardName, shardId);
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);

View File

@ -793,6 +793,7 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32
/* we skip child tables of a partitioned table if this boolean variable is true */ /* we skip child tables of a partitioned table if this boolean variable is true */
bool optimizePartitionCalculations = true; bool optimizePartitionCalculations = true;
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList, StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
InvalidOid,
TOTAL_RELATION_SIZE, TOTAL_RELATION_SIZE,
optimizePartitionCalculations); optimizePartitionCalculations);

View File

@ -342,6 +342,7 @@ extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char *
int *nodePort); int *nodePort);
extern bool IsDummyPlacement(ShardPlacement *taskPlacement); extern bool IsDummyPlacement(ShardPlacement *taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType, SizeQueryType sizeQueryType,
bool optimizePartitionCalculations); bool optimizePartitionCalculations);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);