diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index c34d59db4..6786ec4e6 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -79,6 +79,7 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
 static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
 static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool
                                                        useShardMinMaxQuery);
+static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
 static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
 static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
                                                      List *citusTableIds, bool
@@ -472,9 +473,15 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
 
     List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
 
+    /*
+     * We pass false here, because citus size functions should not include
+     * child tables in a partitioned table's size, just as PostgreSQL's don't.
+     */
+    bool optimizePartitionCalculations = false;
     StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
         shardIntervalsOnNode,
-        sizeQueryType);
+        sizeQueryType,
+        optimizePartitionCalculations);
 
     MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
                                                     workerNodePort);
@@ -598,10 +605,14 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
  * last parameter to function. Depending on the sizeQueryType enum parameter, the
  * generated query will call one of the functions: pg_relation_size,
  * pg_total_relation_size, pg_table_size and cstore_table_size.
+ * If the parameter optimizePartitionCalculations is true, this function uses the
+ * worker_partitioned_*_size UDFs for partitioned tables; which UDF is called is
+ * determined by the parameter sizeQueryType.
  */
 StringInfo
 GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
-                                      SizeQueryType sizeQueryType)
+                                      SizeQueryType sizeQueryType,
+                                      bool optimizePartitionCalculations)
 {
     StringInfo selectQuery = makeStringInfo();
 
@@ -610,6 +621,18 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
     ShardInterval *shardInterval = NULL;
     foreach_ptr(shardInterval, shardIntervalList)
     {
+        if (optimizePartitionCalculations && PartitionTable(shardInterval->relationId))
+        {
+            /*
+             * Skip child tables of a partitioned table, since they are already
+             * counted by the worker_partitioned_*_size UDFs when
+             * optimizePartitionCalculations is true. We don't expect this case
+             * to happen, as child tables are never passed to this function:
+             * they are all eliminated in ColocatedNonPartitionShardIntervalList.
+             * Hence this branch is added for future use and has no test coverage.
+             */
+            continue;
+        }
         uint64 shardId = shardInterval->shardId;
         Oid schemaId = get_rel_namespace(shardInterval->relationId);
         char *schemaName = get_namespace_name(schemaId);
@@ -619,8 +642,16 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
         char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
         char *quotedShardName = quote_literal_cstr(shardQualifiedName);
 
-        appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType),
-                         quotedShardName);
+        if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
+        {
+            appendStringInfo(selectQuery, GetWorkerPartitionedSizeUDFNameBySizeQueryType(
+                                 sizeQueryType), quotedShardName);
+        }
+        else
+        {
+            appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType),
+                             quotedShardName);
+        }
         appendStringInfo(selectQuery, " + ");
     }
 
@@ -635,6 +666,42 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
 }
 
 
+/*
+ * GetWorkerPartitionedSizeUDFNameBySizeQueryType returns the corresponding
+ * worker partitioned size query for the given query type, and errors out
+ * for an invalid query type. Currently this function is only called with
+ * the type TOTAL_RELATION_SIZE; the others are added for possible future
+ * use. Since they are not used anywhere yet, they cannot currently be
+ * covered by tests.
+ */
+static char *
+GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType)
+{
+    switch (sizeQueryType)
+    {
+        case RELATION_SIZE:
+        {
+            return WORKER_PARTITIONED_RELATION_SIZE_FUNCTION;
+        }
+
+        case TOTAL_RELATION_SIZE:
+        {
+            return WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION;
+        }
+
+        case TABLE_SIZE:
+        {
+            return WORKER_PARTITIONED_TABLE_SIZE_FUNCTION;
+        }
+
+        default:
+        {
+            elog(ERROR, "size query type couldn't be found");
+        }
+    }
+}
+
+
 /*
  * GetSizeQueryBySizeQueryType returns the corresponding size query for given query type.
  * Errors out for an invalid query type.
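For illustration, here is the shape of the query that GenerateSizeQueryOnMultiplePlacements produces when optimizePartitionCalculations is true and sizeQueryType is TOTAL_RELATION_SIZE: one size expression per shard, joined with " + ", using the worker partitioned UDF for partitioned shards and the plain PostgreSQL size function otherwise. A hypothetical sketch (the shard names and the exact SELECT wrapper are illustrative, not taken from this patch):

    -- hypothetical generated query for one partitioned shard and one regular shard
    SELECT worker_partitioned_relation_total_size('"public"."events_102008"')
         + pg_total_relation_size('"public"."users_102040"');
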
diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index 8c966c82a..df35bc861 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -449,10 +449,14 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
     uint32 connectionFlag = 0;
     PGresult *result = NULL;
     bool raiseErrors = true;
+
+    /* we skip child tables of a partitioned table if this boolean variable is true */
+    bool optimizePartitionCalculations = true;
     ShardInterval *shardInterval = LoadShardInterval(shardId);
-    List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
+    List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval);
     StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList,
-                                                                      TOTAL_RELATION_SIZE);
+                                                                      TOTAL_RELATION_SIZE,
+                                                                      optimizePartitionCalculations);
 
     MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
                                                     workerNodePort);
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
index 9ca79aea7..b38b4cee1 100644
--- a/src/backend/distributed/utils/colocation_utils.c
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -26,6 +26,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_logical_planner.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
@@ -989,6 +990,72 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
 }
 
 
+/*
+ * ColocatedNonPartitionShardIntervalList returns the list of shard intervals
+ * that are co-located with the given shard, excluding partitions. If the
+ * given shard belongs to an append- or range-distributed table, co-location
+ * is not valid for that shard, so such a shard is only co-located with itself.
+ */
+List *
+ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval)
+{
+    Oid distributedTableId = shardInterval->relationId;
+    List *colocatedShardList = NIL;
+
+    CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
+
+    /*
+     * If the distribution type of the table is append or range, each shard
+     * of the table is only co-located with itself. We don't expect this case
+     * to happen, since distributing partitioned tables is only supported for
+     * hash-distributed tables. Therefore, this branch cannot currently be
+     * covered by a test.
+     */
+    if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
+        IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED))
+    {
+        ShardInterval *copyShardInterval = CopyShardInterval(shardInterval);
+
+        colocatedShardList = lappend(colocatedShardList, copyShardInterval);
+
+        return colocatedShardList;
+    }
+
+    int shardIntervalIndex = ShardIndex(shardInterval);
+    List *colocatedTableList = ColocatedTableList(distributedTableId);
+
+    /* ShardIndex must find the index of the given shard */
+    Assert(shardIntervalIndex >= 0);
+
+    Oid colocatedTableId = InvalidOid;
+    foreach_oid(colocatedTableId, colocatedTableList)
+    {
+        if (PartitionTable(colocatedTableId))
+        {
+            continue;
+        }
+
+        CitusTableCacheEntry *colocatedTableCacheEntry =
+            GetCitusTableCacheEntry(colocatedTableId);
+
+        /*
+         * Since we iterate over co-located tables, the shard count of each
+         * table should be the same and greater than shardIntervalIndex.
+         */
+        Assert(cacheEntry->shardIntervalArrayLength ==
+               colocatedTableCacheEntry->shardIntervalArrayLength);
+
+        ShardInterval *colocatedShardInterval =
+            colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
+
+        ShardInterval *copyShardInterval = CopyShardInterval(colocatedShardInterval);
+
+        colocatedShardList = lappend(colocatedShardList, copyShardInterval);
+    }
+
+    return SortList(colocatedShardList, CompareShardIntervalsById);
+}
+
+
 /*
  * ColocatedTableId returns an arbitrary table which belongs to given colocation
  * group. If there is not such a colocation group, it returns invalid oid.
diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h
index 2ac89dbbc..d2ee3d0be 100644
--- a/src/include/distributed/colocation_utils.h
+++ b/src/include/distributed/colocation_utils.h
@@ -23,6 +23,7 @@ extern bool ShardsColocated(ShardInterval *leftShardInterval,
                             ShardInterval *rightShardInterval);
 extern List * ColocatedTableList(Oid distributedTableId);
 extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
+extern List * ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval);
 extern Oid ColocatedTableId(Oid colocationId);
 extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
 uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index 384838e97..4f6050b71 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -34,6 +34,10 @@
 #define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)"
 #define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)"
 #define CSTORE_TABLE_SIZE_FUNCTION "cstore_table_size(%s)"
+#define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)"
+#define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)"
+#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
+    "worker_partitioned_relation_total_size(%s)"
 
 #define SHARD_SIZES_COLUMN_COUNT 2
 #define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4
@@ -217,7 +221,8 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
 extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState);
 extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
 extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
-                                                        SizeQueryType sizeQueryType);
+                                                        SizeQueryType sizeQueryType,
+                                                        bool optimizePartitionCalculations);
 extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
 extern ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
 
diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out
index 402cfdae3..7d99c754b 100644
--- a/src/test/regress/expected/multi_partitioning.out
+++ b/src/test/regress/expected/multi_partitioning.out
@@ -2080,14 +2080,21 @@ ERROR:  non-distributed tables cannot inherit distributed tables
 DROP TABLE test_inheritance;
 -- test worker partitioned table size functions
 CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time");
-CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location'));
-SELECT create_distributed_table('"events.Energy Added"', 'user_id');
+SELECT create_distributed_table('"events.Energy Added"', 'user_id', colocate_with:='none');
Added"', 'user_id', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00'); +-- test shard cost by disk size function +SELECT citus_shard_cost_by_disk_size(1660207); + citus_shard_cost_by_disk_size +--------------------------------------------------------------------- + 16384 +(1 row) + +CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location')); \c - - - :worker_1_port -- should not be zero because of TOAST, vm, fms SELECT worker_partitioned_table_size('"events.Energy Added_1660207"'); diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index 9f46c1205..98aa5b962 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -38,7 +38,6 @@ INSERT INTO partitioning_hash_test VALUES (4, 4); -- distribute partitioned table SELECT create_distributed_table('partitioning_test', 'id'); - SELECT create_distributed_table('partitioning_hash_test', 'id'); -- see the data is loaded to shards @@ -1227,9 +1226,12 @@ DROP TABLE test_inheritance; -- test worker partitioned table size functions CREATE TABLE "events.Energy Added" (user_id int, time timestamp with time zone, data jsonb, PRIMARY KEY (user_id, time )) PARTITION BY RANGE ("time"); - CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location')); - SELECT create_distributed_table('"events.Energy Added"', 'user_id'); +SELECT create_distributed_table('"events.Energy Added"', 'user_id', colocate_with:='none'); CREATE TABLE "Energy Added_17634" PARTITION OF "events.Energy Added" FOR VALUES FROM ('2018-04-13 00:00:00+00') TO ('2018-04-14 00:00:00+00'); + +-- test shard cost by disk size function +SELECT citus_shard_cost_by_disk_size(1660207); +CREATE INDEX idx_btree_hobbies ON "events.Energy Added" USING BTREE ((data->>'location')); \c - - - :worker_1_port -- should not be zero because of TOAST, vm, fms SELECT worker_partitioned_table_size('"events.Energy Added_1660207"');