diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 473d098f2..4787d8f2f 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -39,7 +39,6 @@ #include "distributed/remote_commands.h" #include "distributed/run_from_same_connection.h" #include "distributed/shared_connection_stats.h" -#include "distributed/stat_counters.h" #include "distributed/time_constants.h" #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" @@ -355,18 +354,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = FindAvailableConnection(entry->connections, flags); if (connection) { - /* - * Increment the connection stat counter for the connections that are - * reused only if the connection is in a good state. Here we don't - * bother shutting down the connection or such if it is not in a good - * state but we mostly want to avoid incrementing the connection stat - * counter for a connection that the caller cannot really use. - */ - if (PQstatus(connection->pgConn) == CONNECTION_OK) - { - IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED); - } - return connection; } } @@ -408,12 +395,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, dlist_delete(&connection->connectionNode); pfree(connection); - /* - * Here we don't increment the connection stat counter for the optional - * connections that we gave up establishing due to connection throttling - * because the callers who request optional connections know how to - * survive without them. - */ return NULL; } } @@ -1045,11 +1026,6 @@ FinishConnectionListEstablishment(List *multiConnectionList) if (event->events & WL_POSTMASTER_DEATH) { - /* - * Here we don't increment the connection stat counter for the - * optional failed connections because this is not a connection - * failure, but a postmaster death in the local node. - */ ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); } @@ -1066,12 +1042,6 @@ FinishConnectionListEstablishment(List *multiConnectionList) * reset the memory context */ MemoryContextDelete(MemoryContextSwitchTo(oldContext)); - - /* - * Similarly, we don't increment the connection stat counter for the - * failed connections here because this is not a connection failure - * but a cancellation request is received. - */ return; } @@ -1102,7 +1072,6 @@ FinishConnectionListEstablishment(List *multiConnectionList) eventMask, NULL); if (!success) { - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("connection establishment for node %s:%d " "failed", connection->hostname, @@ -1119,15 +1088,7 @@ FinishConnectionListEstablishment(List *multiConnectionList) */ if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED) { - /* - * Since WaitEventSetFromMultiConnectionStates() only adds the - * connections that we haven't completed the connection - * establishment yet, here we always have a new connection. - * In other words, at this point, we surely know that we're - * not dealing with a cached connection. - */ - bool newConnection = true; - MarkConnectionConnected(connectionState->connection, newConnection); + MarkConnectionConnected(connectionState->connection); } } } @@ -1211,8 +1172,6 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) /* close connection, otherwise we take up resource on the other side */ CitusPQFinish(connection); - - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); } } @@ -1625,7 +1584,7 @@ RemoteTransactionIdle(MultiConnection *connection) * establishment time when necessary. */ void -MarkConnectionConnected(MultiConnection *connection, bool newConnection) +MarkConnectionConnected(MultiConnection *connection) { connection->connectionState = MULTI_CONNECTION_CONNECTED; @@ -1633,11 +1592,6 @@ MarkConnectionConnected(MultiConnection *connection, bool newConnection) { INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd); } - - if (newConnection) - { - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED); - } } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 9d5b9c407..f276b3df1 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -171,7 +171,6 @@ #include "distributed/repartition_join_execution.h" #include "distributed/resource_lock.h" #include "distributed/shared_connection_stats.h" -#include "distributed/stat_counters.h" #include "distributed/subplan_execution.h" #include "distributed/transaction_identifier.h" #include "distributed/transaction_management.h" @@ -691,7 +690,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution, WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session); static bool HasUnfinishedTaskForSession(WorkerSession *session); -static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection); +static void HandleMultiConnectionSuccess(WorkerSession *session); static bool HasAnyConnectionFailure(WorkerPool *workerPool); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); static bool TransactionModifiedDistributedTable(DistributedExecution *execution); @@ -2036,7 +2035,6 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution) else { connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); } @@ -3013,7 +3011,6 @@ ConnectionStateMachine(WorkerSession *session) * connection, clear any state associated with it. */ connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); break; } @@ -3022,12 +3019,7 @@ ConnectionStateMachine(WorkerSession *session) ConnStatusType status = PQstatus(connection->pgConn); if (status == CONNECTION_OK) { - /* - * Connection was already established, possibly a cached - * connection. - */ - bool newConnection = false; - HandleMultiConnectionSuccess(session, newConnection); + HandleMultiConnectionSuccess(session); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); break; @@ -3035,7 +3027,6 @@ ConnectionStateMachine(WorkerSession *session) else if (status == CONNECTION_BAD) { connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); break; } @@ -3051,7 +3042,6 @@ ConnectionStateMachine(WorkerSession *session) if (pollMode == PGRES_POLLING_FAILED) { connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); } else if (pollMode == PGRES_POLLING_READING) { @@ -3069,12 +3059,7 @@ ConnectionStateMachine(WorkerSession *session) } else { - /* - * Connection was not established befoore (!= CONNECTION_OK) - * but PQconnectPoll() did so now. - */ - bool newConnection = true; - HandleMultiConnectionSuccess(session, newConnection); + HandleMultiConnectionSuccess(session); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); @@ -3152,11 +3137,6 @@ ConnectionStateMachine(WorkerSession *session) break; } - /* - * Here we don't increment the connection stat counter for failed - * connections because we don't track the connections that we could - * establish but lost later. - */ connection->connectionState = MULTI_CONNECTION_FAILED; break; } @@ -3319,12 +3299,12 @@ HasUnfinishedTaskForSession(WorkerSession *session) * connection's state. */ static void -HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection) +HandleMultiConnectionSuccess(WorkerSession *session) { MultiConnection *connection = session->connection; WorkerPool *workerPool = session->workerPool; - MarkConnectionConnected(connection, newConnection); + MarkConnectionConnected(connection); ereport(DEBUG4, (errmsg("established connection to %s:%d for " "session %ld in %ld microseconds", diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 64680fea1..7f8f827ea 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -66,7 +66,6 @@ #include "distributed/recursive_planning.h" #include "distributed/shard_utils.h" #include "distributed/shardinterval_utils.h" -#include "distributed/stat_counters.h" #include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" #include "distributed/worker_shard_visibility.h" @@ -1434,8 +1433,6 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) if (IsMultiTaskPlan(distributedPlan)) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - /* if it is not a single task executable plan, inform user according to the log level */ if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF) { @@ -1447,10 +1444,6 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) " queries on the workers."))); } } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } distributedPlan->queryId = localPlan->queryId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 091e7ac44..a4146062e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -105,7 +105,6 @@ #include "distributed/shardsplit_shared_memory.h" #include "distributed/shared_connection_stats.h" #include "distributed/shared_library_init.h" -#include "distributed/stat_counters.h" #include "distributed/statistics_collection.h" #include "distributed/subplan_execution.h" #include "distributed/time_constants.h" @@ -189,7 +188,6 @@ static void multi_log_hook(ErrorData *edata); static bool IsSequenceOverflowError(ErrorData *edata); static void RegisterConnectionCleanup(void); static void RegisterExternalClientBackendCounterDecrement(void); -static void RegisterCitusStatCountersFlush(void); static void CitusCleanupConnectionsAtExit(int code, Datum arg); static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg); static void CreateRequiredDirectories(void); @@ -507,11 +505,6 @@ _PG_init(void) InitializeMultiTenantMonitorSMHandleManagement(); - if (IsCitusStatCountersEnabled()) - { - InitializeStatCountersArrayMem(); - } - /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -622,12 +615,6 @@ citus_shmem_request(void) RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(LogicalClockShmemSize()); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); - - if (IsCitusStatCountersEnabled()) - { - RequestAddinShmemSpace(StatCountersArrayShmemSize()); - RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1); - } } @@ -800,12 +787,6 @@ StartupCitusBackend(void) SetBackendDataDatabaseId(); RegisterConnectionCleanup(); - - if (IsCitusStatCountersEnabled()) - { - RegisterCitusStatCountersFlush(); - } - FinishedStartupCitusBackend = true; } @@ -860,23 +841,6 @@ RegisterExternalClientBackendCounterDecrement(void) } -/* - * RegisterCitusStatCountersFlush registers CitusStatCountersFlushAtExit() - * to be called before the backend exits. - */ -static void -RegisterCitusStatCountersFlush(void) -{ - static bool registeredCleanup = false; - if (registeredCleanup == false) - { - before_shmem_exit(CitusStatCountersFlushAtExit, 0); - - registeredCleanup = true; - } -} - - /* * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the * backend for the purposes of any clean-up needed. @@ -2465,41 +2429,6 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); - DefineCustomIntVariable( - "citus.stat_counters_flush_interval", - gettext_noop("Sets the interval to flush the local stat counters into " - "the shared memory."), - gettext_noop("Stat counters are used to track the number of certain " - "operations in Citus. While setting this GUC to -1 disables " - "stat counters, setting it to a positive value will flush " - "the local stat counters into the shared memory every " - "interval milliseconds instead of flushing them immediately " - "after the operation, as it is done when the value is 0. " - "Higher values reduce the overhead of flushing the stat " - "counters but increase the time it takes to see the updated " - "stat counters."), - &StatCountersFlushInterval, - DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL, - DISABLE_STAT_COUNTERS_FLUSH_INTERVAL, - 5 * MS_PER_MINUTE, - PGC_POSTMASTER, - GUC_UNIT_MS, - NULL, NULL, NULL); - - DefineCustomIntVariable( - "citus.stat_counters_purge_interval", - gettext_noop("Determines time interval for citus_stat_counters to remove " - "the shared memory entries for the databases that were dropped."), - gettext_noop("This is automatically disabled when the stat counters are disabled " - "(citus.stat_counters_flush_interval = -1) and there is no explicit " - "way to disable purging the shared memory entries while the stat " - "counters are enabled."), - &StatCountersPurgeInterval, - DEFAULT_STAT_COUNTERS_PURGE_INTERVAL, 1, INT_MAX, - PGC_SIGHUP, - GUC_UNIT_MS | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, - NULL, NULL, NULL); - /* * It takes about 140 bytes of shared memory to store one row, therefore * this setting should be used responsibly. setting it to 10M will require diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 4eefdf60a..0f70438e0 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -48,5 +48,3 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/repl_origin_helper/13.1-1.sql" #include "udfs/citus_finish_pg_upgrade/13.1-1.sql" #include "udfs/citus_is_primary_node/13.1-1.sql" -#include "udfs/citus_stat_counters/13.1-1.sql" -#include "udfs/citus_stat_counters_reset/13.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql b/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql index 38f195743..dd89fc249 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql @@ -41,7 +41,3 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking(); DROP FUNCTION citus_internal.stop_replication_origin_tracking(); DROP FUNCTION citus_internal.is_replication_origin_tracking_active(); #include "../udfs/citus_finish_pg_upgrade/12.1-1.sql" - -DROP VIEW pg_catalog.citus_stat_counters; -DROP FUNCTION pg_catalog.citus_stat_counters(oid); -DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid); diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql deleted file mode 100644 index ba5919a04..000000000 --- a/src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql +++ /dev/null @@ -1,27 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( - database_oid oid, - - -- Following stat counter columns must be in the same order as the - -- StatType enum defined in src/include/distributed/stat_counters.h - OUT connection_establishment_succeeded bigint, - OUT connection_establishment_failed bigint, - OUT connection_reused bigint, - - OUT query_execution_single_shard bigint, - OUT query_execution_multi_shard bigint, - - -- Must always be the last column or you should accordingly update - -- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c - OUT stats_reset timestamp with time zone -) -RETURNS SETOF RECORD -LANGUAGE C VOLATILE PARALLEL UNSAFE -AS 'MODULE_PATHNAME', $$citus_stat_counters$$; -COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database'; - -CREATE VIEW citus.citus_stat_counters AS -SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database; - -ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog; - -GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql deleted file mode 100644 index ba5919a04..000000000 --- a/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql +++ /dev/null @@ -1,27 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( - database_oid oid, - - -- Following stat counter columns must be in the same order as the - -- StatType enum defined in src/include/distributed/stat_counters.h - OUT connection_establishment_succeeded bigint, - OUT connection_establishment_failed bigint, - OUT connection_reused bigint, - - OUT query_execution_single_shard bigint, - OUT query_execution_multi_shard bigint, - - -- Must always be the last column or you should accordingly update - -- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c - OUT stats_reset timestamp with time zone -) -RETURNS SETOF RECORD -LANGUAGE C VOLATILE PARALLEL UNSAFE -AS 'MODULE_PATHNAME', $$citus_stat_counters$$; -COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database'; - -CREATE VIEW citus.citus_stat_counters AS -SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database; - -ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog; - -GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql deleted file mode 100644 index 82272ab45..000000000 --- a/src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0) -RETURNS VOID -LANGUAGE C VOLATILE PARALLEL UNSAFE -AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$; -COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0'; - -REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters_reset/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_counters_reset/latest.sql deleted file mode 100644 index 82272ab45..000000000 --- a/src/backend/distributed/sql/udfs/citus_stat_counters_reset/latest.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0) -RETURNS VOID -LANGUAGE C VOLATILE PARALLEL UNSAFE -AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$; -COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0'; - -REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC; diff --git a/src/backend/distributed/stat_counters.c b/src/backend/distributed/stat_counters.c deleted file mode 100644 index 51aa227ab..000000000 --- a/src/backend/distributed/stat_counters.c +++ /dev/null @@ -1,984 +0,0 @@ -/*------------------------------------------------------------------------- - * - * stat_counters.c - * - * This file contains functions to track various statistic counters for - * Citus. - * - * Each backend increments the local counters (PendingStatCounters) and - * flushes them to the shared memory at the end of the flush interval - * (StatCountersFlushInterval). The shared memory is used to keep track - * of the stat counters across backends for all databases. - * - * We don't have a good way to enforce that we flush the pending counters - * always at the end of the flush interval because we only check for the - * interval at the time of incrementing a counter. And, if a backend exits - * before the interval elapses, then we still flush the pending counters at - * the time of exit via CitusStatCountersFlushAtExit(). - * - * Also, we store the stat counters in CITUS_STAT_COUNTERS_DUMP_FILE on - * shutdown via postmaster and restore them on startup via the first backend - * that initializes the shared memory for stat counters. - * - * And for the dropped databases, Citus maintenance daemon removes the stat - * counters from the shared memory periodically, which is controlled by - * StatCountersPurgeInterval. When the feature is disabled, removal of the - * stat counters is automatically disabled. Instead of inserting a cleanup - * record into pg_dist_cleanup, we choose to remove the stat counters via - * the maintenance daemon because inserting a cleanup record wouldn't work - * when the current database is dropped. And if there were other databases - * where Citus is installed, then they would still continue to have the - * dropped database's stat counters in the shared memory. - * - *------------------------------------------------------------------------- - */ - -#include - -#include "postgres.h" - -#include "funcapi.h" -#include "miscadmin.h" - -#include "common/hashfn.h" -#include "storage/ipc.h" -#include "storage/lwlock.h" -#include "storage/shmem.h" -#include "utils/syscache.h" - -#include "distributed/listutils.h" -#include "distributed/metadata_cache.h" -#include "distributed/stat_counters.h" - - -/* file path for dumping the stat counters */ -#define CITUS_STAT_COUNTERS_DUMP_FILE "pg_stat/citus_stat_counters.stat" -#define CITUS_STAT_COUNTERS_TMP_DUMP_FILE (CITUS_STAT_COUNTERS_DUMP_FILE ".tmp") - -/* - * Shared hash constants. - * - * The places where STAT_COUNTERS_MAX_DATABASES is used do not impose a hard - * limit on the number of databases that can be tracked but in ShmemInitHash() - * it's documented that the access efficiency will degrade if it is exceeded - * substantially. - * - * XXX: Consider using dshash_table instead of (shared) HTAB if that becomes - * a concern. - */ -#define STAT_COUNTERS_INIT_DATABASES 8 -#define STAT_COUNTERS_MAX_DATABASES 1024 - - -/* shared hash table entry */ -typedef struct StatCountersHashEntry -{ - /* must be the first field since this is used as the key */ - Oid dbId; - - pg_atomic_uint64 counters[N_CITUS_STAT_COUNTERS]; - - TimestampTz statsResetTimestamp; -} StatCountersHashEntry; - -/* shared memory state */ -typedef struct StatCountersState -{ - LWLockId lock; -} StatCountersState; - - -/* GUC value for citus.stat_counters_flush_interval */ -int StatCountersFlushInterval = DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL; - -/* GUC value for citus.stat_counters_purge_interval */ -int StatCountersPurgeInterval = DEFAULT_STAT_COUNTERS_PURGE_INTERVAL; - -/* stat counters dump file version */ -static const uint32 CITUS_STAT_COUNTERS_FILE_VERSION = 1; - -/* shared memory init */ -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; - -/* shared memory variables */ -static StatCountersState *CitusStatCountersSharedState = NULL; -static HTAB *CitusStatCountersSharedHash = NULL; - -/* local stat counters that are pending to be flushed */ -static uint64 PendingStatCounters[N_CITUS_STAT_COUNTERS] = { 0 }; - - -/* helper functions for citus_stat_counters() and citus_stat_counters_reset() */ -static Tuplestorestate * SetupStatCountersTuplestore(FunctionCallInfo fcinfo, - TupleDesc *tupleDescriptor); -static void StoreStatCountersForDbId(Tuplestorestate *tupleStore, - TupleDesc tupleDescriptor, - Oid dbId); -static void StoreStatCountersFromValues(Tuplestorestate *tupleStore, - TupleDesc tupleDescriptor, - pg_atomic_uint64 *counters, - TimestampTz statsResetTimestamp); -static void ResetStatCountersForDbId(Oid dbId); -static void ResetStatCounterArray(pg_atomic_uint64 *counters); - -/* helper functions to increment and flush stat counters */ -static bool ShouldByPassLocalCounters(void); -static void IncrementSharedStatCounterForMyDb(int statId); -static void FlushPendingCountersIfNeeded(bool force); - -/* shared memory init & management */ -static void CitusStatCountersShmemInit(void); -static void CitusStatCountersShmemShutdown(int code, Datum arg); - -/* shared hash utilities */ -static StatCountersHashEntry * CitusStatCountersHashEntryAllocIfNotExists(Oid dbId); - - -PG_FUNCTION_INFO_V1(citus_stat_counters); -PG_FUNCTION_INFO_V1(citus_stat_counters_reset); - - -/* - * citus_stat_counters returns the stat counters for the specified database or for - * all databases if InvalidOid is provided. - */ -Datum -citus_stat_counters(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - if (PG_ARGISNULL(0)) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("database oid cannot be NULL"))); - } - - Oid dbId = PG_GETARG_OID(0); - - if (!IsCitusStatCountersEnabled()) - { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("Citus stat counters are not enabled, consider setting " - "citus.stat_counters_flush_interval to a non-negative value " - "and restarting the server"))); - - } - - if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("shared memory is not initialized for Citus stat counters"))); - } - - TupleDesc tupleDescriptor = NULL; - Tuplestorestate *tupleStore = SetupStatCountersTuplestore(fcinfo, &tupleDescriptor); - - StoreStatCountersForDbId(tupleStore, tupleDescriptor, dbId); - - PG_RETURN_VOID(); -} - - -/* - * citus_stat_counters_reset resets the stat counters for the specified database - * or for all databases if InvalidOid is provided. - */ -Datum -citus_stat_counters_reset(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - if (PG_ARGISNULL(0)) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("database oid cannot be NULL"))); - } - - Oid dbId = PG_GETARG_OID(0); - - if (!IsCitusStatCountersEnabled()) - { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("Citus stat counters are not enabled, consider setting " - "citus.stat_counters_flush_interval to a non-negative value " - "and restarting the server"))); - - } - - if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("shared memory is not initialized for Citus stat counters"))); - } - - ResetStatCountersForDbId(dbId); - - PG_RETURN_VOID(); -} - - -/* - * IsCitusStatCountersEnabled returns whether the stat counters are enabled. - */ -bool -IsCitusStatCountersEnabled(void) -{ - Assert(StatCountersFlushInterval >= DISABLE_STAT_COUNTERS_FLUSH_INTERVAL); - return StatCountersFlushInterval != DISABLE_STAT_COUNTERS_FLUSH_INTERVAL; -} - - -/* - * IncrementStatCounterForMyDb increments the stat counter for the given statId - * of the current database. - */ -void -IncrementStatCounterForMyDb(int statId) -{ - if (!IsCitusStatCountersEnabled()) - { - return; - } - - if (ShouldByPassLocalCounters()) - { - IncrementSharedStatCounterForMyDb(statId); - } - else - { - PendingStatCounters[statId]++; - - bool force = false; - FlushPendingCountersIfNeeded(force); - } -} - - -/* - * InitializeStatCountersArrayMem saves the previous shmem_startup_hook and sets - * up a new shmem_startup_hook to initialize the shared memory used for - * keeping track of stat counters across backends for all databases. - */ -void -InitializeStatCountersArrayMem(void) -{ - Assert(IsCitusStatCountersEnabled()); - -/* on the versions older than PG 15, we use shmem_request_hook_type */ -#if PG_VERSION_NUM < PG_VERSION_15 - - if (!IsUnderPostmaster) - { - RequestAddinShmemSpace(StatCountersArrayShmemSize()); - - elog(LOG, "requesting named LWLockTranch for %s", - STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME); - RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1); - } -#endif - - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = CitusStatCountersShmemInit; -} - - -/* - * CitusStatCountersFlushAtExit is called on backend exit to flush the local - * stat counters to the shared memory, if there are any pending. - */ -void -CitusStatCountersFlushAtExit(int code, Datum arg) -{ - Assert(IsCitusStatCountersEnabled()); - - bool force = true; - FlushPendingCountersIfNeeded(force); -} - - -/* - * StatCountersArrayShmemSize calculates and returns shared memory size - * required to keep stat counters. - */ -Size -StatCountersArrayShmemSize(void) -{ - Size size = MAXALIGN(sizeof(StatCountersState)); - size = add_size(size, hash_estimate_size(STAT_COUNTERS_MAX_DATABASES, - sizeof(StatCountersHashEntry))); - - return size; -} - - -/* - * CitusStatCountersRemoveDroppedDatabases removes the stat counters for the - * databases that are dropped. - */ -void -CitusStatCountersRemoveDroppedDatabases(void) -{ - Assert(IsCitusStatCountersEnabled()); - - List *droppedDatabaseIdList = NIL; - - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - HASH_SEQ_STATUS hashSeqState = { 0 }; - hash_seq_init(&hashSeqState, CitusStatCountersSharedHash); - - StatCountersHashEntry *dbEntry = NULL; - while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL) - { - HeapTuple dbTuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbEntry->dbId)); - if (!HeapTupleIsValid(dbTuple)) - { - droppedDatabaseIdList = lappend_oid(droppedDatabaseIdList, dbEntry->dbId); - } - else - { - ReleaseSysCache(dbTuple); - } - } - - LWLockRelease(CitusStatCountersSharedState->lock); - - if (list_length(droppedDatabaseIdList) == 0) - { - return; - } - - /* acquire exclusive lock to quickly remove the entries */ - LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); - - Oid droppedDatabaseId = InvalidOid; - foreach_declared_oid(droppedDatabaseId, droppedDatabaseIdList) - { - hash_search(CitusStatCountersSharedHash, (void *) &droppedDatabaseId, - HASH_REMOVE, NULL); - } - - LWLockRelease(CitusStatCountersSharedState->lock); -} - - -/* - * SetupStatCountersTuplestore returns a Tuplestorestate for returning the - * stat counters and setups the provided TupleDesc. - */ -static Tuplestorestate * -SetupStatCountersTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupleDescriptor) -{ - ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo; - switch (get_call_result_type(fcinfo, NULL, tupleDescriptor)) - { - case TYPEFUNC_COMPOSITE: - { - /* success */ - break; - } - - case TYPEFUNC_RECORD: - { - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - } - - default: - { - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } - } - - MemoryContext perQueryContext = resultSet->econtext->ecxt_per_query_memory; - - MemoryContext oldContext = MemoryContextSwitchTo(perQueryContext); - - bool randomAccess = true; - bool interTransactions = false; - Tuplestorestate *tupstore = tuplestore_begin_heap(randomAccess, interTransactions, - work_mem); - - resultSet->returnMode = SFRM_Materialize; - resultSet->setResult = tupstore; - resultSet->setDesc = *tupleDescriptor; - - MemoryContextSwitchTo(oldContext); - - return tupstore; -} - - -/* - * StoreStatCountersForDbId fetches the stat counters for the specified database - * and stores them into the given tuple store. - */ -static void -StoreStatCountersForDbId(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Oid dbId) -{ - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash, - (void *) &dbId, - HASH_FIND, NULL); - if (dbEntry) - { - StoreStatCountersFromValues(tupleStore, tupleDescriptor, dbEntry->counters, - dbEntry->statsResetTimestamp); - } - else - { - /* use a zeroed entry if the database doesn't exist */ - TimestampTz zeroedTimestamp = 0; - - pg_atomic_uint64 zeroedCounters[N_CITUS_STAT_COUNTERS] = { 0 }; - ResetStatCounterArray(zeroedCounters); - - StoreStatCountersFromValues(tupleStore, tupleDescriptor, zeroedCounters, - zeroedTimestamp); - } - - LWLockRelease(CitusStatCountersSharedState->lock); -} - - -/* - * StoreStatCountersFromValues stores the stat counters stored in given - * counter array and the timestamp into the given tuple store. - * - * Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS. - */ -static void -StoreStatCountersFromValues(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, - pg_atomic_uint64 *counters, TimestampTz statsResetTimestamp) -{ - Datum values[N_CITUS_STAT_COUNTERS + 1] = { 0 }; - bool isNulls[N_CITUS_STAT_COUNTERS + 1] = { 0 }; - - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) - { - values[statIdx] = UInt64GetDatum(pg_atomic_read_u64(&counters[statIdx])); - } - - if (statsResetTimestamp == 0) - { - isNulls[N_CITUS_STAT_COUNTERS] = true; - } - else - { - values[N_CITUS_STAT_COUNTERS] = TimestampTzGetDatum(statsResetTimestamp); - } - - tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); -} - - -/* - * ResetStatCountersForDbId resets the stat counters for the specified database or - * for all databases if InvalidOid is provided. - */ -static void -ResetStatCountersForDbId(Oid dbId) -{ - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - if (OidIsValid(dbId)) - { - StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash, - (void *) &dbId, - HASH_FIND, NULL); - - /* skip if we don't have an entry for this database */ - if (dbEntry) - { - ResetStatCounterArray(dbEntry->counters); - dbEntry->statsResetTimestamp = GetCurrentTimestamp(); - } - } - else - { - HASH_SEQ_STATUS hashSeqState = { 0 }; - hash_seq_init(&hashSeqState, CitusStatCountersSharedHash); - - StatCountersHashEntry *dbEntry = NULL; - while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL) - { - ResetStatCounterArray(dbEntry->counters); - dbEntry->statsResetTimestamp = GetCurrentTimestamp(); - } - } - - LWLockRelease(CitusStatCountersSharedState->lock); -} - - -/* - * ResetStatCounterArray resets the stat counters stored in the given - * counter array. - * - * Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS. - */ -static void -ResetStatCounterArray(pg_atomic_uint64 *counters) -{ - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) - { - pg_atomic_write_u64(&counters[statIdx], 0); - } -} - - -/* - * ShouldByPassLocalCounters returns true if we should immediately increment - * the shared memory counters without buffering them in the local counters. - */ -static bool -ShouldByPassLocalCounters(void) -{ - Assert(IsCitusStatCountersEnabled()); - return StatCountersFlushInterval == 0; -} - - -/* - * IncrementSharedStatCounterForMyDb increments the stat counter for the given statId - * of the current database in the shared memory. - */ -static void -IncrementSharedStatCounterForMyDb(int statId) -{ - Oid dbId = MyDatabaseId; - - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - /* - * XXX: Can we cache the entry for the current database? - * - * From one perspective, doing so should be fine since dropping a - * database succeeds only after all the backends connected to it - * are disconnected, so we cannot have a backend that has a dangling - * entry. - * - * On the other hand, we're not sure if a potential hash table resize - * would invalidate the cached entry. - */ - StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search( - CitusStatCountersSharedHash, - (void *) &dbId, - HASH_FIND, - NULL); - if (!dbEntry) - { - /* promote the lock to exclusive to insert the new entry for this database */ - LWLockRelease(CitusStatCountersSharedState->lock); - LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); - - dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId); - - /* downgrade the lock to shared */ - LWLockRelease(CitusStatCountersSharedState->lock); - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - } - - pg_atomic_fetch_add_u64(&dbEntry->counters[statId], 1); - - LWLockRelease(CitusStatCountersSharedState->lock); -} - - -/* - * FlushPendingCountersIfNeeded flushes PendingStatCounters to the shared - * memory if StatCountersFlushInterval has elapsed since the last flush - * or if force is true. - */ -static void -FlushPendingCountersIfNeeded(bool force) -{ - static instr_time LastFlushTime = { 0 }; - - if (!force) - { - if (INSTR_TIME_IS_ZERO(LastFlushTime)) - { - /* assume that the first increment is the start of the flush interval */ - INSTR_TIME_SET_CURRENT(LastFlushTime); - return; - } - - instr_time now = { 0 }; - INSTR_TIME_SET_CURRENT(now); - INSTR_TIME_SUBTRACT(now, LastFlushTime); - if (INSTR_TIME_GET_MILLISEC(now) < StatCountersFlushInterval) - { - return; - } - } - - Oid dbId = MyDatabaseId; - - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - /* XXX: Same here, can we cache the entry for the current database? */ - StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search( - CitusStatCountersSharedHash, - (void *) &dbId, - HASH_FIND, - NULL); - if (!dbEntry) - { - /* promote the lock to exclusive to insert the new entry for this database */ - LWLockRelease(CitusStatCountersSharedState->lock); - LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); - - dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId); - - /* downgrade the lock to shared */ - LWLockRelease(CitusStatCountersSharedState->lock); - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - } - - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) - { - if (PendingStatCounters[statIdx] == 0) - { - /* nothing to flush for this stat, avoid unnecessary lock contention */ - continue; - } - - pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx], - PendingStatCounters[statIdx]); - - PendingStatCounters[statIdx] = 0; - } - - LWLockRelease(CitusStatCountersSharedState->lock); - - INSTR_TIME_SET_CURRENT(LastFlushTime); -} - - -/* - * CitusStatCountersShmemInit initializes the shared memory used - * for keeping track of stat counters and restores the stat counters from - * the dump file. - */ -static void -CitusStatCountersShmemInit(void) -{ - if (prev_shmem_startup_hook != NULL) - { - prev_shmem_startup_hook(); - } - - Assert(IsCitusStatCountersEnabled()); - - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - - bool alreadyInitialized = false; - CitusStatCountersSharedState = ShmemInitStruct(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, - sizeof(StatCountersState), - &alreadyInitialized); - - - if (!alreadyInitialized) - { - CitusStatCountersSharedState->lock = &(GetNamedLWLockTranche( - STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME)) - ->lock; - } - - HASHCTL hashInfo = { - .keysize = sizeof(Oid), - .entrysize = sizeof(StatCountersHashEntry), - .hash = uint32_hash, - }; - - CitusStatCountersSharedHash = ShmemInitHash("Citus stat counters Hash", - STAT_COUNTERS_INIT_DATABASES, - STAT_COUNTERS_MAX_DATABASES, &hashInfo, - HASH_ELEM | HASH_FUNCTION); - - LWLockRelease(AddinShmemInitLock); - - if (!IsUnderPostmaster) - { - /* postmaster dumps the stat counters on shutdown */ - on_shmem_exit(CitusStatCountersShmemShutdown, (Datum) 0); - } - - if (alreadyInitialized) - { - /* - * alreadyInitialized being true doesn't necessarily mean that the - * backend that initialized the shared memory has completed restoring - * the stat counters from the dump file. - * - * However, it's still fine to return here as commented below. - */ - return; - } - - /* - * We know that only one backend can be here at this point, so we're - * the only one that restores the stat counters from the dump file. - * - * However, we don't block other backends from accessing the shared - * memory while we're restoring the stat counters - need to be careful. - */ - FILE *file = AllocateFile(CITUS_STAT_COUNTERS_DUMP_FILE, PG_BINARY_R); - if (file == NULL) - { - if (errno == ENOENT) - { - /* ignore not-found error */ - return; - } - goto error; - } - - uint32 fileVersion = 0; - if (fread(&fileVersion, sizeof(uint32), 1, file) != 1) - { - goto error; - } - - /* check if version is correct, atm we only have one */ - if (fileVersion != CITUS_STAT_COUNTERS_FILE_VERSION) - { - ereport(LOG, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("citus_stat_counters file version mismatch: expected %u, got %u", - CITUS_STAT_COUNTERS_FILE_VERSION, fileVersion))); - - goto error; - } - - /* get number of entries */ - int32 numEntries = 0; - if (fread(&numEntries, sizeof(int32), 1, file) != 1) - { - goto error; - } - - for (int i = 0; i < numEntries; i++) - { - Oid dbId = InvalidOid; - if (fread(&dbId, sizeof(Oid), 1, file) != 1) - { - goto error; - } - - uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 }; - if (fread(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) != - N_CITUS_STAT_COUNTERS) - { - goto error; - } - - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search( - CitusStatCountersSharedHash, - (void *) &dbId, - HASH_FIND, - NULL); - if (!dbEntry) - { - /* promote the lock to exclusive to insert the new entry for this database */ - LWLockRelease(CitusStatCountersSharedState->lock); - LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE); - - dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId); - - /* downgrade the lock to shared */ - LWLockRelease(CitusStatCountersSharedState->lock); - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - } - - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) - { - /* - * If no other backend has created the entry for us, then we do that - * via the above call made to CitusStatCountersHashEntryAllocIfNotExists() - * and there we init the counters if we just created the entry. - * - * So here we always "fetch_add" instead of "write" or "init" to avoid - * overwriting the counters-increments made by other backends so far, if - * it's not us who created the entry. - */ - pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx], statCounters[statIdx]); - } - - LWLockRelease(CitusStatCountersSharedState->lock); - } - - FreeFile(file); - - /* - * Remove the file so it's not included in backups/replicas, etc. A new file will be - * written on next shutdown. - */ - unlink(CITUS_STAT_COUNTERS_DUMP_FILE); - - return; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read citus_stat_counters file \"%s\": %m", - CITUS_STAT_COUNTERS_DUMP_FILE))); - - if (file) - { - FreeFile(file); - } - - /* delete bogus file, don't care of errors in this case */ - unlink(CITUS_STAT_COUNTERS_DUMP_FILE); -} - - -/* - * CitusStatCountersShmemShutdown is called on shutdown by postmaster to dump the - * stat counters to CITUS_STAT_COUNTERS_DUMP_FILE. - */ -static void -CitusStatCountersShmemShutdown(int code, Datum arg) -{ - /* don't try to dump during a crash */ - if (code) - { - return; - } - - Assert(IsCitusStatCountersEnabled()); - - LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED); - - FILE *file = AllocateFile(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, PG_BINARY_W); - if (file == NULL) - { - goto error; - } - - if (fwrite(&CITUS_STAT_COUNTERS_FILE_VERSION, sizeof(uint32), 1, file) != 1) - { - goto error; - } - - int32 numEntries = hash_get_num_entries(CitusStatCountersSharedHash); - if (fwrite(&numEntries, sizeof(int32), 1, file) != 1) - { - goto error; - } - - StatCountersHashEntry *dbEntry = NULL; - - HASH_SEQ_STATUS hashSeqState = { 0 }; - hash_seq_init(&hashSeqState, CitusStatCountersSharedHash); - while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL) - { - if (fwrite(&dbEntry->dbId, sizeof(Oid), 1, file) != 1) - { - /* we assume hash_seq_term won't change errno */ - hash_seq_term(&hashSeqState); - goto error; - } - - uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 }; - - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) - { - statCounters[statIdx] = pg_atomic_read_u64(&dbEntry->counters[statIdx]); - } - - if (fwrite(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) != - N_CITUS_STAT_COUNTERS) - { - /* we assume hash_seq_term won't change errno */ - hash_seq_term(&hashSeqState); - goto error; - } - } - - if (FreeFile(file)) - { - file = NULL; - goto error; - } - - /* rename the file inplace */ - if (rename(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, CITUS_STAT_COUNTERS_DUMP_FILE) != 0) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not rename citus_stat_counters file \"%s\" to \"%s\": %m", - CITUS_STAT_COUNTERS_TMP_DUMP_FILE, - CITUS_STAT_COUNTERS_DUMP_FILE))); - } - - LWLockRelease(CitusStatCountersSharedState->lock); - - return; - -error: - - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write citus_stat_counters file \"%s\": %m", - CITUS_STAT_COUNTERS_TMP_DUMP_FILE))); - - if (file) - { - FreeFile(file); - } - - LWLockRelease(CitusStatCountersSharedState->lock); - - unlink(CITUS_STAT_COUNTERS_DUMP_FILE); -} - - -/* - * CitusStatCountersHashEntryAllocIfNotExists allocates a new entry in the - * stat counters hash table if it doesn't already exist. - * - * Assumes that the caller has exclusive access to the hash table. - */ -static StatCountersHashEntry * -CitusStatCountersHashEntryAllocIfNotExists(Oid dbId) -{ - bool councurrentlyInserted = false; - StatCountersHashEntry *dbEntry = - (StatCountersHashEntry *) hash_search(CitusStatCountersSharedHash, (void *) &dbId, - HASH_ENTER, - &councurrentlyInserted); - - /* - * Initialize the counters to 0 if someone else didn't race while we - * were promoting the lock. - */ - if (!councurrentlyInserted) - { - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) - { - pg_atomic_init_u64(&dbEntry->counters[statIdx], 0); - } - - dbEntry->statsResetTimestamp = 0; - } - - return dbEntry; -} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 088fa9bdb..9cef13539 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -57,7 +57,6 @@ #include "distributed/query_stats.h" #include "distributed/resource_lock.h" #include "distributed/shard_cleaner.h" -#include "distributed/stat_counters.h" #include "distributed/statistics_collection.h" #include "distributed/transaction_recovery.h" #include "distributed/version_compat.h" @@ -490,7 +489,6 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampTz lastRecoveryTime = 0; TimestampTz lastShardCleanTime = 0; TimestampTz lastStatStatementsPurgeTime = 0; - TimestampTz lastStatCountersPurgeTime = 0; TimestampTz nextMetadataSyncTime = 0; /* state kept for the background tasks queue monitor */ @@ -821,36 +819,6 @@ CitusMaintenanceDaemonMain(Datum main_arg) timeout = Min(timeout, (StatStatementsPurgeInterval * 1000)); } - if (IsCitusStatCountersEnabled() && - TimestampDifferenceExceeds(lastStatCountersPurgeTime, GetCurrentTimestamp(), - StatCountersPurgeInterval)) - { - StartTransactionCommand(); - - if (!LockCitusExtension()) - { - ereport(DEBUG1, (errmsg("could not lock the citus extension, " - "skipping to purge the stat counter " - "shared memory entries for dropped " - "databases"))); - } - else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) - { - /* - * Record last time we perform the purge to ensure we run once per - * StatCountersPurgeInterval. - */ - lastStatCountersPurgeTime = GetCurrentTimestamp(); - - CitusStatCountersRemoveDroppedDatabases(); - } - - CommitTransactionCommand(); - - /* make sure we don't wait too long */ - timeout = Min(timeout, (StatCountersPurgeInterval)); - } - pid_t backgroundTaskQueueWorkerPid = 0; BgwHandleStatus backgroundTaskQueueWorkerStatus = backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid( diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index c742eb4e9..d93e4483a 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -334,7 +334,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); -extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection); +extern void MarkConnectionConnected(MultiConnection *connection); /* waiteventset utilities */ extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, diff --git a/src/include/distributed/stat_counters.h b/src/include/distributed/stat_counters.h deleted file mode 100644 index 4dc13a091..000000000 --- a/src/include/distributed/stat_counters.h +++ /dev/null @@ -1,53 +0,0 @@ -/*------------------------------------------------------------------------- - * - * stat_counters.h - * - * This file contains the exported functions to track various statistic - * counters for Citus. - * - * ------------------------------------------------------------------------- - */ - -#ifndef STAT_COUNTERS_H -#define STAT_COUNTERS_H - -#include - -#include "distributed/time_constants.h" - -#define STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME "citus_stat_counters_lock_tranche" - -#define DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL (30 * MS_PER_SECOND) -#define DISABLE_STAT_COUNTERS_FLUSH_INTERVAL -1 - -#define DEFAULT_STAT_COUNTERS_PURGE_INTERVAL (5 * MS_PER_MINUTE) - -/* - * Must be in the same order as the columns defined in citus_stat_counters view, - * see src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql - */ -typedef enum -{ - STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED, - STAT_CONNECTION_ESTABLISHMENT_FAILED, - STAT_CONNECTION_REUSED, - - STAT_QUERY_EXECUTION_SINGLE_SHARD, - STAT_QUERY_EXECUTION_MULTI_SHARD, - - /* do not use this and ensure it is the last entry */ - N_CITUS_STAT_COUNTERS -} StatType; - -extern int StatCountersFlushInterval; -extern int StatCountersPurgeInterval; - -extern bool IsCitusStatCountersEnabled(void); -extern void IncrementStatCounterForMyDb(int statId); - -extern void InitializeStatCountersArrayMem(void); -extern void CitusStatCountersFlushAtExit(int code, Datum arg); -extern Size StatCountersArrayShmemSize(void); -extern void CitusStatCountersRemoveDroppedDatabases(void); - -#endif /* STAT_COUNTERS_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 88104a46a..51b2be416 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1480,11 +1480,8 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal.update_placement_metadata(bigint,integer,integer) void | function citus_internal.update_relation_colocation(oid,integer) void | function citus_is_primary_node() boolean - | function citus_stat_counters(oid) SETOF record - | function citus_stat_counters_reset(oid) void | function citus_unmark_object_distributed(oid,oid,integer,boolean) void - | view citus_stat_counters -(30 rows) +(27 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 70c71093d..048e86c67 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -179,8 +179,6 @@ ORDER BY 1; function citus_shards_on_worker() function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) function citus_stat_activity() - function citus_stat_counters(oid) - function citus_stat_counters_reset(oid) function citus_stat_statements() function citus_stat_statements_reset() function citus_stat_tenants(boolean) @@ -386,12 +384,11 @@ ORDER BY 1; view citus_shards view citus_shards_on_worker view citus_stat_activity - view citus_stat_counters view citus_stat_statements view citus_stat_tenants view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(361 rows) +(358 rows) DROP TABLE extension_basic_types;