mirror of https://github.com/citusdata/citus.git
suggestions
parent
7d2eb738f0
commit
795b0d82ee
|
@ -91,10 +91,8 @@ static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
|
||||||
HeapTuple heapTuple);
|
HeapTuple heapTuple);
|
||||||
static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
|
static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
|
||||||
bool failOnError, uint64 *relationSize);
|
bool failOnError, uint64 *relationSize);
|
||||||
static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode,
|
static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
Oid relationId, Oid indexId,
|
SizeQueryType sizeQueryType, bool failOnError,
|
||||||
SizeQueryType sizeQueryType,
|
|
||||||
bool failOnError,
|
|
||||||
uint64 *relationSize);
|
uint64 *relationSize);
|
||||||
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
|
||||||
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
|
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
|
||||||
|
@ -522,11 +520,6 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
|
||||||
{
|
{
|
||||||
int logLevel = WARNING;
|
int logLevel = WARNING;
|
||||||
|
|
||||||
/*
|
|
||||||
* By default we suppose we examine a table, not an index.
|
|
||||||
*/
|
|
||||||
Oid tableId = relationId;
|
|
||||||
|
|
||||||
if (failOnError)
|
if (failOnError)
|
||||||
{
|
{
|
||||||
logLevel = ERROR;
|
logLevel = ERROR;
|
||||||
|
@ -554,16 +547,14 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
Oid checkRelId = relationId;
|
||||||
* If the relation is an index, update tableId and define indexId
|
|
||||||
*/
|
|
||||||
if (relation->rd_rel->relkind == RELKIND_INDEX)
|
if (relation->rd_rel->relkind == RELKIND_INDEX)
|
||||||
{
|
{
|
||||||
indexId = relationId;
|
bool missingOk = false;
|
||||||
tableId = IndexGetRelation(relation->rd_id, false);
|
checkRelId = IndexGetRelation(relation->rd_id, missingOk);
|
||||||
}
|
}
|
||||||
|
|
||||||
ErrorIfNotSuitableToGetSize(tableId);
|
ErrorIfNotSuitableToGetSize(checkRelId);
|
||||||
|
|
||||||
table_close(relation, AccessShareLock);
|
table_close(relation, AccessShareLock);
|
||||||
|
|
||||||
|
@ -573,8 +564,8 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
|
||||||
{
|
{
|
||||||
uint64 relationSizeOnNode = 0;
|
uint64 relationSizeOnNode = 0;
|
||||||
|
|
||||||
bool gotSize = DistributedRelationSizeOnWorker(workerNode, tableId,
|
bool gotSize = DistributedRelationSizeOnWorker(workerNode, relationId,
|
||||||
indexId, sizeQueryType,
|
sizeQueryType,
|
||||||
failOnError, &relationSizeOnNode);
|
failOnError, &relationSizeOnNode);
|
||||||
if (!gotSize)
|
if (!gotSize)
|
||||||
{
|
{
|
||||||
|
@ -597,7 +588,7 @@ DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
Oid indexId, SizeQueryType sizeQueryType,
|
SizeQueryType sizeQueryType,
|
||||||
bool failOnError, uint64 *relationSize)
|
bool failOnError, uint64 *relationSize)
|
||||||
{
|
{
|
||||||
int logLevel = WARNING;
|
int logLevel = WARNING;
|
||||||
|
@ -612,6 +603,16 @@ DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
uint32 connectionFlag = 0;
|
uint32 connectionFlag = 0;
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
|
|
||||||
|
/* if the relation is an index, update relationId and define indexId */
|
||||||
|
Oid indexId = InvalidOid;
|
||||||
|
if (get_rel_relkind(relationId) == RELKIND_INDEX)
|
||||||
|
{
|
||||||
|
indexId = relationId;
|
||||||
|
|
||||||
|
bool missingOk = false;
|
||||||
|
relationId = IndexGetRelation(indexId, missingOk);
|
||||||
|
}
|
||||||
|
|
||||||
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
|
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -630,15 +631,6 @@ DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
|
||||||
int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data,
|
int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data,
|
||||||
&result);
|
&result);
|
||||||
|
|
||||||
/*
|
|
||||||
* After this point, we don't need to keep relationId pointing to the table.
|
|
||||||
* So we swap with indexId if relevant, only for error log...
|
|
||||||
*/
|
|
||||||
if (indexId != InvalidOid)
|
|
||||||
{
|
|
||||||
relationId = indexId;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queryResult != 0)
|
if (queryResult != 0)
|
||||||
{
|
{
|
||||||
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
|
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
|
@ -771,6 +763,9 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
|
||||||
* This function uses UDFs named worker_partitioned_*_size for partitioned tables,
|
* This function uses UDFs named worker_partitioned_*_size for partitioned tables,
|
||||||
* if the parameter optimizePartitionCalculations is true. The UDF to be called is
|
* if the parameter optimizePartitionCalculations is true. The UDF to be called is
|
||||||
* determined by the parameter sizeQueryType.
|
* determined by the parameter sizeQueryType.
|
||||||
|
*
|
||||||
|
* indexId is provided if we're interested in the size of an index, not the whole
|
||||||
|
* table.
|
||||||
*/
|
*/
|
||||||
StringInfo
|
StringInfo
|
||||||
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
||||||
|
@ -802,11 +797,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
||||||
/*
|
/*
|
||||||
* We need to build the shard relation name, being an index or table ...
|
* We need to build the shard relation name, being an index or table ...
|
||||||
*/
|
*/
|
||||||
Oid objectId = shardInterval->relationId;
|
Oid objectId = OidIsValid(indexId) ? indexId : shardInterval->relationId;
|
||||||
if (indexId != InvalidOid)
|
|
||||||
{
|
|
||||||
objectId = indexId;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
Oid schemaId = get_rel_namespace(objectId);
|
Oid schemaId = get_rel_namespace(objectId);
|
||||||
|
|
|
@ -792,8 +792,12 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32
|
||||||
|
|
||||||
/* we skip child tables of a partitioned table if this boolean variable is true */
|
/* we skip child tables of a partitioned table if this boolean variable is true */
|
||||||
bool optimizePartitionCalculations = true;
|
bool optimizePartitionCalculations = true;
|
||||||
|
|
||||||
|
/* we're interested in whole table, not a particular index */
|
||||||
|
Oid indexId = InvalidOid;
|
||||||
|
|
||||||
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
|
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
|
||||||
InvalidOid,
|
indexId,
|
||||||
TOTAL_RELATION_SIZE,
|
TOTAL_RELATION_SIZE,
|
||||||
optimizePartitionCalculations);
|
optimizePartitionCalculations);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue