Add OpenConnectionToNodes and functions that generate shard queries

(ALL CODE BORROWED from commit 724d56f949)
pull/5409/head
naisila 2021-07-29 15:43:00 +03:00 committed by Naisila Puka
parent a529866f93
commit 27d74f1540
1 changed files with 104 additions and 0 deletions

View File

@ -35,6 +35,7 @@
#include "distributed/citus_nodes.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
@ -50,6 +51,7 @@
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
@ -75,9 +77,13 @@ static uint64 DistributedTableSize(Oid relationId, char *sizeQuery);
static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList);
static char * GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode);
static List * GenerateShardSizesQueryList(List *workerNodeList);
static void ErrorIfNotSuitableToGetSize(Oid relationId);
static ShardPlacement * ShardPlacementOnGroup(uint64 shardId, int groupId);
static List * OpenConnectionToNodes(List *workerNodeList);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_table_size);
@ -154,6 +160,48 @@ citus_relation_size(PG_FUNCTION_ARGS)
}
/*
* OpenConnectionToNodes opens a single connection per node
* for the given workerNodeList.
*/
static List *
OpenConnectionToNodes(List *workerNodeList)
{
List *connectionList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
int connectionFlags = 0;
MultiConnection *connection = StartNodeConnection(connectionFlags, nodeName,
nodePort);
connectionList = lappend(connectionList, connection);
}
return connectionList;
}
/*
* GenerateShardSizesQueryList generates a query per node that
* will return all shard_name, shard_size pairs from the node.
*/
static List *
GenerateShardSizesQueryList(List *workerNodeList)
{
List *shardSizesQueryList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
char *shardSizesQuery = GenerateAllShardNameAndSizeQueryForNode(workerNode);
shardSizesQueryList = lappend(shardSizesQueryList, shardSizesQuery);
}
return shardSizesQueryList;
}
/*
* DistributedTableSize is helper function for each kind of citus size functions.
* It first checks whether the table is distributed and size query can be run on
@ -352,6 +400,62 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
}
/*
* GenerateAllShardNameAndSizeQueryForNode generates a query that returns all
* shard_name, shard_size pairs for the given node.
*/
static char *
GenerateAllShardNameAndSizeQueryForNode(WorkerNode *workerNode)
{
List *allCitusTableIds = AllCitusTableIds();
StringInfo allShardNameAndSizeQuery = makeStringInfo();
Oid relationId = InvalidOid;
foreach_oid(relationId, allCitusTableIds)
{
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
char *shardNameAndSizeQuery =
GenerateShardNameAndSizeQueryForShardList(shardIntervalsOnNode);
appendStringInfoString(allShardNameAndSizeQuery, shardNameAndSizeQuery);
}
/* Add a dummy entry so that UNION ALL doesn't complain */
appendStringInfo(allShardNameAndSizeQuery, "SELECT NULL::text, 0::bigint;");
return allShardNameAndSizeQuery->data;
}
/*
* GenerateShardNameAndSizeQueryForShardList generates a SELECT shard_name - shard_size query to get
* size of multiple tables.
*/
static char *
GenerateShardNameAndSizeQueryForShardList(List *shardIntervalList)
{
StringInfo selectQuery = makeStringInfo();
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
AppendShardIdToName(&shardName, shardId);
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName);
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
appendStringInfo(selectQuery, " UNION ALL ");
}
return selectQuery->data;
}
/*
* ErrorIfNotSuitableToGetSize determines whether the table is suitable to find
* its' size with internal functions.