diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 4787d8f2f..473d098f2 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -39,6 +39,7 @@ #include "distributed/remote_commands.h" #include "distributed/run_from_same_connection.h" #include "distributed/shared_connection_stats.h" +#include "distributed/stat_counters.h" #include "distributed/time_constants.h" #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" @@ -354,6 +355,18 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { + /* + * Increment the connection stat counter for the connections that are + * reused only if the connection is in a good state. Here we don't + * bother shutting down the connection or such if it is not in a good + * state but we mostly want to avoid incrementing the connection stat + * counter for a connection that the caller cannot really use. + */ + if (PQstatus(connection->pgConn) == CONNECTION_OK) + { + IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED); + } + return connection; } } @@ -395,6 +408,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, dlist_delete(&connection->connectionNode); pfree(connection); + /* + * Here we don't increment the connection stat counter for the optional + * connections that we gave up establishing due to connection throttling + * because the callers who request optional connections know how to + * survive without them. + */ return NULL; } } @@ -1026,6 +1045,11 @@ FinishConnectionListEstablishment(List *multiConnectionList) if (event->events & WL_POSTMASTER_DEATH) { + /* + * Here we don't increment the connection stat counter for the + * optional failed connections because this is not a connection + * failure, but a postmaster death in the local node. + */ ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); } @@ -1042,6 +1066,12 @@ FinishConnectionListEstablishment(List *multiConnectionList) * reset the memory context */ MemoryContextDelete(MemoryContextSwitchTo(oldContext)); + + /* + * Similarly, we don't increment the connection stat counter for the + * failed connections here because this is not a connection failure + * but a cancellation request is received. + */ return; } @@ -1072,6 +1102,7 @@ FinishConnectionListEstablishment(List *multiConnectionList) eventMask, NULL); if (!success) { + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("connection establishment for node %s:%d " "failed", connection->hostname, @@ -1088,7 +1119,15 @@ FinishConnectionListEstablishment(List *multiConnectionList) */ if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED) { - MarkConnectionConnected(connectionState->connection); + /* + * Since WaitEventSetFromMultiConnectionStates() only adds the + * connections that we haven't completed the connection + * establishment yet, here we always have a new connection. + * In other words, at this point, we surely know that we're + * not dealing with a cached connection. + */ + bool newConnection = true; + MarkConnectionConnected(connectionState->connection, newConnection); } } } @@ -1172,6 +1211,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) /* close connection, otherwise we take up resource on the other side */ CitusPQFinish(connection); + + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); } } @@ -1584,7 +1625,7 @@ RemoteTransactionIdle(MultiConnection *connection) * establishment time when necessary. */ void -MarkConnectionConnected(MultiConnection *connection) +MarkConnectionConnected(MultiConnection *connection, bool newConnection) { connection->connectionState = MULTI_CONNECTION_CONNECTED; @@ -1592,6 +1633,11 @@ MarkConnectionConnected(MultiConnection *connection) { INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd); } + + if (newConnection) + { + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED); + } } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f276b3df1..9d5b9c407 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -171,6 +171,7 @@ #include "distributed/repartition_join_execution.h" #include "distributed/resource_lock.h" #include "distributed/shared_connection_stats.h" +#include "distributed/stat_counters.h" #include "distributed/subplan_execution.h" #include "distributed/transaction_identifier.h" #include "distributed/transaction_management.h" @@ -690,7 +691,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution, WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session); static bool HasUnfinishedTaskForSession(WorkerSession *session); -static void HandleMultiConnectionSuccess(WorkerSession *session); +static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection); static bool HasAnyConnectionFailure(WorkerPool *workerPool); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); static bool TransactionModifiedDistributedTable(DistributedExecution *execution); @@ -2035,6 +2036,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution) else { connection->connectionState = MULTI_CONNECTION_FAILED; + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); } @@ -3011,6 +3013,7 @@ ConnectionStateMachine(WorkerSession *session) * connection, clear any state associated with it. */ connection->connectionState = MULTI_CONNECTION_FAILED; + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); break; } @@ -3019,7 +3022,12 @@ ConnectionStateMachine(WorkerSession *session) ConnStatusType status = PQstatus(connection->pgConn); if (status == CONNECTION_OK) { - HandleMultiConnectionSuccess(session); + /* + * Connection was already established, possibly a cached + * connection. + */ + bool newConnection = false; + HandleMultiConnectionSuccess(session, newConnection); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); break; @@ -3027,6 +3035,7 @@ ConnectionStateMachine(WorkerSession *session) else if (status == CONNECTION_BAD) { connection->connectionState = MULTI_CONNECTION_FAILED; + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); break; } @@ -3042,6 +3051,7 @@ ConnectionStateMachine(WorkerSession *session) if (pollMode == PGRES_POLLING_FAILED) { connection->connectionState = MULTI_CONNECTION_FAILED; + IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); } else if (pollMode == PGRES_POLLING_READING) { @@ -3059,7 +3069,12 @@ ConnectionStateMachine(WorkerSession *session) } else { - HandleMultiConnectionSuccess(session); + /* + * Connection was not established befoore (!= CONNECTION_OK) + * but PQconnectPoll() did so now. + */ + bool newConnection = true; + HandleMultiConnectionSuccess(session, newConnection); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); @@ -3137,6 +3152,11 @@ ConnectionStateMachine(WorkerSession *session) break; } + /* + * Here we don't increment the connection stat counter for failed + * connections because we don't track the connections that we could + * establish but lost later. + */ connection->connectionState = MULTI_CONNECTION_FAILED; break; } @@ -3299,12 +3319,12 @@ HasUnfinishedTaskForSession(WorkerSession *session) * connection's state. */ static void -HandleMultiConnectionSuccess(WorkerSession *session) +HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection) { MultiConnection *connection = session->connection; WorkerPool *workerPool = session->workerPool; - MarkConnectionConnected(connection); + MarkConnectionConnected(connection, newConnection); ereport(DEBUG4, (errmsg("established connection to %s:%d for " "session %ld in %ld microseconds", diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 7f8f827ea..64680fea1 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -66,6 +66,7 @@ #include "distributed/recursive_planning.h" #include "distributed/shard_utils.h" #include "distributed/shardinterval_utils.h" +#include "distributed/stat_counters.h" #include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" #include "distributed/worker_shard_visibility.h" @@ -1433,6 +1434,8 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) if (IsMultiTaskPlan(distributedPlan)) { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + /* if it is not a single task executable plan, inform user according to the log level */ if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF) { @@ -1444,6 +1447,10 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) " queries on the workers."))); } } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } distributedPlan->queryId = localPlan->queryId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index a4146062e..091e7ac44 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -105,6 +105,7 @@ #include "distributed/shardsplit_shared_memory.h" #include "distributed/shared_connection_stats.h" #include "distributed/shared_library_init.h" +#include "distributed/stat_counters.h" #include "distributed/statistics_collection.h" #include "distributed/subplan_execution.h" #include "distributed/time_constants.h" @@ -188,6 +189,7 @@ static void multi_log_hook(ErrorData *edata); static bool IsSequenceOverflowError(ErrorData *edata); static void RegisterConnectionCleanup(void); static void RegisterExternalClientBackendCounterDecrement(void); +static void RegisterCitusStatCountersFlush(void); static void CitusCleanupConnectionsAtExit(int code, Datum arg); static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg); static void CreateRequiredDirectories(void); @@ -505,6 +507,11 @@ _PG_init(void) InitializeMultiTenantMonitorSMHandleManagement(); + if (IsCitusStatCountersEnabled()) + { + InitializeStatCountersArrayMem(); + } + /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -615,6 +622,12 @@ citus_shmem_request(void) RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(LogicalClockShmemSize()); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); + + if (IsCitusStatCountersEnabled()) + { + RequestAddinShmemSpace(StatCountersArrayShmemSize()); + RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1); + } } @@ -787,6 +800,12 @@ StartupCitusBackend(void) SetBackendDataDatabaseId(); RegisterConnectionCleanup(); + + if (IsCitusStatCountersEnabled()) + { + RegisterCitusStatCountersFlush(); + } + FinishedStartupCitusBackend = true; } @@ -841,6 +860,23 @@ RegisterExternalClientBackendCounterDecrement(void) } +/* + * RegisterCitusStatCountersFlush registers CitusStatCountersFlushAtExit() + * to be called before the backend exits. + */ +static void +RegisterCitusStatCountersFlush(void) +{ + static bool registeredCleanup = false; + if (registeredCleanup == false) + { + before_shmem_exit(CitusStatCountersFlushAtExit, 0); + + registeredCleanup = true; + } +} + + /* * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the * backend for the purposes of any clean-up needed. @@ -2429,6 +2465,41 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.stat_counters_flush_interval", + gettext_noop("Sets the interval to flush the local stat counters into " + "the shared memory."), + gettext_noop("Stat counters are used to track the number of certain " + "operations in Citus. While setting this GUC to -1 disables " + "stat counters, setting it to a positive value will flush " + "the local stat counters into the shared memory every " + "interval milliseconds instead of flushing them immediately " + "after the operation, as it is done when the value is 0. " + "Higher values reduce the overhead of flushing the stat " + "counters but increase the time it takes to see the updated " + "stat counters."), + &StatCountersFlushInterval, + DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL, + DISABLE_STAT_COUNTERS_FLUSH_INTERVAL, + 5 * MS_PER_MINUTE, + PGC_POSTMASTER, + GUC_UNIT_MS, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.stat_counters_purge_interval", + gettext_noop("Determines time interval for citus_stat_counters to remove " + "the shared memory entries for the databases that were dropped."), + gettext_noop("This is automatically disabled when the stat counters are disabled " + "(citus.stat_counters_flush_interval = -1) and there is no explicit " + "way to disable purging the shared memory entries while the stat " + "counters are enabled."), + &StatCountersPurgeInterval, + DEFAULT_STAT_COUNTERS_PURGE_INTERVAL, 1, INT_MAX, + PGC_SIGHUP, + GUC_UNIT_MS | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + /* * It takes about 140 bytes of shared memory to store one row, therefore * this setting should be used responsibly. setting it to 10M will require diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 0f70438e0..4eefdf60a 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -48,3 +48,5 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/repl_origin_helper/13.1-1.sql" #include "udfs/citus_finish_pg_upgrade/13.1-1.sql" #include "udfs/citus_is_primary_node/13.1-1.sql" +#include "udfs/citus_stat_counters/13.1-1.sql" +#include "udfs/citus_stat_counters_reset/13.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql b/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql index dd89fc249..38f195743 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql @@ -41,3 +41,7 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking(); DROP FUNCTION citus_internal.stop_replication_origin_tracking(); DROP FUNCTION citus_internal.is_replication_origin_tracking_active(); #include "../udfs/citus_finish_pg_upgrade/12.1-1.sql" + +DROP VIEW pg_catalog.citus_stat_counters; +DROP FUNCTION pg_catalog.citus_stat_counters(oid); +DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid); diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql new file mode 100644 index 000000000..ba5919a04 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( + database_oid oid, + + -- Following stat counter columns must be in the same order as the + -- StatType enum defined in src/include/distributed/stat_counters.h + OUT connection_establishment_succeeded bigint, + OUT connection_establishment_failed bigint, + OUT connection_reused bigint, + + OUT query_execution_single_shard bigint, + OUT query_execution_multi_shard bigint, + + -- Must always be the last column or you should accordingly update + -- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c + OUT stats_reset timestamp with time zone +) +RETURNS SETOF RECORD +LANGUAGE C VOLATILE PARALLEL UNSAFE +AS 'MODULE_PATHNAME', $$citus_stat_counters$$; +COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database'; + +CREATE VIEW citus.citus_stat_counters AS +SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database; + +ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog; + +GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql new file mode 100644 index 000000000..ba5919a04 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( + database_oid oid, + + -- Following stat counter columns must be in the same order as the + -- StatType enum defined in src/include/distributed/stat_counters.h + OUT connection_establishment_succeeded bigint, + OUT connection_establishment_failed bigint, + OUT connection_reused bigint, + + OUT query_execution_single_shard bigint, + OUT query_execution_multi_shard bigint, + + -- Must always be the last column or you should accordingly update + -- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c + OUT stats_reset timestamp with time zone +) +RETURNS SETOF RECORD +LANGUAGE C VOLATILE PARALLEL UNSAFE +AS 'MODULE_PATHNAME', $$citus_stat_counters$$; +COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database'; + +CREATE VIEW citus.citus_stat_counters AS +SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database; + +ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog; + +GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql new file mode 100644 index 000000000..82272ab45 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0) +RETURNS VOID +LANGUAGE C VOLATILE PARALLEL UNSAFE +AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$; +COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters_reset/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_counters_reset/latest.sql new file mode 100644 index 000000000..82272ab45 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_counters_reset/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0) +RETURNS VOID +LANGUAGE C VOLATILE PARALLEL UNSAFE +AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$; +COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC; diff --git a/src/backend/distributed/stat_counters.c b/src/backend/distributed/stat_counters.c new file mode 100644 index 000000000..51aa227ab --- /dev/null +++ b/src/backend/distributed/stat_counters.c @@ -0,0 +1,984 @@ +/*------------------------------------------------------------------------- + * + * stat_counters.c + * + * This file contains functions to track various statistic counters for + * Citus. + * + * Each backend increments the local counters (PendingStatCounters) and + * flushes them to the shared memory at the end of the flush interval + * (StatCountersFlushInterval). The shared memory is used to keep track + * of the stat counters across backends for all databases. + * + * We don't have a good way to enforce that we flush the pending counters + * always at the end of the flush interval because we only check for the + * interval at the time of incrementing a counter. And, if a backend exits + * before the interval elapses, then we still flush the pending counters at + * the time of exit via CitusStatCountersFlushAtExit(). + * + * Also, we store the stat counters in CITUS_STAT_COUNTERS_DUMP_FILE on + * shutdown via postmaster and restore them on startup via the first backend + * that initializes the shared memory for stat counters. + * + * And for the dropped databases, Citus maintenance daemon removes the stat + * counters from the shared memory periodically, which is controlled by + * StatCountersPurgeInterval. When the feature is disabled, removal of the + * stat counters is automatically disabled. Instead of inserting a cleanup + * record into pg_dist_cleanup, we choose to remove the stat counters via + * the maintenance daemon because inserting a cleanup record wouldn't work + * when the current database is dropped. And if there were other databases + * where Citus is installed, then they would still continue to have the + * dropped database's stat counters in the shared memory. + * + *------------------------------------------------------------------------- + */ + +#include + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" + +#include "common/hashfn.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "utils/syscache.h" + +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/stat_counters.h" + + +/* file path for dumping the stat counters */ +#define CITUS_STAT_COUNTERS_DUMP_FILE "pg_stat/citus_stat_counters.stat" +#define CITUS_STAT_COUNTERS_TMP_DUMP_FILE (CITUS_STAT_COUNTERS_DUMP_FILE ".tmp") + +/* + * Shared hash constants. + * + * The places where STAT_COUNTERS_MAX_DATABASES is used do not impose a hard + * limit on the number of databases that can be tracked but in ShmemInitHash() + * it's documented that the access efficiency will degrade if it is exceeded + * substantially. + * + * XXX: Consider using dshash_table instead of (shared) HTAB if that becomes + * a concern. + */ +#define STAT_COUNTERS_INIT_DATABASES 8 +#define STAT_COUNTERS_MAX_DATABASES 1024 + + +/* shared hash table entry */ +typedef struct StatCountersHashEntry +{ + /* must be the first field since this is used as the key */ + Oid dbId; + + pg_atomic_uint64 counters[N_CITUS_STAT_COUNTERS]; + + TimestampTz statsResetTimestamp; +} StatCountersHashEntry; + +/* shared memory state */ +typedef struct StatCountersState +{ + LWLockId lock; +} StatCountersState; + + +/* GUC value for citus.stat_counters_flush_interval */ +int StatCountersFlushInterval = DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL; + +/* GUC value for citus.stat_counters_purge_interval */ +int StatCountersPurgeInterval = DEFAULT_STAT_COUNTERS_PURGE_INTERVAL; + +/* stat counters dump file version */ +static const uint32 CITUS_STAT_COUNTERS_FILE_VERSION = 1; + +/* shared memory init */ +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +/* shared memory variables */ +static StatCountersState *CitusStatCountersSharedState = NULL; +static HTAB *CitusStatCountersSharedHash = NULL; + +/* local stat counters that are pending to be flushed */ +static uint64 PendingStatCounters[N_CITUS_STAT_COUNTERS] = { 0 }; + + +/* helper functions for citus_stat_counters() and citus_stat_counters_reset() */ +static Tuplestorestate * SetupStatCountersTuplestore(FunctionCallInfo fcinfo, + TupleDesc *tupleDescriptor); +static void StoreStatCountersForDbId(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor, + Oid dbId); +static void StoreStatCountersFromValues(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor, + pg_atomic_uint64 *counters, + TimestampTz statsResetTimestamp); +static void ResetStatCountersForDbId(Oid dbId); +static void ResetStatCounterArray(pg_atomic_uint64 *counters); + +/* helper functions to increment and flush stat counters */ +static bool ShouldByPassLocalCounters(void); +static void IncrementSharedStatCounterForMyDb(int statId); +static void FlushPendingCountersIfNeeded(bool force); + +/* shared memory init & management */ +static void CitusStatCountersShmemInit(void); +static void CitusStatCountersShmemShutdown(int code, Datum arg); + +/* shared hash utilities */ +static StatCountersHashEntry * CitusStatCountersHashEntryAllocIfNotExists(Oid dbId); + + +PG_FUNCTION_INFO_V1(citus_stat_counters); +PG_FUNCTION_INFO_V1(citus_stat_counters_reset); + + +/* + * citus_stat_counters returns the stat counters for the specified database or for + * all databases if InvalidOid is provided. + */ +Datum +citus_stat_counters(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + if (PG_ARGISNULL(0)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("database oid cannot be NULL"))); + } + + Oid dbId = PG_GETARG_OID(0); + + if (!IsCitusStatCountersEnabled()) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Citus stat counters are not enabled, consider setting " + "citus.stat_counters_flush_interval to a non-negative value " + "and restarting the server"))); + + } + + if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("shared memory is not initialized for Citus stat counters"))); + } + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupStatCountersTuplestore(fcinfo, &tupleDescriptor); + + StoreStatCountersForDbId(tupleStore, tupleDescriptor, dbId); + + PG_RETURN_VOID(); +} + + +/* + * citus_stat_counters_reset resets the stat counters for the specified database + * or for all databases if InvalidOid is provided. + */ +Datum +citus_stat_counters_reset(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + if (PG_ARGISNULL(0)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("database oid cannot be NULL"))); + } + + Oid dbId = PG_GETARG_OID(0); + + if (!IsCitusStatCountersEnabled()) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Citus stat counters are not enabled, consider setting " + "citus.stat_counters_flush_interval to a non-negative value " + "and restarting the server"))); + + } + + if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("shared memory is not initialized for Citus stat counters"))); + } + + ResetStatCountersForDbId(dbId); + + PG_RETURN_VOID(); +} + + +/* + * IsCitusStatCountersEnabled returns whether the stat counters are enabled. + */ +bool +IsCitusStatCountersEnabled(void) +{ + Assert(StatCountersFlushInterval >= DISABLE_STAT_COUNTERS_FLUSH_INTERVAL); + return StatCountersFlushInterval != DISABLE_STAT_COUNTERS_FLUSH_INTERVAL; +} + + +/* + * IncrementStatCounterForMyDb increments the stat counter for the given statId + * of the current database. + */ +void +IncrementStatCounterForMyDb(int statId) +{ + if (!IsCitusStatCountersEnabled()) + { + return; + } + + if (ShouldByPassLocalCounters()) + { + IncrementSharedStatCounterForMyDb(statId); + } + else + { + PendingStatCounters[statId]++; + + bool force = false; + FlushPendingCountersIfNeeded(force); + } +} + + +/* + * InitializeStatCountersArrayMem saves the previous shmem_startup_hook and sets + * up a new shmem_startup_hook to initialize the shared memory used for + * keeping track of stat counters across backends for all databases. + */ +void +InitializeStatCountersArrayMem(void) +{ + Assert(IsCitusStatCountersEnabled()); + +/* on the versions older than PG 15, we use shmem_request_hook_type */ +#if PG_VERSION_NUM < PG_VERSION_15 + + if (!IsUnderPostmaster) + { + RequestAddinShmemSpace(StatCountersArrayShmemSize()); + + elog(LOG, "requesting named LWLockTranch for %s", + STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME); + RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1); + } +#endif + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = CitusStatCountersShmemInit; +} + + +/* + * CitusStatCountersFlushAtExit is called on backend exit to flush the local + * stat counters to the shared memory, if there are any pending. + */ +void +CitusStatCountersFlushAtExit(int code, Datum arg) +{ + Assert(IsCitusStatCountersEnabled()); + + bool force = true; + FlushPendingCountersIfNeeded(force); +} + + +/* + * StatCountersArrayShmemSize calculates and returns shared memory size + * required to keep stat counters. + */ +Size +StatCountersArrayShmemSize(void) +{ + Size size = MAXALIGN(sizeof(StatCountersState)); + size = add_size(size, hash_estimate_size(STAT_COUNTERS_MAX_DATABASES, + sizeof(StatCountersHashEntry))); + + return size; +} + + +/* + * CitusStatCountersRemoveDroppedDatabases removes the stat counters for the + * databases that are dropped. + */ +void +CitusStatCountersRemoveDroppedDatabases(void) +{ + Assert(IsCitusStatCountersEnabled()); + + List *droppedDatabaseIdList = NIL; + + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + HASH_SEQ_STATUS hashSeqState = { 0 }; + hash_seq_init(&hashSeqState, CitusStatCountersSharedHash); + + StatCountersHashEntry *dbEntry = NULL; + while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL) + { + HeapTuple dbTuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbEntry->dbId)); + if (!HeapTupleIsValid(dbTuple)) + { + droppedDatabaseIdList = lappend_oid(droppedDatabaseIdList, dbEntry->dbId); + } + else + { + ReleaseSysCache(dbTuple); + } + } + + LWLockRelease(CitusStatCountersSharedState->lock); + + if (list_length(droppedDatabaseIdList) == 0) + { + return; + } + + /* acquire exclusive lock to quickly remove the entries */ + LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); + + Oid droppedDatabaseId = InvalidOid; + foreach_declared_oid(droppedDatabaseId, droppedDatabaseIdList) + { + hash_search(CitusStatCountersSharedHash, (void *) &droppedDatabaseId, + HASH_REMOVE, NULL); + } + + LWLockRelease(CitusStatCountersSharedState->lock); +} + + +/* + * SetupStatCountersTuplestore returns a Tuplestorestate for returning the + * stat counters and setups the provided TupleDesc. + */ +static Tuplestorestate * +SetupStatCountersTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupleDescriptor) +{ + ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo; + switch (get_call_result_type(fcinfo, NULL, tupleDescriptor)) + { + case TYPEFUNC_COMPOSITE: + { + /* success */ + break; + } + + case TYPEFUNC_RECORD: + { + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + } + + default: + { + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + } + + MemoryContext perQueryContext = resultSet->econtext->ecxt_per_query_memory; + + MemoryContext oldContext = MemoryContextSwitchTo(perQueryContext); + + bool randomAccess = true; + bool interTransactions = false; + Tuplestorestate *tupstore = tuplestore_begin_heap(randomAccess, interTransactions, + work_mem); + + resultSet->returnMode = SFRM_Materialize; + resultSet->setResult = tupstore; + resultSet->setDesc = *tupleDescriptor; + + MemoryContextSwitchTo(oldContext); + + return tupstore; +} + + +/* + * StoreStatCountersForDbId fetches the stat counters for the specified database + * and stores them into the given tuple store. + */ +static void +StoreStatCountersForDbId(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Oid dbId) +{ + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash, + (void *) &dbId, + HASH_FIND, NULL); + if (dbEntry) + { + StoreStatCountersFromValues(tupleStore, tupleDescriptor, dbEntry->counters, + dbEntry->statsResetTimestamp); + } + else + { + /* use a zeroed entry if the database doesn't exist */ + TimestampTz zeroedTimestamp = 0; + + pg_atomic_uint64 zeroedCounters[N_CITUS_STAT_COUNTERS] = { 0 }; + ResetStatCounterArray(zeroedCounters); + + StoreStatCountersFromValues(tupleStore, tupleDescriptor, zeroedCounters, + zeroedTimestamp); + } + + LWLockRelease(CitusStatCountersSharedState->lock); +} + + +/* + * StoreStatCountersFromValues stores the stat counters stored in given + * counter array and the timestamp into the given tuple store. + * + * Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS. + */ +static void +StoreStatCountersFromValues(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, + pg_atomic_uint64 *counters, TimestampTz statsResetTimestamp) +{ + Datum values[N_CITUS_STAT_COUNTERS + 1] = { 0 }; + bool isNulls[N_CITUS_STAT_COUNTERS + 1] = { 0 }; + + for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + { + values[statIdx] = UInt64GetDatum(pg_atomic_read_u64(&counters[statIdx])); + } + + if (statsResetTimestamp == 0) + { + isNulls[N_CITUS_STAT_COUNTERS] = true; + } + else + { + values[N_CITUS_STAT_COUNTERS] = TimestampTzGetDatum(statsResetTimestamp); + } + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); +} + + +/* + * ResetStatCountersForDbId resets the stat counters for the specified database or + * for all databases if InvalidOid is provided. + */ +static void +ResetStatCountersForDbId(Oid dbId) +{ + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + if (OidIsValid(dbId)) + { + StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash, + (void *) &dbId, + HASH_FIND, NULL); + + /* skip if we don't have an entry for this database */ + if (dbEntry) + { + ResetStatCounterArray(dbEntry->counters); + dbEntry->statsResetTimestamp = GetCurrentTimestamp(); + } + } + else + { + HASH_SEQ_STATUS hashSeqState = { 0 }; + hash_seq_init(&hashSeqState, CitusStatCountersSharedHash); + + StatCountersHashEntry *dbEntry = NULL; + while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL) + { + ResetStatCounterArray(dbEntry->counters); + dbEntry->statsResetTimestamp = GetCurrentTimestamp(); + } + } + + LWLockRelease(CitusStatCountersSharedState->lock); +} + + +/* + * ResetStatCounterArray resets the stat counters stored in the given + * counter array. + * + * Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS. + */ +static void +ResetStatCounterArray(pg_atomic_uint64 *counters) +{ + for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + { + pg_atomic_write_u64(&counters[statIdx], 0); + } +} + + +/* + * ShouldByPassLocalCounters returns true if we should immediately increment + * the shared memory counters without buffering them in the local counters. + */ +static bool +ShouldByPassLocalCounters(void) +{ + Assert(IsCitusStatCountersEnabled()); + return StatCountersFlushInterval == 0; +} + + +/* + * IncrementSharedStatCounterForMyDb increments the stat counter for the given statId + * of the current database in the shared memory. + */ +static void +IncrementSharedStatCounterForMyDb(int statId) +{ + Oid dbId = MyDatabaseId; + + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + /* + * XXX: Can we cache the entry for the current database? + * + * From one perspective, doing so should be fine since dropping a + * database succeeds only after all the backends connected to it + * are disconnected, so we cannot have a backend that has a dangling + * entry. + * + * On the other hand, we're not sure if a potential hash table resize + * would invalidate the cached entry. + */ + StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search( + CitusStatCountersSharedHash, + (void *) &dbId, + HASH_FIND, + NULL); + if (!dbEntry) + { + /* promote the lock to exclusive to insert the new entry for this database */ + LWLockRelease(CitusStatCountersSharedState->lock); + LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); + + dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId); + + /* downgrade the lock to shared */ + LWLockRelease(CitusStatCountersSharedState->lock); + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + } + + pg_atomic_fetch_add_u64(&dbEntry->counters[statId], 1); + + LWLockRelease(CitusStatCountersSharedState->lock); +} + + +/* + * FlushPendingCountersIfNeeded flushes PendingStatCounters to the shared + * memory if StatCountersFlushInterval has elapsed since the last flush + * or if force is true. + */ +static void +FlushPendingCountersIfNeeded(bool force) +{ + static instr_time LastFlushTime = { 0 }; + + if (!force) + { + if (INSTR_TIME_IS_ZERO(LastFlushTime)) + { + /* assume that the first increment is the start of the flush interval */ + INSTR_TIME_SET_CURRENT(LastFlushTime); + return; + } + + instr_time now = { 0 }; + INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SUBTRACT(now, LastFlushTime); + if (INSTR_TIME_GET_MILLISEC(now) < StatCountersFlushInterval) + { + return; + } + } + + Oid dbId = MyDatabaseId; + + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + /* XXX: Same here, can we cache the entry for the current database? */ + StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search( + CitusStatCountersSharedHash, + (void *) &dbId, + HASH_FIND, + NULL); + if (!dbEntry) + { + /* promote the lock to exclusive to insert the new entry for this database */ + LWLockRelease(CitusStatCountersSharedState->lock); + LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); + + dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId); + + /* downgrade the lock to shared */ + LWLockRelease(CitusStatCountersSharedState->lock); + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + } + + for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + { + if (PendingStatCounters[statIdx] == 0) + { + /* nothing to flush for this stat, avoid unnecessary lock contention */ + continue; + } + + pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx], + PendingStatCounters[statIdx]); + + PendingStatCounters[statIdx] = 0; + } + + LWLockRelease(CitusStatCountersSharedState->lock); + + INSTR_TIME_SET_CURRENT(LastFlushTime); +} + + +/* + * CitusStatCountersShmemInit initializes the shared memory used + * for keeping track of stat counters and restores the stat counters from + * the dump file. + */ +static void +CitusStatCountersShmemInit(void) +{ + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } + + Assert(IsCitusStatCountersEnabled()); + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + bool alreadyInitialized = false; + CitusStatCountersSharedState = ShmemInitStruct(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, + sizeof(StatCountersState), + &alreadyInitialized); + + + if (!alreadyInitialized) + { + CitusStatCountersSharedState->lock = &(GetNamedLWLockTranche( + STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME)) + ->lock; + } + + HASHCTL hashInfo = { + .keysize = sizeof(Oid), + .entrysize = sizeof(StatCountersHashEntry), + .hash = uint32_hash, + }; + + CitusStatCountersSharedHash = ShmemInitHash("Citus stat counters Hash", + STAT_COUNTERS_INIT_DATABASES, + STAT_COUNTERS_MAX_DATABASES, &hashInfo, + HASH_ELEM | HASH_FUNCTION); + + LWLockRelease(AddinShmemInitLock); + + if (!IsUnderPostmaster) + { + /* postmaster dumps the stat counters on shutdown */ + on_shmem_exit(CitusStatCountersShmemShutdown, (Datum) 0); + } + + if (alreadyInitialized) + { + /* + * alreadyInitialized being true doesn't necessarily mean that the + * backend that initialized the shared memory has completed restoring + * the stat counters from the dump file. + * + * However, it's still fine to return here as commented below. + */ + return; + } + + /* + * We know that only one backend can be here at this point, so we're + * the only one that restores the stat counters from the dump file. + * + * However, we don't block other backends from accessing the shared + * memory while we're restoring the stat counters - need to be careful. + */ + FILE *file = AllocateFile(CITUS_STAT_COUNTERS_DUMP_FILE, PG_BINARY_R); + if (file == NULL) + { + if (errno == ENOENT) + { + /* ignore not-found error */ + return; + } + goto error; + } + + uint32 fileVersion = 0; + if (fread(&fileVersion, sizeof(uint32), 1, file) != 1) + { + goto error; + } + + /* check if version is correct, atm we only have one */ + if (fileVersion != CITUS_STAT_COUNTERS_FILE_VERSION) + { + ereport(LOG, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("citus_stat_counters file version mismatch: expected %u, got %u", + CITUS_STAT_COUNTERS_FILE_VERSION, fileVersion))); + + goto error; + } + + /* get number of entries */ + int32 numEntries = 0; + if (fread(&numEntries, sizeof(int32), 1, file) != 1) + { + goto error; + } + + for (int i = 0; i < numEntries; i++) + { + Oid dbId = InvalidOid; + if (fread(&dbId, sizeof(Oid), 1, file) != 1) + { + goto error; + } + + uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 }; + if (fread(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) != + N_CITUS_STAT_COUNTERS) + { + goto error; + } + + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search( + CitusStatCountersSharedHash, + (void *) &dbId, + HASH_FIND, + NULL); + if (!dbEntry) + { + /* promote the lock to exclusive to insert the new entry for this database */ + LWLockRelease(CitusStatCountersSharedState->lock); + LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); + + dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId); + + /* downgrade the lock to shared */ + LWLockRelease(CitusStatCountersSharedState->lock); + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + } + + for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + { + /* + * If no other backend has created the entry for us, then we do that + * via the above call made to CitusStatCountersHashEntryAllocIfNotExists() + * and there we init the counters if we just created the entry. + * + * So here we always "fetch_add" instead of "write" or "init" to avoid + * overwriting the counters-increments made by other backends so far, if + * it's not us who created the entry. + */ + pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx], statCounters[statIdx]); + } + + LWLockRelease(CitusStatCountersSharedState->lock); + } + + FreeFile(file); + + /* + * Remove the file so it's not included in backups/replicas, etc. A new file will be + * written on next shutdown. + */ + unlink(CITUS_STAT_COUNTERS_DUMP_FILE); + + return; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read citus_stat_counters file \"%s\": %m", + CITUS_STAT_COUNTERS_DUMP_FILE))); + + if (file) + { + FreeFile(file); + } + + /* delete bogus file, don't care of errors in this case */ + unlink(CITUS_STAT_COUNTERS_DUMP_FILE); +} + + +/* + * CitusStatCountersShmemShutdown is called on shutdown by postmaster to dump the + * stat counters to CITUS_STAT_COUNTERS_DUMP_FILE. + */ +static void +CitusStatCountersShmemShutdown(int code, Datum arg) +{ + /* don't try to dump during a crash */ + if (code) + { + return; + } + + Assert(IsCitusStatCountersEnabled()); + + LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); + + FILE *file = AllocateFile(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, PG_BINARY_W); + if (file == NULL) + { + goto error; + } + + if (fwrite(&CITUS_STAT_COUNTERS_FILE_VERSION, sizeof(uint32), 1, file) != 1) + { + goto error; + } + + int32 numEntries = hash_get_num_entries(CitusStatCountersSharedHash); + if (fwrite(&numEntries, sizeof(int32), 1, file) != 1) + { + goto error; + } + + StatCountersHashEntry *dbEntry = NULL; + + HASH_SEQ_STATUS hashSeqState = { 0 }; + hash_seq_init(&hashSeqState, CitusStatCountersSharedHash); + while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL) + { + if (fwrite(&dbEntry->dbId, sizeof(Oid), 1, file) != 1) + { + /* we assume hash_seq_term won't change errno */ + hash_seq_term(&hashSeqState); + goto error; + } + + uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 }; + + for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + { + statCounters[statIdx] = pg_atomic_read_u64(&dbEntry->counters[statIdx]); + } + + if (fwrite(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) != + N_CITUS_STAT_COUNTERS) + { + /* we assume hash_seq_term won't change errno */ + hash_seq_term(&hashSeqState); + goto error; + } + } + + if (FreeFile(file)) + { + file = NULL; + goto error; + } + + /* rename the file inplace */ + if (rename(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, CITUS_STAT_COUNTERS_DUMP_FILE) != 0) + { + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not rename citus_stat_counters file \"%s\" to \"%s\": %m", + CITUS_STAT_COUNTERS_TMP_DUMP_FILE, + CITUS_STAT_COUNTERS_DUMP_FILE))); + } + + LWLockRelease(CitusStatCountersSharedState->lock); + + return; + +error: + + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write citus_stat_counters file \"%s\": %m", + CITUS_STAT_COUNTERS_TMP_DUMP_FILE))); + + if (file) + { + FreeFile(file); + } + + LWLockRelease(CitusStatCountersSharedState->lock); + + unlink(CITUS_STAT_COUNTERS_DUMP_FILE); +} + + +/* + * CitusStatCountersHashEntryAllocIfNotExists allocates a new entry in the + * stat counters hash table if it doesn't already exist. + * + * Assumes that the caller has exclusive access to the hash table. + */ +static StatCountersHashEntry * +CitusStatCountersHashEntryAllocIfNotExists(Oid dbId) +{ + bool councurrentlyInserted = false; + StatCountersHashEntry *dbEntry = + (StatCountersHashEntry *) hash_search(CitusStatCountersSharedHash, (void *) &dbId, + HASH_ENTER, + &councurrentlyInserted); + + /* + * Initialize the counters to 0 if someone else didn't race while we + * were promoting the lock. + */ + if (!councurrentlyInserted) + { + for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + { + pg_atomic_init_u64(&dbEntry->counters[statIdx], 0); + } + + dbEntry->statsResetTimestamp = 0; + } + + return dbEntry; +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 9cef13539..088fa9bdb 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -57,6 +57,7 @@ #include "distributed/query_stats.h" #include "distributed/resource_lock.h" #include "distributed/shard_cleaner.h" +#include "distributed/stat_counters.h" #include "distributed/statistics_collection.h" #include "distributed/transaction_recovery.h" #include "distributed/version_compat.h" @@ -489,6 +490,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampTz lastRecoveryTime = 0; TimestampTz lastShardCleanTime = 0; TimestampTz lastStatStatementsPurgeTime = 0; + TimestampTz lastStatCountersPurgeTime = 0; TimestampTz nextMetadataSyncTime = 0; /* state kept for the background tasks queue monitor */ @@ -819,6 +821,36 @@ CitusMaintenanceDaemonMain(Datum main_arg) timeout = Min(timeout, (StatStatementsPurgeInterval * 1000)); } + if (IsCitusStatCountersEnabled() && + TimestampDifferenceExceeds(lastStatCountersPurgeTime, GetCurrentTimestamp(), + StatCountersPurgeInterval)) + { + StartTransactionCommand(); + + if (!LockCitusExtension()) + { + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping to purge the stat counter " + "shared memory entries for dropped " + "databases"))); + } + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + /* + * Record last time we perform the purge to ensure we run once per + * StatCountersPurgeInterval. + */ + lastStatCountersPurgeTime = GetCurrentTimestamp(); + + CitusStatCountersRemoveDroppedDatabases(); + } + + CommitTransactionCommand(); + + /* make sure we don't wait too long */ + timeout = Min(timeout, (StatCountersPurgeInterval)); + } + pid_t backgroundTaskQueueWorkerPid = 0; BgwHandleStatus backgroundTaskQueueWorkerStatus = backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid( diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index d93e4483a..c742eb4e9 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -334,7 +334,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); -extern void MarkConnectionConnected(MultiConnection *connection); +extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection); /* waiteventset utilities */ extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, diff --git a/src/include/distributed/stat_counters.h b/src/include/distributed/stat_counters.h new file mode 100644 index 000000000..4dc13a091 --- /dev/null +++ b/src/include/distributed/stat_counters.h @@ -0,0 +1,53 @@ +/*------------------------------------------------------------------------- + * + * stat_counters.h + * + * This file contains the exported functions to track various statistic + * counters for Citus. + * + * ------------------------------------------------------------------------- + */ + +#ifndef STAT_COUNTERS_H +#define STAT_COUNTERS_H + +#include + +#include "distributed/time_constants.h" + +#define STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME "citus_stat_counters_lock_tranche" + +#define DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL (30 * MS_PER_SECOND) +#define DISABLE_STAT_COUNTERS_FLUSH_INTERVAL -1 + +#define DEFAULT_STAT_COUNTERS_PURGE_INTERVAL (5 * MS_PER_MINUTE) + +/* + * Must be in the same order as the columns defined in citus_stat_counters view, + * see src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql + */ +typedef enum +{ + STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED, + STAT_CONNECTION_ESTABLISHMENT_FAILED, + STAT_CONNECTION_REUSED, + + STAT_QUERY_EXECUTION_SINGLE_SHARD, + STAT_QUERY_EXECUTION_MULTI_SHARD, + + /* do not use this and ensure it is the last entry */ + N_CITUS_STAT_COUNTERS +} StatType; + +extern int StatCountersFlushInterval; +extern int StatCountersPurgeInterval; + +extern bool IsCitusStatCountersEnabled(void); +extern void IncrementStatCounterForMyDb(int statId); + +extern void InitializeStatCountersArrayMem(void); +extern void CitusStatCountersFlushAtExit(int code, Datum arg); +extern Size StatCountersArrayShmemSize(void); +extern void CitusStatCountersRemoveDroppedDatabases(void); + +#endif /* STAT_COUNTERS_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 51b2be416..88104a46a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1480,8 +1480,11 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal.update_placement_metadata(bigint,integer,integer) void | function citus_internal.update_relation_colocation(oid,integer) void | function citus_is_primary_node() boolean + | function citus_stat_counters(oid) SETOF record + | function citus_stat_counters_reset(oid) void | function citus_unmark_object_distributed(oid,oid,integer,boolean) void -(27 rows) + | view citus_stat_counters +(30 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 048e86c67..70c71093d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -179,6 +179,8 @@ ORDER BY 1; function citus_shards_on_worker() function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) function citus_stat_activity() + function citus_stat_counters(oid) + function citus_stat_counters_reset(oid) function citus_stat_statements() function citus_stat_statements_reset() function citus_stat_tenants(boolean) @@ -384,11 +386,12 @@ ORDER BY 1; view citus_shards view citus_shards_on_worker view citus_stat_activity + view citus_stat_counters view citus_stat_statements view citus_stat_tenants view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(358 rows) +(361 rows) DROP TABLE extension_basic_types;