pull/7917/head
Onur Tirtir 2025-04-14 11:04:38 +03:00
parent bb9d90ecc3
commit 4efd6c6189
16 changed files with 1303 additions and 10 deletions

View File

@ -39,6 +39,7 @@
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/run_from_same_connection.h" #include "distributed/run_from_same_connection.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/stat_counters.h"
#include "distributed/time_constants.h" #include "distributed/time_constants.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h" #include "distributed/worker_log_messages.h"
@ -354,6 +355,18 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
MultiConnection *connection = FindAvailableConnection(entry->connections, flags); MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection) if (connection)
{ {
/*
* Increment the connection stat counter for the connections that are
* reused only if the connection is in a good state. Here we don't
* bother shutting down the connection or such if it is not in a good
* state but we mostly want to avoid incrementing the connection stat
* counter for a connection that the caller cannot really use.
*/
if (PQstatus(connection->pgConn) == CONNECTION_OK)
{
IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
}
return connection; return connection;
} }
} }
@ -395,6 +408,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
dlist_delete(&connection->connectionNode); dlist_delete(&connection->connectionNode);
pfree(connection); pfree(connection);
/*
* Here we don't increment the connection stat counter for the optional
* connections that we gave up establishing due to connection throttling
* because the callers who request optional connections know how to
* survive without them.
*/
return NULL; return NULL;
} }
} }
@ -1026,6 +1045,11 @@ FinishConnectionListEstablishment(List *multiConnectionList)
if (event->events & WL_POSTMASTER_DEATH) if (event->events & WL_POSTMASTER_DEATH)
{ {
/*
* Here we don't increment the connection stat counter for the
* optional failed connections because this is not a connection
* failure, but a postmaster death in the local node.
*/
ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
} }
@ -1042,6 +1066,12 @@ FinishConnectionListEstablishment(List *multiConnectionList)
* reset the memory context * reset the memory context
*/ */
MemoryContextDelete(MemoryContextSwitchTo(oldContext)); MemoryContextDelete(MemoryContextSwitchTo(oldContext));
/*
* Similarly, we don't increment the connection stat counter for the
* failed connections here because this is not a connection failure
* but a cancellation request is received.
*/
return; return;
} }
@ -1072,6 +1102,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
eventMask, NULL); eventMask, NULL);
if (!success) if (!success)
{ {
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d " errmsg("connection establishment for node %s:%d "
"failed", connection->hostname, "failed", connection->hostname,
@ -1088,7 +1119,15 @@ FinishConnectionListEstablishment(List *multiConnectionList)
*/ */
if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED) if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
{ {
MarkConnectionConnected(connectionState->connection); /*
* Since WaitEventSetFromMultiConnectionStates() only adds the
* connections that we haven't completed the connection
* establishment yet, here we always have a new connection.
* In other words, at this point, we surely know that we're
* not dealing with a cached connection.
*/
bool newConnection = true;
MarkConnectionConnected(connectionState->connection, newConnection);
} }
} }
} }
@ -1172,6 +1211,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
/* close connection, otherwise we take up resource on the other side */ /* close connection, otherwise we take up resource on the other side */
CitusPQFinish(connection); CitusPQFinish(connection);
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
} }
} }
@ -1584,7 +1625,7 @@ RemoteTransactionIdle(MultiConnection *connection)
* establishment time when necessary. * establishment time when necessary.
*/ */
void void
MarkConnectionConnected(MultiConnection *connection) MarkConnectionConnected(MultiConnection *connection, bool newConnection)
{ {
connection->connectionState = MULTI_CONNECTION_CONNECTED; connection->connectionState = MULTI_CONNECTION_CONNECTED;
@ -1592,6 +1633,11 @@ MarkConnectionConnected(MultiConnection *connection)
{ {
INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd); INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
} }
if (newConnection)
{
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
}
} }

View File

@ -171,6 +171,7 @@
#include "distributed/repartition_join_execution.h" #include "distributed/repartition_join_execution.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/stat_counters.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/transaction_identifier.h" #include "distributed/transaction_identifier.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
@ -690,7 +691,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session); WorkerSession *session);
static void ConnectionStateMachine(WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session);
static bool HasUnfinishedTaskForSession(WorkerSession *session); static bool HasUnfinishedTaskForSession(WorkerSession *session);
static void HandleMultiConnectionSuccess(WorkerSession *session); static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection);
static bool HasAnyConnectionFailure(WorkerPool *workerPool); static bool HasAnyConnectionFailure(WorkerPool *workerPool);
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
static bool TransactionModifiedDistributedTable(DistributedExecution *execution); static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
@ -2035,6 +2036,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
else else
{ {
connection->connectionState = MULTI_CONNECTION_FAILED; connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
} }
@ -3011,6 +3013,7 @@ ConnectionStateMachine(WorkerSession *session)
* connection, clear any state associated with it. * connection, clear any state associated with it.
*/ */
connection->connectionState = MULTI_CONNECTION_FAILED; connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
break; break;
} }
@ -3019,7 +3022,12 @@ ConnectionStateMachine(WorkerSession *session)
ConnStatusType status = PQstatus(connection->pgConn); ConnStatusType status = PQstatus(connection->pgConn);
if (status == CONNECTION_OK) if (status == CONNECTION_OK)
{ {
HandleMultiConnectionSuccess(session); /*
* Connection was already established, possibly a cached
* connection.
*/
bool newConnection = false;
HandleMultiConnectionSuccess(session, newConnection);
UpdateConnectionWaitFlags(session, UpdateConnectionWaitFlags(session,
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
break; break;
@ -3027,6 +3035,7 @@ ConnectionStateMachine(WorkerSession *session)
else if (status == CONNECTION_BAD) else if (status == CONNECTION_BAD)
{ {
connection->connectionState = MULTI_CONNECTION_FAILED; connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
break; break;
} }
@ -3042,6 +3051,7 @@ ConnectionStateMachine(WorkerSession *session)
if (pollMode == PGRES_POLLING_FAILED) if (pollMode == PGRES_POLLING_FAILED)
{ {
connection->connectionState = MULTI_CONNECTION_FAILED; connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
} }
else if (pollMode == PGRES_POLLING_READING) else if (pollMode == PGRES_POLLING_READING)
{ {
@ -3059,7 +3069,12 @@ ConnectionStateMachine(WorkerSession *session)
} }
else else
{ {
HandleMultiConnectionSuccess(session); /*
* Connection was not established befoore (!= CONNECTION_OK)
* but PQconnectPoll() did so now.
*/
bool newConnection = true;
HandleMultiConnectionSuccess(session, newConnection);
UpdateConnectionWaitFlags(session, UpdateConnectionWaitFlags(session,
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
@ -3137,6 +3152,11 @@ ConnectionStateMachine(WorkerSession *session)
break; break;
} }
/*
* Here we don't increment the connection stat counter for failed
* connections because we don't track the connections that we could
* establish but lost later.
*/
connection->connectionState = MULTI_CONNECTION_FAILED; connection->connectionState = MULTI_CONNECTION_FAILED;
break; break;
} }
@ -3299,12 +3319,12 @@ HasUnfinishedTaskForSession(WorkerSession *session)
* connection's state. * connection's state.
*/ */
static void static void
HandleMultiConnectionSuccess(WorkerSession *session) HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection)
{ {
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
WorkerPool *workerPool = session->workerPool; WorkerPool *workerPool = session->workerPool;
MarkConnectionConnected(connection); MarkConnectionConnected(connection, newConnection);
ereport(DEBUG4, (errmsg("established connection to %s:%d for " ereport(DEBUG4, (errmsg("established connection to %s:%d for "
"session %ld in %ld microseconds", "session %ld in %ld microseconds",

View File

@ -66,6 +66,7 @@
#include "distributed/recursive_planning.h" #include "distributed/recursive_planning.h"
#include "distributed/shard_utils.h" #include "distributed/shard_utils.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/stat_counters.h"
#include "distributed/utils/citus_stat_tenants.h" #include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h" #include "distributed/worker_shard_visibility.h"
@ -1433,6 +1434,8 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
if (IsMultiTaskPlan(distributedPlan)) if (IsMultiTaskPlan(distributedPlan))
{ {
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
/* if it is not a single task executable plan, inform user according to the log level */ /* if it is not a single task executable plan, inform user according to the log level */
if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF) if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF)
{ {
@ -1444,6 +1447,10 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
" queries on the workers."))); " queries on the workers.")));
} }
} }
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
distributedPlan->queryId = localPlan->queryId; distributedPlan->queryId = localPlan->queryId;

View File

@ -105,6 +105,7 @@
#include "distributed/shardsplit_shared_memory.h" #include "distributed/shardsplit_shared_memory.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/shared_library_init.h" #include "distributed/shared_library_init.h"
#include "distributed/stat_counters.h"
#include "distributed/statistics_collection.h" #include "distributed/statistics_collection.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/time_constants.h" #include "distributed/time_constants.h"
@ -188,6 +189,7 @@ static void multi_log_hook(ErrorData *edata);
static bool IsSequenceOverflowError(ErrorData *edata); static bool IsSequenceOverflowError(ErrorData *edata);
static void RegisterConnectionCleanup(void); static void RegisterConnectionCleanup(void);
static void RegisterExternalClientBackendCounterDecrement(void); static void RegisterExternalClientBackendCounterDecrement(void);
static void RegisterCitusStatCountersFlush(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg); static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg); static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
static void CreateRequiredDirectories(void); static void CreateRequiredDirectories(void);
@ -505,6 +507,11 @@ _PG_init(void)
InitializeMultiTenantMonitorSMHandleManagement(); InitializeMultiTenantMonitorSMHandleManagement();
if (IsCitusStatCountersEnabled())
{
InitializeStatCountersArrayMem();
}
/* enable modification of pg_catalog tables during pg_upgrade */ /* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade) if (IsBinaryUpgrade)
{ {
@ -615,6 +622,12 @@ citus_shmem_request(void)
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestAddinShmemSpace(LogicalClockShmemSize()); RequestAddinShmemSpace(LogicalClockShmemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
if (IsCitusStatCountersEnabled())
{
RequestAddinShmemSpace(StatCountersArrayShmemSize());
RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1);
}
} }
@ -787,6 +800,12 @@ StartupCitusBackend(void)
SetBackendDataDatabaseId(); SetBackendDataDatabaseId();
RegisterConnectionCleanup(); RegisterConnectionCleanup();
if (IsCitusStatCountersEnabled())
{
RegisterCitusStatCountersFlush();
}
FinishedStartupCitusBackend = true; FinishedStartupCitusBackend = true;
} }
@ -841,6 +860,23 @@ RegisterExternalClientBackendCounterDecrement(void)
} }
/*
* RegisterCitusStatCountersFlush registers CitusStatCountersFlushAtExit()
* to be called before the backend exits.
*/
static void
RegisterCitusStatCountersFlush(void)
{
static bool registeredCleanup = false;
if (registeredCleanup == false)
{
before_shmem_exit(CitusStatCountersFlushAtExit, 0);
registeredCleanup = true;
}
}
/* /*
* CitusCleanupConnectionsAtExit is called before_shmem_exit() of the * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
* backend for the purposes of any clean-up needed. * backend for the purposes of any clean-up needed.
@ -2429,6 +2465,41 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.stat_counters_flush_interval",
gettext_noop("Sets the interval to flush the local stat counters into "
"the shared memory."),
gettext_noop("Stat counters are used to track the number of certain "
"operations in Citus. While setting this GUC to -1 disables "
"stat counters, setting it to a positive value will flush "
"the local stat counters into the shared memory every "
"interval milliseconds instead of flushing them immediately "
"after the operation, as it is done when the value is 0. "
"Higher values reduce the overhead of flushing the stat "
"counters but increase the time it takes to see the updated "
"stat counters."),
&StatCountersFlushInterval,
DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL,
DISABLE_STAT_COUNTERS_FLUSH_INTERVAL,
5 * MS_PER_MINUTE,
PGC_POSTMASTER,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.stat_counters_purge_interval",
gettext_noop("Determines time interval for citus_stat_counters to remove "
"the shared memory entries for the databases that were dropped."),
gettext_noop("This is automatically disabled when the stat counters are disabled "
"(citus.stat_counters_flush_interval = -1) and there is no explicit "
"way to disable purging the shared memory entries while the stat "
"counters are enabled."),
&StatCountersPurgeInterval,
DEFAULT_STAT_COUNTERS_PURGE_INTERVAL, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MS | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
/* /*
* It takes about 140 bytes of shared memory to store one row, therefore * It takes about 140 bytes of shared memory to store one row, therefore
* this setting should be used responsibly. setting it to 10M will require * this setting should be used responsibly. setting it to 10M will require

View File

@ -48,3 +48,5 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/repl_origin_helper/13.1-1.sql" #include "udfs/repl_origin_helper/13.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/13.1-1.sql" #include "udfs/citus_finish_pg_upgrade/13.1-1.sql"
#include "udfs/citus_is_primary_node/13.1-1.sql" #include "udfs/citus_is_primary_node/13.1-1.sql"
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"

View File

@ -41,3 +41,7 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking();
DROP FUNCTION citus_internal.stop_replication_origin_tracking(); DROP FUNCTION citus_internal.stop_replication_origin_tracking();
DROP FUNCTION citus_internal.is_replication_origin_tracking_active(); DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql" #include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
DROP VIEW pg_catalog.citus_stat_counters;
DROP FUNCTION pg_catalog.citus_stat_counters(oid);
DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);

View File

@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
database_oid oid,
-- Following stat counter columns must be in the same order as the
-- StatType enum defined in src/include/distributed/stat_counters.h
OUT connection_establishment_succeeded bigint,
OUT connection_establishment_failed bigint,
OUT connection_reused bigint,
OUT query_execution_single_shard bigint,
OUT query_execution_multi_shard bigint,
-- Must always be the last column or you should accordingly update
-- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c
OUT stats_reset timestamp with time zone
)
RETURNS SETOF RECORD
LANGUAGE C VOLATILE PARALLEL UNSAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database';
CREATE VIEW citus.citus_stat_counters AS
SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database;
ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;

View File

@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
database_oid oid,
-- Following stat counter columns must be in the same order as the
-- StatType enum defined in src/include/distributed/stat_counters.h
OUT connection_establishment_succeeded bigint,
OUT connection_establishment_failed bigint,
OUT connection_reused bigint,
OUT query_execution_single_shard bigint,
OUT query_execution_multi_shard bigint,
-- Must always be the last column or you should accordingly update
-- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c
OUT stats_reset timestamp with time zone
)
RETURNS SETOF RECORD
LANGUAGE C VOLATILE PARALLEL UNSAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database';
CREATE VIEW citus.citus_stat_counters AS
SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database;
ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
RETURNS VOID
LANGUAGE C VOLATILE PARALLEL UNSAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0';
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
RETURNS VOID
LANGUAGE C VOLATILE PARALLEL UNSAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0';
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;

View File

@ -0,0 +1,984 @@
/*-------------------------------------------------------------------------
*
* stat_counters.c
*
* This file contains functions to track various statistic counters for
* Citus.
*
* Each backend increments the local counters (PendingStatCounters) and
* flushes them to the shared memory at the end of the flush interval
* (StatCountersFlushInterval). The shared memory is used to keep track
* of the stat counters across backends for all databases.
*
* We don't have a good way to enforce that we flush the pending counters
* always at the end of the flush interval because we only check for the
* interval at the time of incrementing a counter. And, if a backend exits
* before the interval elapses, then we still flush the pending counters at
* the time of exit via CitusStatCountersFlushAtExit().
*
* Also, we store the stat counters in CITUS_STAT_COUNTERS_DUMP_FILE on
* shutdown via postmaster and restore them on startup via the first backend
* that initializes the shared memory for stat counters.
*
* And for the dropped databases, Citus maintenance daemon removes the stat
* counters from the shared memory periodically, which is controlled by
* StatCountersPurgeInterval. When the feature is disabled, removal of the
* stat counters is automatically disabled. Instead of inserting a cleanup
* record into pg_dist_cleanup, we choose to remove the stat counters via
* the maintenance daemon because inserting a cleanup record wouldn't work
* when the current database is dropped. And if there were other databases
* where Citus is installed, then they would still continue to have the
* dropped database's stat counters in the shared memory.
*
*-------------------------------------------------------------------------
*/
#include <unistd.h>
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "common/hashfn.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "utils/syscache.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/stat_counters.h"
/* file path for dumping the stat counters */
#define CITUS_STAT_COUNTERS_DUMP_FILE "pg_stat/citus_stat_counters.stat"
#define CITUS_STAT_COUNTERS_TMP_DUMP_FILE (CITUS_STAT_COUNTERS_DUMP_FILE ".tmp")
/*
* Shared hash constants.
*
* The places where STAT_COUNTERS_MAX_DATABASES is used do not impose a hard
* limit on the number of databases that can be tracked but in ShmemInitHash()
* it's documented that the access efficiency will degrade if it is exceeded
* substantially.
*
* XXX: Consider using dshash_table instead of (shared) HTAB if that becomes
* a concern.
*/
#define STAT_COUNTERS_INIT_DATABASES 8
#define STAT_COUNTERS_MAX_DATABASES 1024
/* shared hash table entry */
typedef struct StatCountersHashEntry
{
/* must be the first field since this is used as the key */
Oid dbId;
pg_atomic_uint64 counters[N_CITUS_STAT_COUNTERS];
TimestampTz statsResetTimestamp;
} StatCountersHashEntry;
/* shared memory state */
typedef struct StatCountersState
{
LWLockId lock;
} StatCountersState;
/* GUC value for citus.stat_counters_flush_interval */
int StatCountersFlushInterval = DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL;
/* GUC value for citus.stat_counters_purge_interval */
int StatCountersPurgeInterval = DEFAULT_STAT_COUNTERS_PURGE_INTERVAL;
/* stat counters dump file version */
static const uint32 CITUS_STAT_COUNTERS_FILE_VERSION = 1;
/* shared memory init */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* shared memory variables */
static StatCountersState *CitusStatCountersSharedState = NULL;
static HTAB *CitusStatCountersSharedHash = NULL;
/* local stat counters that are pending to be flushed */
static uint64 PendingStatCounters[N_CITUS_STAT_COUNTERS] = { 0 };
/* helper functions for citus_stat_counters() and citus_stat_counters_reset() */
static Tuplestorestate * SetupStatCountersTuplestore(FunctionCallInfo fcinfo,
TupleDesc *tupleDescriptor);
static void StoreStatCountersForDbId(Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor,
Oid dbId);
static void StoreStatCountersFromValues(Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor,
pg_atomic_uint64 *counters,
TimestampTz statsResetTimestamp);
static void ResetStatCountersForDbId(Oid dbId);
static void ResetStatCounterArray(pg_atomic_uint64 *counters);
/* helper functions to increment and flush stat counters */
static bool ShouldByPassLocalCounters(void);
static void IncrementSharedStatCounterForMyDb(int statId);
static void FlushPendingCountersIfNeeded(bool force);
/* shared memory init & management */
static void CitusStatCountersShmemInit(void);
static void CitusStatCountersShmemShutdown(int code, Datum arg);
/* shared hash utilities */
static StatCountersHashEntry * CitusStatCountersHashEntryAllocIfNotExists(Oid dbId);
PG_FUNCTION_INFO_V1(citus_stat_counters);
PG_FUNCTION_INFO_V1(citus_stat_counters_reset);
/*
* citus_stat_counters returns the stat counters for the specified database or for
* all databases if InvalidOid is provided.
*/
Datum
citus_stat_counters(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("database oid cannot be NULL")));
}
Oid dbId = PG_GETARG_OID(0);
if (!IsCitusStatCountersEnabled())
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Citus stat counters are not enabled, consider setting "
"citus.stat_counters_flush_interval to a non-negative value "
"and restarting the server")));
}
if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("shared memory is not initialized for Citus stat counters")));
}
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupStatCountersTuplestore(fcinfo, &tupleDescriptor);
StoreStatCountersForDbId(tupleStore, tupleDescriptor, dbId);
PG_RETURN_VOID();
}
/*
* citus_stat_counters_reset resets the stat counters for the specified database
* or for all databases if InvalidOid is provided.
*/
Datum
citus_stat_counters_reset(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("database oid cannot be NULL")));
}
Oid dbId = PG_GETARG_OID(0);
if (!IsCitusStatCountersEnabled())
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Citus stat counters are not enabled, consider setting "
"citus.stat_counters_flush_interval to a non-negative value "
"and restarting the server")));
}
if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("shared memory is not initialized for Citus stat counters")));
}
ResetStatCountersForDbId(dbId);
PG_RETURN_VOID();
}
/*
* IsCitusStatCountersEnabled returns whether the stat counters are enabled.
*/
bool
IsCitusStatCountersEnabled(void)
{
Assert(StatCountersFlushInterval >= DISABLE_STAT_COUNTERS_FLUSH_INTERVAL);
return StatCountersFlushInterval != DISABLE_STAT_COUNTERS_FLUSH_INTERVAL;
}
/*
* IncrementStatCounterForMyDb increments the stat counter for the given statId
* of the current database.
*/
void
IncrementStatCounterForMyDb(int statId)
{
if (!IsCitusStatCountersEnabled())
{
return;
}
if (ShouldByPassLocalCounters())
{
IncrementSharedStatCounterForMyDb(statId);
}
else
{
PendingStatCounters[statId]++;
bool force = false;
FlushPendingCountersIfNeeded(force);
}
}
/*
* InitializeStatCountersArrayMem saves the previous shmem_startup_hook and sets
* up a new shmem_startup_hook to initialize the shared memory used for
* keeping track of stat counters across backends for all databases.
*/
void
InitializeStatCountersArrayMem(void)
{
Assert(IsCitusStatCountersEnabled());
/* on the versions older than PG 15, we use shmem_request_hook_type */
#if PG_VERSION_NUM < PG_VERSION_15
if (!IsUnderPostmaster)
{
RequestAddinShmemSpace(StatCountersArrayShmemSize());
elog(LOG, "requesting named LWLockTranch for %s",
STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME);
RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1);
}
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = CitusStatCountersShmemInit;
}
/*
* CitusStatCountersFlushAtExit is called on backend exit to flush the local
* stat counters to the shared memory, if there are any pending.
*/
void
CitusStatCountersFlushAtExit(int code, Datum arg)
{
Assert(IsCitusStatCountersEnabled());
bool force = true;
FlushPendingCountersIfNeeded(force);
}
/*
* StatCountersArrayShmemSize calculates and returns shared memory size
* required to keep stat counters.
*/
Size
StatCountersArrayShmemSize(void)
{
Size size = MAXALIGN(sizeof(StatCountersState));
size = add_size(size, hash_estimate_size(STAT_COUNTERS_MAX_DATABASES,
sizeof(StatCountersHashEntry)));
return size;
}
/*
* CitusStatCountersRemoveDroppedDatabases removes the stat counters for the
* databases that are dropped.
*/
void
CitusStatCountersRemoveDroppedDatabases(void)
{
Assert(IsCitusStatCountersEnabled());
List *droppedDatabaseIdList = NIL;
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
HASH_SEQ_STATUS hashSeqState = { 0 };
hash_seq_init(&hashSeqState, CitusStatCountersSharedHash);
StatCountersHashEntry *dbEntry = NULL;
while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL)
{
HeapTuple dbTuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbEntry->dbId));
if (!HeapTupleIsValid(dbTuple))
{
droppedDatabaseIdList = lappend_oid(droppedDatabaseIdList, dbEntry->dbId);
}
else
{
ReleaseSysCache(dbTuple);
}
}
LWLockRelease(CitusStatCountersSharedState->lock);
if (list_length(droppedDatabaseIdList) == 0)
{
return;
}
/* acquire exclusive lock to quickly remove the entries */
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
Oid droppedDatabaseId = InvalidOid;
foreach_declared_oid(droppedDatabaseId, droppedDatabaseIdList)
{
hash_search(CitusStatCountersSharedHash, (void *) &droppedDatabaseId,
HASH_REMOVE, NULL);
}
LWLockRelease(CitusStatCountersSharedState->lock);
}
/*
* SetupStatCountersTuplestore returns a Tuplestorestate for returning the
* stat counters and setups the provided TupleDesc.
*/
static Tuplestorestate *
SetupStatCountersTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupleDescriptor)
{
ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo;
switch (get_call_result_type(fcinfo, NULL, tupleDescriptor))
{
case TYPEFUNC_COMPOSITE:
{
/* success */
break;
}
case TYPEFUNC_RECORD:
{
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
}
default:
{
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
}
MemoryContext perQueryContext = resultSet->econtext->ecxt_per_query_memory;
MemoryContext oldContext = MemoryContextSwitchTo(perQueryContext);
bool randomAccess = true;
bool interTransactions = false;
Tuplestorestate *tupstore = tuplestore_begin_heap(randomAccess, interTransactions,
work_mem);
resultSet->returnMode = SFRM_Materialize;
resultSet->setResult = tupstore;
resultSet->setDesc = *tupleDescriptor;
MemoryContextSwitchTo(oldContext);
return tupstore;
}
/*
* StoreStatCountersForDbId fetches the stat counters for the specified database
* and stores them into the given tuple store.
*/
static void
StoreStatCountersForDbId(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Oid dbId)
{
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash,
(void *) &dbId,
HASH_FIND, NULL);
if (dbEntry)
{
StoreStatCountersFromValues(tupleStore, tupleDescriptor, dbEntry->counters,
dbEntry->statsResetTimestamp);
}
else
{
/* use a zeroed entry if the database doesn't exist */
TimestampTz zeroedTimestamp = 0;
pg_atomic_uint64 zeroedCounters[N_CITUS_STAT_COUNTERS] = { 0 };
ResetStatCounterArray(zeroedCounters);
StoreStatCountersFromValues(tupleStore, tupleDescriptor, zeroedCounters,
zeroedTimestamp);
}
LWLockRelease(CitusStatCountersSharedState->lock);
}
/*
* StoreStatCountersFromValues stores the stat counters stored in given
* counter array and the timestamp into the given tuple store.
*
* Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS.
*/
static void
StoreStatCountersFromValues(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor,
pg_atomic_uint64 *counters, TimestampTz statsResetTimestamp)
{
Datum values[N_CITUS_STAT_COUNTERS + 1] = { 0 };
bool isNulls[N_CITUS_STAT_COUNTERS + 1] = { 0 };
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
values[statIdx] = UInt64GetDatum(pg_atomic_read_u64(&counters[statIdx]));
}
if (statsResetTimestamp == 0)
{
isNulls[N_CITUS_STAT_COUNTERS] = true;
}
else
{
values[N_CITUS_STAT_COUNTERS] = TimestampTzGetDatum(statsResetTimestamp);
}
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
/*
* ResetStatCountersForDbId resets the stat counters for the specified database or
* for all databases if InvalidOid is provided.
*/
static void
ResetStatCountersForDbId(Oid dbId)
{
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
if (OidIsValid(dbId))
{
StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash,
(void *) &dbId,
HASH_FIND, NULL);
/* skip if we don't have an entry for this database */
if (dbEntry)
{
ResetStatCounterArray(dbEntry->counters);
dbEntry->statsResetTimestamp = GetCurrentTimestamp();
}
}
else
{
HASH_SEQ_STATUS hashSeqState = { 0 };
hash_seq_init(&hashSeqState, CitusStatCountersSharedHash);
StatCountersHashEntry *dbEntry = NULL;
while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL)
{
ResetStatCounterArray(dbEntry->counters);
dbEntry->statsResetTimestamp = GetCurrentTimestamp();
}
}
LWLockRelease(CitusStatCountersSharedState->lock);
}
/*
* ResetStatCounterArray resets the stat counters stored in the given
* counter array.
*
* Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS.
*/
static void
ResetStatCounterArray(pg_atomic_uint64 *counters)
{
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
pg_atomic_write_u64(&counters[statIdx], 0);
}
}
/*
* ShouldByPassLocalCounters returns true if we should immediately increment
* the shared memory counters without buffering them in the local counters.
*/
static bool
ShouldByPassLocalCounters(void)
{
Assert(IsCitusStatCountersEnabled());
return StatCountersFlushInterval == 0;
}
/*
* IncrementSharedStatCounterForMyDb increments the stat counter for the given statId
* of the current database in the shared memory.
*/
static void
IncrementSharedStatCounterForMyDb(int statId)
{
Oid dbId = MyDatabaseId;
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
/*
* XXX: Can we cache the entry for the current database?
*
* From one perspective, doing so should be fine since dropping a
* database succeeds only after all the backends connected to it
* are disconnected, so we cannot have a backend that has a dangling
* entry.
*
* On the other hand, we're not sure if a potential hash table resize
* would invalidate the cached entry.
*/
StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search(
CitusStatCountersSharedHash,
(void *) &dbId,
HASH_FIND,
NULL);
if (!dbEntry)
{
/* promote the lock to exclusive to insert the new entry for this database */
LWLockRelease(CitusStatCountersSharedState->lock);
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId);
/* downgrade the lock to shared */
LWLockRelease(CitusStatCountersSharedState->lock);
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
}
pg_atomic_fetch_add_u64(&dbEntry->counters[statId], 1);
LWLockRelease(CitusStatCountersSharedState->lock);
}
/*
* FlushPendingCountersIfNeeded flushes PendingStatCounters to the shared
* memory if StatCountersFlushInterval has elapsed since the last flush
* or if force is true.
*/
static void
FlushPendingCountersIfNeeded(bool force)
{
static instr_time LastFlushTime = { 0 };
if (!force)
{
if (INSTR_TIME_IS_ZERO(LastFlushTime))
{
/* assume that the first increment is the start of the flush interval */
INSTR_TIME_SET_CURRENT(LastFlushTime);
return;
}
instr_time now = { 0 };
INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_SUBTRACT(now, LastFlushTime);
if (INSTR_TIME_GET_MILLISEC(now) < StatCountersFlushInterval)
{
return;
}
}
Oid dbId = MyDatabaseId;
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
/* XXX: Same here, can we cache the entry for the current database? */
StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search(
CitusStatCountersSharedHash,
(void *) &dbId,
HASH_FIND,
NULL);
if (!dbEntry)
{
/* promote the lock to exclusive to insert the new entry for this database */
LWLockRelease(CitusStatCountersSharedState->lock);
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId);
/* downgrade the lock to shared */
LWLockRelease(CitusStatCountersSharedState->lock);
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
}
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
if (PendingStatCounters[statIdx] == 0)
{
/* nothing to flush for this stat, avoid unnecessary lock contention */
continue;
}
pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx],
PendingStatCounters[statIdx]);
PendingStatCounters[statIdx] = 0;
}
LWLockRelease(CitusStatCountersSharedState->lock);
INSTR_TIME_SET_CURRENT(LastFlushTime);
}
/*
* CitusStatCountersShmemInit initializes the shared memory used
* for keeping track of stat counters and restores the stat counters from
* the dump file.
*/
static void
CitusStatCountersShmemInit(void)
{
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
Assert(IsCitusStatCountersEnabled());
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
bool alreadyInitialized = false;
CitusStatCountersSharedState = ShmemInitStruct(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME,
sizeof(StatCountersState),
&alreadyInitialized);
if (!alreadyInitialized)
{
CitusStatCountersSharedState->lock = &(GetNamedLWLockTranche(
STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME))
->lock;
}
HASHCTL hashInfo = {
.keysize = sizeof(Oid),
.entrysize = sizeof(StatCountersHashEntry),
.hash = uint32_hash,
};
CitusStatCountersSharedHash = ShmemInitHash("Citus stat counters Hash",
STAT_COUNTERS_INIT_DATABASES,
STAT_COUNTERS_MAX_DATABASES, &hashInfo,
HASH_ELEM | HASH_FUNCTION);
LWLockRelease(AddinShmemInitLock);
if (!IsUnderPostmaster)
{
/* postmaster dumps the stat counters on shutdown */
on_shmem_exit(CitusStatCountersShmemShutdown, (Datum) 0);
}
if (alreadyInitialized)
{
/*
* alreadyInitialized being true doesn't necessarily mean that the
* backend that initialized the shared memory has completed restoring
* the stat counters from the dump file.
*
* However, it's still fine to return here as commented below.
*/
return;
}
/*
* We know that only one backend can be here at this point, so we're
* the only one that restores the stat counters from the dump file.
*
* However, we don't block other backends from accessing the shared
* memory while we're restoring the stat counters - need to be careful.
*/
FILE *file = AllocateFile(CITUS_STAT_COUNTERS_DUMP_FILE, PG_BINARY_R);
if (file == NULL)
{
if (errno == ENOENT)
{
/* ignore not-found error */
return;
}
goto error;
}
uint32 fileVersion = 0;
if (fread(&fileVersion, sizeof(uint32), 1, file) != 1)
{
goto error;
}
/* check if version is correct, atm we only have one */
if (fileVersion != CITUS_STAT_COUNTERS_FILE_VERSION)
{
ereport(LOG,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("citus_stat_counters file version mismatch: expected %u, got %u",
CITUS_STAT_COUNTERS_FILE_VERSION, fileVersion)));
goto error;
}
/* get number of entries */
int32 numEntries = 0;
if (fread(&numEntries, sizeof(int32), 1, file) != 1)
{
goto error;
}
for (int i = 0; i < numEntries; i++)
{
Oid dbId = InvalidOid;
if (fread(&dbId, sizeof(Oid), 1, file) != 1)
{
goto error;
}
uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 };
if (fread(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) !=
N_CITUS_STAT_COUNTERS)
{
goto error;
}
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search(
CitusStatCountersSharedHash,
(void *) &dbId,
HASH_FIND,
NULL);
if (!dbEntry)
{
/* promote the lock to exclusive to insert the new entry for this database */
LWLockRelease(CitusStatCountersSharedState->lock);
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId);
/* downgrade the lock to shared */
LWLockRelease(CitusStatCountersSharedState->lock);
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
}
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
/*
* If no other backend has created the entry for us, then we do that
* via the above call made to CitusStatCountersHashEntryAllocIfNotExists()
* and there we init the counters if we just created the entry.
*
* So here we always "fetch_add" instead of "write" or "init" to avoid
* overwriting the counters-increments made by other backends so far, if
* it's not us who created the entry.
*/
pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx], statCounters[statIdx]);
}
LWLockRelease(CitusStatCountersSharedState->lock);
}
FreeFile(file);
/*
* Remove the file so it's not included in backups/replicas, etc. A new file will be
* written on next shutdown.
*/
unlink(CITUS_STAT_COUNTERS_DUMP_FILE);
return;
error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read citus_stat_counters file \"%s\": %m",
CITUS_STAT_COUNTERS_DUMP_FILE)));
if (file)
{
FreeFile(file);
}
/* delete bogus file, don't care of errors in this case */
unlink(CITUS_STAT_COUNTERS_DUMP_FILE);
}
/*
* CitusStatCountersShmemShutdown is called on shutdown by postmaster to dump the
* stat counters to CITUS_STAT_COUNTERS_DUMP_FILE.
*/
static void
CitusStatCountersShmemShutdown(int code, Datum arg)
{
/* don't try to dump during a crash */
if (code)
{
return;
}
Assert(IsCitusStatCountersEnabled());
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
FILE *file = AllocateFile(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, PG_BINARY_W);
if (file == NULL)
{
goto error;
}
if (fwrite(&CITUS_STAT_COUNTERS_FILE_VERSION, sizeof(uint32), 1, file) != 1)
{
goto error;
}
int32 numEntries = hash_get_num_entries(CitusStatCountersSharedHash);
if (fwrite(&numEntries, sizeof(int32), 1, file) != 1)
{
goto error;
}
StatCountersHashEntry *dbEntry = NULL;
HASH_SEQ_STATUS hashSeqState = { 0 };
hash_seq_init(&hashSeqState, CitusStatCountersSharedHash);
while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL)
{
if (fwrite(&dbEntry->dbId, sizeof(Oid), 1, file) != 1)
{
/* we assume hash_seq_term won't change errno */
hash_seq_term(&hashSeqState);
goto error;
}
uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 };
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
statCounters[statIdx] = pg_atomic_read_u64(&dbEntry->counters[statIdx]);
}
if (fwrite(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) !=
N_CITUS_STAT_COUNTERS)
{
/* we assume hash_seq_term won't change errno */
hash_seq_term(&hashSeqState);
goto error;
}
}
if (FreeFile(file))
{
file = NULL;
goto error;
}
/* rename the file inplace */
if (rename(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, CITUS_STAT_COUNTERS_DUMP_FILE) != 0)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not rename citus_stat_counters file \"%s\" to \"%s\": %m",
CITUS_STAT_COUNTERS_TMP_DUMP_FILE,
CITUS_STAT_COUNTERS_DUMP_FILE)));
}
LWLockRelease(CitusStatCountersSharedState->lock);
return;
error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not write citus_stat_counters file \"%s\": %m",
CITUS_STAT_COUNTERS_TMP_DUMP_FILE)));
if (file)
{
FreeFile(file);
}
LWLockRelease(CitusStatCountersSharedState->lock);
unlink(CITUS_STAT_COUNTERS_DUMP_FILE);
}
/*
* CitusStatCountersHashEntryAllocIfNotExists allocates a new entry in the
* stat counters hash table if it doesn't already exist.
*
* Assumes that the caller has exclusive access to the hash table.
*/
static StatCountersHashEntry *
CitusStatCountersHashEntryAllocIfNotExists(Oid dbId)
{
bool councurrentlyInserted = false;
StatCountersHashEntry *dbEntry =
(StatCountersHashEntry *) hash_search(CitusStatCountersSharedHash, (void *) &dbId,
HASH_ENTER,
&councurrentlyInserted);
/*
* Initialize the counters to 0 if someone else didn't race while we
* were promoting the lock.
*/
if (!councurrentlyInserted)
{
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
pg_atomic_init_u64(&dbEntry->counters[statIdx], 0);
}
dbEntry->statsResetTimestamp = 0;
}
return dbEntry;
}

View File

@ -57,6 +57,7 @@
#include "distributed/query_stats.h" #include "distributed/query_stats.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h" #include "distributed/shard_cleaner.h"
#include "distributed/stat_counters.h"
#include "distributed/statistics_collection.h" #include "distributed/statistics_collection.h"
#include "distributed/transaction_recovery.h" #include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -489,6 +490,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
TimestampTz lastRecoveryTime = 0; TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0; TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0; TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz lastStatCountersPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0; TimestampTz nextMetadataSyncTime = 0;
/* state kept for the background tasks queue monitor */ /* state kept for the background tasks queue monitor */
@ -819,6 +821,36 @@ CitusMaintenanceDaemonMain(Datum main_arg)
timeout = Min(timeout, (StatStatementsPurgeInterval * 1000)); timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
} }
if (IsCitusStatCountersEnabled() &&
TimestampDifferenceExceeds(lastStatCountersPurgeTime, GetCurrentTimestamp(),
StatCountersPurgeInterval))
{
StartTransactionCommand();
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping to purge the stat counter "
"shared memory entries for dropped "
"databases")));
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
/*
* Record last time we perform the purge to ensure we run once per
* StatCountersPurgeInterval.
*/
lastStatCountersPurgeTime = GetCurrentTimestamp();
CitusStatCountersRemoveDroppedDatabases();
}
CommitTransactionCommand();
/* make sure we don't wait too long */
timeout = Min(timeout, (StatCountersPurgeInterval));
}
pid_t backgroundTaskQueueWorkerPid = 0; pid_t backgroundTaskQueueWorkerPid = 0;
BgwHandleStatus backgroundTaskQueueWorkerStatus = BgwHandleStatus backgroundTaskQueueWorkerStatus =
backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid( backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid(

View File

@ -334,7 +334,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection); extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection);
extern void MarkConnectionConnected(MultiConnection *connection); extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection);
/* waiteventset utilities */ /* waiteventset utilities */
extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,

View File

@ -0,0 +1,53 @@
/*-------------------------------------------------------------------------
*
* stat_counters.h
*
* This file contains the exported functions to track various statistic
* counters for Citus.
*
* -------------------------------------------------------------------------
*/
#ifndef STAT_COUNTERS_H
#define STAT_COUNTERS_H
#include <port/atomics.h>
#include "distributed/time_constants.h"
#define STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME "citus_stat_counters_lock_tranche"
#define DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL (30 * MS_PER_SECOND)
#define DISABLE_STAT_COUNTERS_FLUSH_INTERVAL -1
#define DEFAULT_STAT_COUNTERS_PURGE_INTERVAL (5 * MS_PER_MINUTE)
/*
* Must be in the same order as the columns defined in citus_stat_counters view,
* see src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql
*/
typedef enum
{
STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
STAT_CONNECTION_ESTABLISHMENT_FAILED,
STAT_CONNECTION_REUSED,
STAT_QUERY_EXECUTION_SINGLE_SHARD,
STAT_QUERY_EXECUTION_MULTI_SHARD,
/* do not use this and ensure it is the last entry */
N_CITUS_STAT_COUNTERS
} StatType;
extern int StatCountersFlushInterval;
extern int StatCountersPurgeInterval;
extern bool IsCitusStatCountersEnabled(void);
extern void IncrementStatCounterForMyDb(int statId);
extern void InitializeStatCountersArrayMem(void);
extern void CitusStatCountersFlushAtExit(int code, Datum arg);
extern Size StatCountersArrayShmemSize(void);
extern void CitusStatCountersRemoveDroppedDatabases(void);
#endif /* STAT_COUNTERS_H */

View File

@ -1480,8 +1480,11 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal.update_placement_metadata(bigint,integer,integer) void | function citus_internal.update_placement_metadata(bigint,integer,integer) void
| function citus_internal.update_relation_colocation(oid,integer) void | function citus_internal.update_relation_colocation(oid,integer) void
| function citus_is_primary_node() boolean | function citus_is_primary_node() boolean
| function citus_stat_counters(oid) SETOF record
| function citus_stat_counters_reset(oid) void
| function citus_unmark_object_distributed(oid,oid,integer,boolean) void | function citus_unmark_object_distributed(oid,oid,integer,boolean) void
(27 rows) | view citus_stat_counters
(30 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -179,6 +179,8 @@ ORDER BY 1;
function citus_shards_on_worker() function citus_shards_on_worker()
function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode)
function citus_stat_activity() function citus_stat_activity()
function citus_stat_counters(oid)
function citus_stat_counters_reset(oid)
function citus_stat_statements() function citus_stat_statements()
function citus_stat_statements_reset() function citus_stat_statements_reset()
function citus_stat_tenants(boolean) function citus_stat_tenants(boolean)
@ -384,11 +386,12 @@ ORDER BY 1;
view citus_shards view citus_shards
view citus_shards_on_worker view citus_shards_on_worker
view citus_stat_activity view citus_stat_activity
view citus_stat_counters
view citus_stat_statements view citus_stat_statements
view citus_stat_tenants view citus_stat_tenants
view citus_stat_tenants_local view citus_stat_tenants_local
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(358 rows) (361 rows)
DROP TABLE extension_basic_types; DROP TABLE extension_basic_types;