mirror of https://github.com/citusdata/citus.git
Add citus_remote_connection_stats() function
This function is intended to be used for monitoring the remote connections.preventConflictingFlags
parent
4d00dc94f8
commit
22c0f2a8cd
|
@ -18,14 +18,20 @@
|
|||
#include "miscadmin.h"
|
||||
|
||||
#include "access/hash.h"
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "catalog/pg_authid.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "distributed/tuplestore.h"
|
||||
#include "utils/hashutils.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "storage/ipc.h"
|
||||
|
||||
|
||||
#define REMOTE_CONNECTION_STATS_COLUMNS 6
|
||||
|
||||
|
||||
/*
|
||||
* The data structure used to store data in shared memory. This data structure only
|
||||
* used for storing the lock. The actual statistics about the connections are stored
|
||||
|
@ -82,12 +88,98 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
|||
|
||||
|
||||
/* local function declarations */
|
||||
static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor);
|
||||
static void UnLockConnectionSharedMemory(void);
|
||||
static void LockConnectionSharedMemory(LWLockMode lockMode);
|
||||
static void SharedConnectionStatsShmemInit(void);
|
||||
static size_t SharedConnectionStatsShmemSize(void);
|
||||
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
|
||||
static uint32 SharedConnectionHashHash(const void *key, Size keysize);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
|
||||
|
||||
|
||||
/*
|
||||
* citus_remote_connection_stats returns all the avaliable information about all
|
||||
* the remote connections (a.k.a., connections to remote nodes).
|
||||
*/
|
||||
Datum
|
||||
citus_remote_connection_stats(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
|
||||
StoreAllConnections(tupleStore, tupleDescriptor);
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StoreAllConnections gets connections established from the current node
|
||||
* and inserts them into the given tuplestore.
|
||||
*
|
||||
* We don't need to enforce any access privileges as the number of backends
|
||||
* on any node is already visible on pg_stat_activity to all users.
|
||||
*/
|
||||
static void
|
||||
StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||
{
|
||||
Datum values[REMOTE_CONNECTION_STATS_COLUMNS];
|
||||
bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS];
|
||||
|
||||
/* we're reading all distributed transactions, prevent new backends */
|
||||
LockConnectionSharedMemory(LW_SHARED);
|
||||
|
||||
HASH_SEQ_STATUS status;
|
||||
SharedConnStatsHashEntry *connectionEntry = NULL;
|
||||
|
||||
hash_seq_init(&status, SharedConnStatsHash);
|
||||
while ((connectionEntry = (SharedConnStatsHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
/* get ready for the next tuple */
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(isNulls, false, sizeof(isNulls));
|
||||
|
||||
values[0] = Int32GetDatum(connectionEntry->key.nodeId);
|
||||
values[1] = PointerGetDatum(connectionEntry->key.database);
|
||||
values[2] = Int32GetDatum(connectionEntry->connectionCount);
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||
}
|
||||
|
||||
UnLockConnectionSharedMemory();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockConnectionSharedMemory is a utility function that should be used when
|
||||
* accessing to the SharedConnStatsHash, which is in the shared memory.
|
||||
*/
|
||||
static void
|
||||
LockConnectionSharedMemory(LWLockMode lockMode)
|
||||
{
|
||||
LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UnLockConnectionSharedMemory is a utility function that should be used after
|
||||
* LockConnectionSharedMemory().
|
||||
*/
|
||||
static void
|
||||
UnLockConnectionSharedMemory(void)
|
||||
{
|
||||
LWLockRelease(&ConnectionStatsSharedState->sharedConnectionHashLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitializeSharedConnectionStats requests the necessary shared memory
|
||||
* from Postgres and sets up the shared memory startup hook.
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
#include "udfs/citus_remote_connection_stats/9.3-2.sql"
|
|
@ -0,0 +1,6 @@
|
|||
CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
|
||||
COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
|
||||
IS 'returns statistics about remote connections';
|
|
@ -0,0 +1,6 @@
|
|||
CREATE OR REPLACE FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$citus_remote_connection_stats$$;
|
||||
COMMENT ON FUNCTION citus_remote_connection_stats(OUT node_id int, OUT database_name text, OUT connection_count_to_node int)
|
||||
IS 'returns statistics about remote connections';
|
Loading…
Reference in New Issue