diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 10a4ff5b6..6d26263f5 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -35,6 +35,7 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_safe_lib.h" #include "distributed/listutils.h" +#include "distributed/lock_graph.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -50,6 +51,7 @@ #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/remote_commands.h" +#include "distributed/tuplestore.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" @@ -75,9 +77,13 @@ static uint64 DistributedTableSize(Oid relationId, char *sizeQuery); static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQuery); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); +static char * GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList); +static char * GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode); +static List * GenerateShardSizesQueryList(List *workerNodeList); static void ErrorIfNotSuitableToGetSize(Oid relationId); static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId); +static List * OpenConnectionToNodes(List *workerNodeList); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(citus_table_size); @@ -154,6 +160,48 @@ citus_relation_size(PG_FUNCTION_ARGS) } +/* + * OpenConnectionToNodes opens a single connection per node + * for the given workerNodeList. + */ +static List * +OpenConnectionToNodes(List *workerNodeList) +{ + List *connectionList = NIL; + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + const char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + int connectionFlags = 0; + + MultiConnection *connection = StartNodeConnection(connectionFlags, nodeName, + nodePort); + + connectionList = lappend(connectionList, connection); + } + return connectionList; +} + + +/* + * GenerateShardSizesQueryList generates a query per node that + * will return all shard_name, shard_size pairs from the node. + */ +static List * +GenerateShardSizesQueryList(List *workerNodeList) +{ + List *shardSizesQueryList = NIL; + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + char *shardSizesQuery = GenerateAllShardNameAndSizeQueryForNode(workerNode); + shardSizesQueryList = lappend(shardSizesQueryList, shardSizesQuery); + } + return shardSizesQueryList; +} + + /* * DistributedTableSize is helper function for each kind of citus size functions. * It first checks whether the table is distributed and size query can be run on @@ -352,6 +400,62 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery) } +/* + * GenerateAllShardNameAndSizeQueryForNode generates a query that returns all + * shard_name, shard_size pairs for the given node. + */ +static char * +GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode) +{ + List *allCitusTableIds = AllCitusTableIds(); + + StringInfo allShardNameAndSizeQuery = makeStringInfo(); + + Oid relationId = InvalidOid; + foreach_oid(relationId, allCitusTableIds) + { + List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId); + char *shardNameAndSizeQuery = + GenerateShardNameAndSizeQueryForShardList(shardIntervalsOnNode); + appendStringInfoString(allShardNameAndSizeQuery, shardNameAndSizeQuery); + } + + /* Add a dummy entry so that UNION ALL doesn't complain */ + appendStringInfo(allShardNameAndSizeQuery, "SELECT NULL::text, 0::bigint;"); + return allShardNameAndSizeQuery->data; +} + + +/* + * GenerateShardNameAndSizeQueryForShardList generates a SELECT shard_name - shard_size query to get + * size of multiple tables. + */ +static char * +GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList) +{ + StringInfo selectQuery = makeStringInfo(); + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + uint64 shardId = shardInterval->shardId; + Oid schemaId = get_rel_namespace(shardInterval->relationId); + char *schemaName = get_namespace_name(schemaId); + char *shardName = get_rel_name(shardInterval->relationId); + AppendShardIdToName(&shardName, shardId); + + char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); + char *quotedShardName = quote_literal_cstr(shardQualifiedName); + + appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName); + appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName); + appendStringInfo(selectQuery, " UNION ALL "); + } + + return selectQuery->data; +} + + /* * ErrorIfNotSuitableToGetSize determines whether the table is suitable to find * its' size with internal functions.