Optimize partitioned disk size calculation (#4905)

* Optimize partitioned disk size calculation

* Polish

* Fix test for citus_shard_cost_by_disk_size

* Try optimizing if not CSTORE
Ahmet Gedemenli 2021-04-19 13:30:56 +03:00 committed by GitHub
parent 96278822d9
commit 33c620f232
7 changed files with 165 additions and 12 deletions
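
The gist of the optimization, sketched as the worker-side size query for one placement of the partitioned table used in the regression tests below. This is a minimal sketch rather than the exact query text Citus emits, and the partition shard name is hypothetical:

-- Before: the parent shard and every partition shard are measured separately.
SELECT pg_total_relation_size('"events.Energy Added_1660207"')
     + pg_total_relation_size('"Energy Added_17634_1660215"');
-- After: the parent shard is measured once with the new worker UDF, which already
-- includes its partitions, so partition shards are skipped entirely.
SELECT worker_partitioned_relation_total_size('"events.Energy Added_1660207"');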

View File

@@ -79,6 +79,7 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
 static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
 static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
 														useShardMinMaxQuery);
+static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
 static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
 static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
 													  List *citusTableIds, bool
@@ -472,9 +473,15 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
 	List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);

+	/*
+	 * We pass false here because the optimized path would also count child tables,
+	 * and the citus size functions, like their PostgreSQL counterparts, shouldn't
+	 * include them.
+	 */
+	bool optimizePartitionCalculations = false;
 	StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
 		shardIntervalsOnNode,
-		sizeQueryType);
+		sizeQueryType,
+		optimizePartitionCalculations);

 	MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
 													workerNodePort);
@@ -598,10 +605,14 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
  * last parameter to function. Depending on the sizeQueryType enum parameter, the
  * generated query will call one of the functions: pg_relation_size,
  * pg_total_relation_size, pg_table_size and cstore_table_size.
+ * If the parameter optimizePartitionCalculations is true, this function uses the
+ * worker_partitioned_*_size UDFs for partitioned tables; the UDF to call is
+ * determined by the sizeQueryType parameter.
  */
 StringInfo
 GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
-									  SizeQueryType sizeQueryType)
+									  SizeQueryType sizeQueryType,
+									  bool optimizePartitionCalculations)
 {
 	StringInfo selectQuery = makeStringInfo();

@@ -610,6 +621,18 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 	ShardInterval *shardInterval = NULL;
 	foreach_ptr(shardInterval, shardIntervalList)
 	{
+		if (optimizePartitionCalculations && PartitionTable(shardInterval->relationId))
+		{
+			/*
+			 * Skip child tables of a partitioned table, since they are already counted
+			 * by the worker_partitioned_*_size UDFs when optimizePartitionCalculations
+			 * is true. We don't expect this case to happen, because child tables are
+			 * never passed to this function: they are all eliminated in
+			 * ColocatedNonPartitionShardIntervalList. Therefore we currently can't
+			 * cover this branch with a test; it is kept for possible future usages.
+			 */
+			continue;
+		}
 		uint64 shardId = shardInterval->shardId;
 		Oid schemaId = get_rel_namespace(shardInterval->relationId);
 		char *schemaName = get_namespace_name(schemaId);

@@ -619,8 +642,16 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 		char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
 		char *quotedShardName = quote_literal_cstr(shardQualifiedName);

+		if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
+		{
+			appendStringInfo(selectQuery, GetWorkerPartitionedSizeUDFNameBySizeQueryType(
+								 sizeQueryType), quotedShardName);
+		}
+		else
+		{
 		appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType),
 						 quotedShardName);
+		}

 		appendStringInfo(selectQuery, " + ");
 	}

@@ -635,6 +666,42 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 }

+
+/*
+ * GetWorkerPartitionedSizeUDFNameBySizeQueryType returns the corresponding worker
+ * partitioned size UDF for the given query type, and errors out for an invalid type.
+ * Currently this function is only called with TOTAL_RELATION_SIZE; the other types
+ * are added for possible future usages and, since they are not used anywhere yet, we
+ * can't cover them with tests.
+ */
+static char *
+GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType)
+{
+	switch (sizeQueryType)
+	{
+		case RELATION_SIZE:
+		{
+			return WORKER_PARTITIONED_RELATION_SIZE_FUNCTION;
+		}
+
+		case TOTAL_RELATION_SIZE:
+		{
+			return WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION;
+		}
+
+		case TABLE_SIZE:
+		{
+			return WORKER_PARTITIONED_TABLE_SIZE_FUNCTION;
+		}
+
+		default:
+		{
+			elog(ERROR, "Size query type couldn't be found.");
+		}
+	}
+}
+
+
 /*
  * GetSizeQueryBySizeQueryType returns the corresponding size query for given query type.
  * Errors out for an invalid query type.
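
The worker_partitioned_*_size UDFs returned above can also be called directly on a worker node against a partitioned shard, as the regression test output later in this commit does for worker_partitioned_table_size. A short sketch reusing the shard name from that test; the intended semantics mirror pg_relation_size, pg_total_relation_size and pg_table_size summed over the shard and its partitions:

-- Run on a worker node against a placement of the partitioned shard.
SELECT worker_partitioned_relation_size('"events.Energy Added_1660207"');
SELECT worker_partitioned_relation_total_size('"events.Energy Added_1660207"');
SELECT worker_partitioned_table_size('"events.Energy Added_1660207"');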

View File

@@ -449,10 +449,14 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
 	uint32 connectionFlag = 0;
 	PGresult *result = NULL;
 	bool raiseErrors = true;
+
+	/* we skip child tables of a partitioned table if this boolean variable is true */
+	bool optimizePartitionCalculations = true;
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
-	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
+	List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval);
 	StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList,
-																	  TOTAL_RELATION_SIZE);
+																	  TOTAL_RELATION_SIZE,
+																	  optimizePartitionCalculations);

 	MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
 													workerNodePort);
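
Coordinator-side usage is unchanged; only the internals of the cost function differ. A hedged usage sketch for the partitioned table from the regression tests, going through the pg_dist_shard catalog and assuming the table is on the search path:

-- Relative disk-size cost per shard; a partitioned shard is now counted once,
-- via worker_partitioned_relation_total_size, instead of once per partition.
SELECT shardid, citus_shard_cost_by_disk_size(shardid)
FROM pg_dist_shard
WHERE logicalrelid = '"events.Energy Added"'::regclass
ORDER BY shardid;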

View File

@@ -26,6 +26,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_logical_planner.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
@@ -989,6 +990,72 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
 }

+
+/*
+ * ColocatedNonPartitionShardIntervalList returns the list of shard intervals that are
+ * co-located with the given shard, excluding partitions. If the given shard belongs to
+ * an append- or range-distributed table, co-location is not valid for that shard, so
+ * such a shard is only co-located with itself.
+ */
+List *
+ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval)
+{
+	Oid distributedTableId = shardInterval->relationId;
+	List *colocatedShardList = NIL;
+
+	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
+
+	/*
+	 * If the distribution type of the table is append or range, each shard is only
+	 * co-located with itself. We don't expect this case to happen, since distributing
+	 * partitioned tables is only supported for hash-distributed tables. Therefore, we
+	 * currently can't cover this branch with a test.
+	 */
+	if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
+		IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED))
+	{
+		ShardInterval *copyShardInterval = CopyShardInterval(shardInterval);
+
+		colocatedShardList = lappend(colocatedShardList, copyShardInterval);
+
+		return colocatedShardList;
+	}
+
+	int shardIntervalIndex = ShardIndex(shardInterval);
+	List *colocatedTableList = ColocatedTableList(distributedTableId);
+
+	/* ShardIndex must find the index of the given shard */
+	Assert(shardIntervalIndex >= 0);
+
+	Oid colocatedTableId = InvalidOid;
+	foreach_oid(colocatedTableId, colocatedTableList)
+	{
+		if (PartitionTable(colocatedTableId))
+		{
+			continue;
+		}
+
+		CitusTableCacheEntry *colocatedTableCacheEntry =
+			GetCitusTableCacheEntry(colocatedTableId);
+
+		/*
+		 * Since we iterate over co-located tables, the shard count of each table should
+		 * be the same and greater than shardIntervalIndex.
+		 */
+		Assert(cacheEntry->shardIntervalArrayLength ==
+			   colocatedTableCacheEntry->shardIntervalArrayLength);
+
+		ShardInterval *colocatedShardInterval =
+			colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
+
+		ShardInterval *copyShardInterval = CopyShardInterval(colocatedShardInterval);
+
+		colocatedShardList = lappend(colocatedShardList, copyShardInterval);
+	}
+
+	return SortList(colocatedShardList, CompareShardIntervalsById);
+}
+
+
 /*
  * ColocatedTableId returns an arbitrary table which belongs to given colocation
  * group. If there is not such a colocation group, it returns invalid oid.

View File

@@ -23,6 +23,7 @@ extern bool ShardsColocated(ShardInterval *leftShardInterval,
 								ShardInterval *rightShardInterval);
 extern List * ColocatedTableList(Oid distributedTableId);
 extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
+extern List * ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval);
 extern Oid ColocatedTableId(Oid colocationId);
 extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
 uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,

View File

@@ -34,6 +34,10 @@
 #define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)"
 #define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)"
 #define CSTORE_TABLE_SIZE_FUNCTION "cstore_table_size(%s)"
+#define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)"
+#define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)"
+#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
+	"worker_partitioned_relation_total_size(%s)"
 #define SHARD_SIZES_COLUMN_COUNT 2
 #define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4

@@ -217,7 +221,8 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
 extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState);
 extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
 extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
-														 SizeQueryType sizeQueryType);
+														 SizeQueryType sizeQueryType,
+														 bool optimizePartitionCalculations);
 extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
 extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);

View File

@@ -2080,14 +2080,21 @@ ERROR: non-distributed tables cannot inherit distributed tables
 DROP TABLE test_inheritance;
 -- test worker partitioned table size functions
 CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time");
-CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
-SELECT create_distributed_table('"events.Energy Added"', 'user_id');
+SELECT create_distributed_table('"events.Energy Added"', 'user_id', colocate_with:='none');
  create_distributed_table
 ---------------------------------------------------------------------
 (1 row)

 CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00');
+-- test shard cost by disk size function
+SELECT citus_shard_cost_by_disk_size(1660207);
+ citus_shard_cost_by_disk_size
+---------------------------------------------------------------------
+                         16384
+(1 row)
+
+CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
 \c - - - :worker_1_port
 -- should not be zero because of TOAST, vm, fms
 SELECT worker_partitioned_table_size('"events.Energy Added_1660207"');

View File

@@ -38,7 +38,6 @@ INSERT INTO partitioning_hash_test VALUES (4, 4);
 -- distribute partitioned table
 SELECT create_distributed_table('partitioning_test', 'id');
 SELECT create_distributed_table('partitioning_hash_test', 'id');
-
 -- see the data is loaded to shards

@@ -1227,9 +1226,12 @@ DROP TABLE test_inheritance;
 -- test worker partitioned table size functions
 CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time");
-CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
-SELECT create_distributed_table('"events.Energy Added"', 'user_id');
+SELECT create_distributed_table('"events.Energy Added"', 'user_id', colocate_with:='none');
 CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00');
+-- test shard cost by disk size function
+SELECT citus_shard_cost_by_disk_size(1660207);
+CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
 \c - - - :worker_1_port
 -- should not be zero because of TOAST, vm, fms
 SELECT worker_partitioned_table_size('"events.Energy Added_1660207"');