mirror of https://github.com/citusdata/citus.git
Revert "v1"
This reverts commit 30d2c8e0ed7c28e0647b9688c61e3751457ae09c.pull/7917/head
parent
4efd6c6189
commit
a5ba21d223
|
@ -39,7 +39,6 @@
|
|||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/run_from_same_connection.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "distributed/stat_counters.h"
|
||||
#include "distributed/time_constants.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_log_messages.h"
|
||||
|
@ -355,18 +354,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
|||
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
|
||||
if (connection)
|
||||
{
|
||||
/*
|
||||
* Increment the connection stat counter for the connections that are
|
||||
* reused only if the connection is in a good state. Here we don't
|
||||
* bother shutting down the connection or such if it is not in a good
|
||||
* state but we mostly want to avoid incrementing the connection stat
|
||||
* counter for a connection that the caller cannot really use.
|
||||
*/
|
||||
if (PQstatus(connection->pgConn) == CONNECTION_OK)
|
||||
{
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
@ -408,12 +395,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
|||
dlist_delete(&connection->connectionNode);
|
||||
pfree(connection);
|
||||
|
||||
/*
|
||||
* Here we don't increment the connection stat counter for the optional
|
||||
* connections that we gave up establishing due to connection throttling
|
||||
* because the callers who request optional connections know how to
|
||||
* survive without them.
|
||||
*/
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -1045,11 +1026,6 @@ FinishConnectionListEstablishment(List *multiConnectionList)
|
|||
|
||||
if (event->events & WL_POSTMASTER_DEATH)
|
||||
{
|
||||
/*
|
||||
* Here we don't increment the connection stat counter for the
|
||||
* optional failed connections because this is not a connection
|
||||
* failure, but a postmaster death in the local node.
|
||||
*/
|
||||
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||
}
|
||||
|
||||
|
@ -1066,12 +1042,6 @@ FinishConnectionListEstablishment(List *multiConnectionList)
|
|||
* reset the memory context
|
||||
*/
|
||||
MemoryContextDelete(MemoryContextSwitchTo(oldContext));
|
||||
|
||||
/*
|
||||
* Similarly, we don't increment the connection stat counter for the
|
||||
* failed connections here because this is not a connection failure
|
||||
* but a cancellation request is received.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1102,7 +1072,6 @@ FinishConnectionListEstablishment(List *multiConnectionList)
|
|||
eventMask, NULL);
|
||||
if (!success)
|
||||
{
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
|
||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("connection establishment for node %s:%d "
|
||||
"failed", connection->hostname,
|
||||
|
@ -1119,15 +1088,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
|
|||
*/
|
||||
if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
|
||||
{
|
||||
/*
|
||||
* Since WaitEventSetFromMultiConnectionStates() only adds the
|
||||
* connections that we haven't completed the connection
|
||||
* establishment yet, here we always have a new connection.
|
||||
* In other words, at this point, we surely know that we're
|
||||
* not dealing with a cached connection.
|
||||
*/
|
||||
bool newConnection = true;
|
||||
MarkConnectionConnected(connectionState->connection, newConnection);
|
||||
MarkConnectionConnected(connectionState->connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1211,8 +1172,6 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
|
|||
|
||||
/* close connection, otherwise we take up resource on the other side */
|
||||
CitusPQFinish(connection);
|
||||
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1625,7 +1584,7 @@ RemoteTransactionIdle(MultiConnection *connection)
|
|||
* establishment time when necessary.
|
||||
*/
|
||||
void
|
||||
MarkConnectionConnected(MultiConnection *connection, bool newConnection)
|
||||
MarkConnectionConnected(MultiConnection *connection)
|
||||
{
|
||||
connection->connectionState = MULTI_CONNECTION_CONNECTED;
|
||||
|
||||
|
@ -1633,11 +1592,6 @@ MarkConnectionConnected(MultiConnection *connection, bool newConnection)
|
|||
{
|
||||
INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
|
||||
}
|
||||
|
||||
if (newConnection)
|
||||
{
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -171,7 +171,6 @@
|
|||
#include "distributed/repartition_join_execution.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "distributed/stat_counters.h"
|
||||
#include "distributed/subplan_execution.h"
|
||||
#include "distributed/transaction_identifier.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
|
@ -691,7 +690,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution,
|
|||
WorkerSession *session);
|
||||
static void ConnectionStateMachine(WorkerSession *session);
|
||||
static bool HasUnfinishedTaskForSession(WorkerSession *session);
|
||||
static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection);
|
||||
static void HandleMultiConnectionSuccess(WorkerSession *session);
|
||||
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
|
||||
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
||||
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
|
||||
|
@ -2036,7 +2035,6 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
|
|||
else
|
||||
{
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
|
||||
}
|
||||
|
||||
|
||||
|
@ -3013,7 +3011,6 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
* connection, clear any state associated with it.
|
||||
*/
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -3022,12 +3019,7 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
ConnStatusType status = PQstatus(connection->pgConn);
|
||||
if (status == CONNECTION_OK)
|
||||
{
|
||||
/*
|
||||
* Connection was already established, possibly a cached
|
||||
* connection.
|
||||
*/
|
||||
bool newConnection = false;
|
||||
HandleMultiConnectionSuccess(session, newConnection);
|
||||
HandleMultiConnectionSuccess(session);
|
||||
UpdateConnectionWaitFlags(session,
|
||||
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
||||
break;
|
||||
|
@ -3035,7 +3027,6 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
else if (status == CONNECTION_BAD)
|
||||
{
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -3051,7 +3042,6 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
if (pollMode == PGRES_POLLING_FAILED)
|
||||
{
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
|
||||
}
|
||||
else if (pollMode == PGRES_POLLING_READING)
|
||||
{
|
||||
|
@ -3069,12 +3059,7 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Connection was not established befoore (!= CONNECTION_OK)
|
||||
* but PQconnectPoll() did so now.
|
||||
*/
|
||||
bool newConnection = true;
|
||||
HandleMultiConnectionSuccess(session, newConnection);
|
||||
HandleMultiConnectionSuccess(session);
|
||||
UpdateConnectionWaitFlags(session,
|
||||
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
|
||||
|
||||
|
@ -3152,11 +3137,6 @@ ConnectionStateMachine(WorkerSession *session)
|
|||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* Here we don't increment the connection stat counter for failed
|
||||
* connections because we don't track the connections that we could
|
||||
* establish but lost later.
|
||||
*/
|
||||
connection->connectionState = MULTI_CONNECTION_FAILED;
|
||||
break;
|
||||
}
|
||||
|
@ -3319,12 +3299,12 @@ HasUnfinishedTaskForSession(WorkerSession *session)
|
|||
* connection's state.
|
||||
*/
|
||||
static void
|
||||
HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection)
|
||||
HandleMultiConnectionSuccess(WorkerSession *session)
|
||||
{
|
||||
MultiConnection *connection = session->connection;
|
||||
WorkerPool *workerPool = session->workerPool;
|
||||
|
||||
MarkConnectionConnected(connection, newConnection);
|
||||
MarkConnectionConnected(connection);
|
||||
|
||||
ereport(DEBUG4, (errmsg("established connection to %s:%d for "
|
||||
"session %ld in %ld microseconds",
|
||||
|
|
|
@ -66,7 +66,6 @@
|
|||
#include "distributed/recursive_planning.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/stat_counters.h"
|
||||
#include "distributed/utils/citus_stat_tenants.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_shard_visibility.h"
|
||||
|
@ -1434,8 +1433,6 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
|
|||
|
||||
if (IsMultiTaskPlan(distributedPlan))
|
||||
{
|
||||
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
|
||||
|
||||
/* if it is not a single task executable plan, inform user according to the log level */
|
||||
if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF)
|
||||
{
|
||||
|
@ -1447,10 +1444,6 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
|
|||
" queries on the workers.")));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
|
||||
}
|
||||
|
||||
distributedPlan->queryId = localPlan->queryId;
|
||||
|
||||
|
|
|
@ -105,7 +105,6 @@
|
|||
#include "distributed/shardsplit_shared_memory.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "distributed/shared_library_init.h"
|
||||
#include "distributed/stat_counters.h"
|
||||
#include "distributed/statistics_collection.h"
|
||||
#include "distributed/subplan_execution.h"
|
||||
#include "distributed/time_constants.h"
|
||||
|
@ -189,7 +188,6 @@ static void multi_log_hook(ErrorData *edata);
|
|||
static bool IsSequenceOverflowError(ErrorData *edata);
|
||||
static void RegisterConnectionCleanup(void);
|
||||
static void RegisterExternalClientBackendCounterDecrement(void);
|
||||
static void RegisterCitusStatCountersFlush(void);
|
||||
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
|
||||
static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
|
||||
static void CreateRequiredDirectories(void);
|
||||
|
@ -507,11 +505,6 @@ _PG_init(void)
|
|||
|
||||
InitializeMultiTenantMonitorSMHandleManagement();
|
||||
|
||||
if (IsCitusStatCountersEnabled())
|
||||
{
|
||||
InitializeStatCountersArrayMem();
|
||||
}
|
||||
|
||||
/* enable modification of pg_catalog tables during pg_upgrade */
|
||||
if (IsBinaryUpgrade)
|
||||
{
|
||||
|
@ -622,12 +615,6 @@ citus_shmem_request(void)
|
|||
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
|
||||
RequestAddinShmemSpace(LogicalClockShmemSize());
|
||||
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
|
||||
|
||||
if (IsCitusStatCountersEnabled())
|
||||
{
|
||||
RequestAddinShmemSpace(StatCountersArrayShmemSize());
|
||||
RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -800,12 +787,6 @@ StartupCitusBackend(void)
|
|||
|
||||
SetBackendDataDatabaseId();
|
||||
RegisterConnectionCleanup();
|
||||
|
||||
if (IsCitusStatCountersEnabled())
|
||||
{
|
||||
RegisterCitusStatCountersFlush();
|
||||
}
|
||||
|
||||
FinishedStartupCitusBackend = true;
|
||||
}
|
||||
|
||||
|
@ -860,23 +841,6 @@ RegisterExternalClientBackendCounterDecrement(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RegisterCitusStatCountersFlush registers CitusStatCountersFlushAtExit()
|
||||
* to be called before the backend exits.
|
||||
*/
|
||||
static void
|
||||
RegisterCitusStatCountersFlush(void)
|
||||
{
|
||||
static bool registeredCleanup = false;
|
||||
if (registeredCleanup == false)
|
||||
{
|
||||
before_shmem_exit(CitusStatCountersFlushAtExit, 0);
|
||||
|
||||
registeredCleanup = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
|
||||
* backend for the purposes of any clean-up needed.
|
||||
|
@ -2465,41 +2429,6 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.stat_counters_flush_interval",
|
||||
gettext_noop("Sets the interval to flush the local stat counters into "
|
||||
"the shared memory."),
|
||||
gettext_noop("Stat counters are used to track the number of certain "
|
||||
"operations in Citus. While setting this GUC to -1 disables "
|
||||
"stat counters, setting it to a positive value will flush "
|
||||
"the local stat counters into the shared memory every "
|
||||
"interval milliseconds instead of flushing them immediately "
|
||||
"after the operation, as it is done when the value is 0. "
|
||||
"Higher values reduce the overhead of flushing the stat "
|
||||
"counters but increase the time it takes to see the updated "
|
||||
"stat counters."),
|
||||
&StatCountersFlushInterval,
|
||||
DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL,
|
||||
DISABLE_STAT_COUNTERS_FLUSH_INTERVAL,
|
||||
5 * MS_PER_MINUTE,
|
||||
PGC_POSTMASTER,
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.stat_counters_purge_interval",
|
||||
gettext_noop("Determines time interval for citus_stat_counters to remove "
|
||||
"the shared memory entries for the databases that were dropped."),
|
||||
gettext_noop("This is automatically disabled when the stat counters are disabled "
|
||||
"(citus.stat_counters_flush_interval = -1) and there is no explicit "
|
||||
"way to disable purging the shared memory entries while the stat "
|
||||
"counters are enabled."),
|
||||
&StatCountersPurgeInterval,
|
||||
DEFAULT_STAT_COUNTERS_PURGE_INTERVAL, 1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MS | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
/*
|
||||
* It takes about 140 bytes of shared memory to store one row, therefore
|
||||
* this setting should be used responsibly. setting it to 10M will require
|
||||
|
|
|
@ -48,5 +48,3 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
|
|||
#include "udfs/repl_origin_helper/13.1-1.sql"
|
||||
#include "udfs/citus_finish_pg_upgrade/13.1-1.sql"
|
||||
#include "udfs/citus_is_primary_node/13.1-1.sql"
|
||||
#include "udfs/citus_stat_counters/13.1-1.sql"
|
||||
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
|
||||
|
|
|
@ -41,7 +41,3 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking();
|
|||
DROP FUNCTION citus_internal.stop_replication_origin_tracking();
|
||||
DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
|
||||
#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
|
||||
|
||||
DROP VIEW pg_catalog.citus_stat_counters;
|
||||
DROP FUNCTION pg_catalog.citus_stat_counters(oid);
|
||||
DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);
|
||||
|
|
|
@ -1,27 +0,0 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
|
||||
database_oid oid,
|
||||
|
||||
-- Following stat counter columns must be in the same order as the
|
||||
-- StatType enum defined in src/include/distributed/stat_counters.h
|
||||
OUT connection_establishment_succeeded bigint,
|
||||
OUT connection_establishment_failed bigint,
|
||||
OUT connection_reused bigint,
|
||||
|
||||
OUT query_execution_single_shard bigint,
|
||||
OUT query_execution_multi_shard bigint,
|
||||
|
||||
-- Must always be the last column or you should accordingly update
|
||||
-- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c
|
||||
OUT stats_reset timestamp with time zone
|
||||
)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C VOLATILE PARALLEL UNSAFE
|
||||
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database';
|
||||
|
||||
CREATE VIEW citus.citus_stat_counters AS
|
||||
SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database;
|
||||
|
||||
ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
|
||||
|
||||
GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
|
|
@ -1,27 +0,0 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
|
||||
database_oid oid,
|
||||
|
||||
-- Following stat counter columns must be in the same order as the
|
||||
-- StatType enum defined in src/include/distributed/stat_counters.h
|
||||
OUT connection_establishment_succeeded bigint,
|
||||
OUT connection_establishment_failed bigint,
|
||||
OUT connection_reused bigint,
|
||||
|
||||
OUT query_execution_single_shard bigint,
|
||||
OUT query_execution_multi_shard bigint,
|
||||
|
||||
-- Must always be the last column or you should accordingly update
|
||||
-- StoreStatCountersFromArray() function in src/backend/distributed/stat_counters.c
|
||||
OUT stats_reset timestamp with time zone
|
||||
)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C VOLATILE PARALLEL UNSAFE
|
||||
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database';
|
||||
|
||||
CREATE VIEW citus.citus_stat_counters AS
|
||||
SELECT oid, datname, (pg_catalog.citus_stat_counters(oid)).* FROM pg_catalog.pg_database;
|
||||
|
||||
ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
|
||||
|
||||
GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
|
|
@ -1,7 +0,0 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
|
||||
RETURNS VOID
|
||||
LANGUAGE C VOLATILE PARALLEL UNSAFE
|
||||
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0';
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;
|
|
@ -1,7 +0,0 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
|
||||
RETURNS VOID
|
||||
LANGUAGE C VOLATILE PARALLEL UNSAFE
|
||||
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database or all databases if database_oid is not provided or provided as 0';
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;
|
|
@ -1,984 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* stat_counters.c
|
||||
*
|
||||
* This file contains functions to track various statistic counters for
|
||||
* Citus.
|
||||
*
|
||||
* Each backend increments the local counters (PendingStatCounters) and
|
||||
* flushes them to the shared memory at the end of the flush interval
|
||||
* (StatCountersFlushInterval). The shared memory is used to keep track
|
||||
* of the stat counters across backends for all databases.
|
||||
*
|
||||
* We don't have a good way to enforce that we flush the pending counters
|
||||
* always at the end of the flush interval because we only check for the
|
||||
* interval at the time of incrementing a counter. And, if a backend exits
|
||||
* before the interval elapses, then we still flush the pending counters at
|
||||
* the time of exit via CitusStatCountersFlushAtExit().
|
||||
*
|
||||
* Also, we store the stat counters in CITUS_STAT_COUNTERS_DUMP_FILE on
|
||||
* shutdown via postmaster and restore them on startup via the first backend
|
||||
* that initializes the shared memory for stat counters.
|
||||
*
|
||||
* And for the dropped databases, Citus maintenance daemon removes the stat
|
||||
* counters from the shared memory periodically, which is controlled by
|
||||
* StatCountersPurgeInterval. When the feature is disabled, removal of the
|
||||
* stat counters is automatically disabled. Instead of inserting a cleanup
|
||||
* record into pg_dist_cleanup, we choose to remove the stat counters via
|
||||
* the maintenance daemon because inserting a cleanup record wouldn't work
|
||||
* when the current database is dropped. And if there were other databases
|
||||
* where Citus is installed, then they would still continue to have the
|
||||
* dropped database's stat counters in the shared memory.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "common/hashfn.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/stat_counters.h"
|
||||
|
||||
|
||||
/* file path for dumping the stat counters */
|
||||
#define CITUS_STAT_COUNTERS_DUMP_FILE "pg_stat/citus_stat_counters.stat"
|
||||
#define CITUS_STAT_COUNTERS_TMP_DUMP_FILE (CITUS_STAT_COUNTERS_DUMP_FILE ".tmp")
|
||||
|
||||
/*
|
||||
* Shared hash constants.
|
||||
*
|
||||
* The places where STAT_COUNTERS_MAX_DATABASES is used do not impose a hard
|
||||
* limit on the number of databases that can be tracked but in ShmemInitHash()
|
||||
* it's documented that the access efficiency will degrade if it is exceeded
|
||||
* substantially.
|
||||
*
|
||||
* XXX: Consider using dshash_table instead of (shared) HTAB if that becomes
|
||||
* a concern.
|
||||
*/
|
||||
#define STAT_COUNTERS_INIT_DATABASES 8
|
||||
#define STAT_COUNTERS_MAX_DATABASES 1024
|
||||
|
||||
|
||||
/* shared hash table entry */
|
||||
typedef struct StatCountersHashEntry
|
||||
{
|
||||
/* must be the first field since this is used as the key */
|
||||
Oid dbId;
|
||||
|
||||
pg_atomic_uint64 counters[N_CITUS_STAT_COUNTERS];
|
||||
|
||||
TimestampTz statsResetTimestamp;
|
||||
} StatCountersHashEntry;
|
||||
|
||||
/* shared memory state */
|
||||
typedef struct StatCountersState
|
||||
{
|
||||
LWLockId lock;
|
||||
} StatCountersState;
|
||||
|
||||
|
||||
/* GUC value for citus.stat_counters_flush_interval */
|
||||
int StatCountersFlushInterval = DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL;
|
||||
|
||||
/* GUC value for citus.stat_counters_purge_interval */
|
||||
int StatCountersPurgeInterval = DEFAULT_STAT_COUNTERS_PURGE_INTERVAL;
|
||||
|
||||
/* stat counters dump file version */
|
||||
static const uint32 CITUS_STAT_COUNTERS_FILE_VERSION = 1;
|
||||
|
||||
/* shared memory init */
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
|
||||
/* shared memory variables */
|
||||
static StatCountersState *CitusStatCountersSharedState = NULL;
|
||||
static HTAB *CitusStatCountersSharedHash = NULL;
|
||||
|
||||
/* local stat counters that are pending to be flushed */
|
||||
static uint64 PendingStatCounters[N_CITUS_STAT_COUNTERS] = { 0 };
|
||||
|
||||
|
||||
/* helper functions for citus_stat_counters() and citus_stat_counters_reset() */
|
||||
static Tuplestorestate * SetupStatCountersTuplestore(FunctionCallInfo fcinfo,
|
||||
TupleDesc *tupleDescriptor);
|
||||
static void StoreStatCountersForDbId(Tuplestorestate *tupleStore,
|
||||
TupleDesc tupleDescriptor,
|
||||
Oid dbId);
|
||||
static void StoreStatCountersFromValues(Tuplestorestate *tupleStore,
|
||||
TupleDesc tupleDescriptor,
|
||||
pg_atomic_uint64 *counters,
|
||||
TimestampTz statsResetTimestamp);
|
||||
static void ResetStatCountersForDbId(Oid dbId);
|
||||
static void ResetStatCounterArray(pg_atomic_uint64 *counters);
|
||||
|
||||
/* helper functions to increment and flush stat counters */
|
||||
static bool ShouldByPassLocalCounters(void);
|
||||
static void IncrementSharedStatCounterForMyDb(int statId);
|
||||
static void FlushPendingCountersIfNeeded(bool force);
|
||||
|
||||
/* shared memory init & management */
|
||||
static void CitusStatCountersShmemInit(void);
|
||||
static void CitusStatCountersShmemShutdown(int code, Datum arg);
|
||||
|
||||
/* shared hash utilities */
|
||||
static StatCountersHashEntry * CitusStatCountersHashEntryAllocIfNotExists(Oid dbId);
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_stat_counters);
|
||||
PG_FUNCTION_INFO_V1(citus_stat_counters_reset);
|
||||
|
||||
|
||||
/*
|
||||
* citus_stat_counters returns the stat counters for the specified database or for
|
||||
* all databases if InvalidOid is provided.
|
||||
*/
|
||||
Datum
|
||||
citus_stat_counters(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (PG_ARGISNULL(0))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("database oid cannot be NULL")));
|
||||
}
|
||||
|
||||
Oid dbId = PG_GETARG_OID(0);
|
||||
|
||||
if (!IsCitusStatCountersEnabled())
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("Citus stat counters are not enabled, consider setting "
|
||||
"citus.stat_counters_flush_interval to a non-negative value "
|
||||
"and restarting the server")));
|
||||
|
||||
}
|
||||
|
||||
if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("shared memory is not initialized for Citus stat counters")));
|
||||
}
|
||||
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = SetupStatCountersTuplestore(fcinfo, &tupleDescriptor);
|
||||
|
||||
StoreStatCountersForDbId(tupleStore, tupleDescriptor, dbId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* citus_stat_counters_reset resets the stat counters for the specified database
|
||||
* or for all databases if InvalidOid is provided.
|
||||
*/
|
||||
Datum
|
||||
citus_stat_counters_reset(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (PG_ARGISNULL(0))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("database oid cannot be NULL")));
|
||||
}
|
||||
|
||||
Oid dbId = PG_GETARG_OID(0);
|
||||
|
||||
if (!IsCitusStatCountersEnabled())
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("Citus stat counters are not enabled, consider setting "
|
||||
"citus.stat_counters_flush_interval to a non-negative value "
|
||||
"and restarting the server")));
|
||||
|
||||
}
|
||||
|
||||
if (!CitusStatCountersSharedState || !CitusStatCountersSharedHash)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("shared memory is not initialized for Citus stat counters")));
|
||||
}
|
||||
|
||||
ResetStatCountersForDbId(dbId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusStatCountersEnabled returns whether the stat counters are enabled.
|
||||
*/
|
||||
bool
|
||||
IsCitusStatCountersEnabled(void)
|
||||
{
|
||||
Assert(StatCountersFlushInterval >= DISABLE_STAT_COUNTERS_FLUSH_INTERVAL);
|
||||
return StatCountersFlushInterval != DISABLE_STAT_COUNTERS_FLUSH_INTERVAL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IncrementStatCounterForMyDb increments the stat counter for the given statId
|
||||
* of the current database.
|
||||
*/
|
||||
void
|
||||
IncrementStatCounterForMyDb(int statId)
|
||||
{
|
||||
if (!IsCitusStatCountersEnabled())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (ShouldByPassLocalCounters())
|
||||
{
|
||||
IncrementSharedStatCounterForMyDb(statId);
|
||||
}
|
||||
else
|
||||
{
|
||||
PendingStatCounters[statId]++;
|
||||
|
||||
bool force = false;
|
||||
FlushPendingCountersIfNeeded(force);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitializeStatCountersArrayMem saves the previous shmem_startup_hook and sets
|
||||
* up a new shmem_startup_hook to initialize the shared memory used for
|
||||
* keeping track of stat counters across backends for all databases.
|
||||
*/
|
||||
void
|
||||
InitializeStatCountersArrayMem(void)
|
||||
{
|
||||
Assert(IsCitusStatCountersEnabled());
|
||||
|
||||
/* on the versions older than PG 15, we use shmem_request_hook_type */
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
|
||||
if (!IsUnderPostmaster)
|
||||
{
|
||||
RequestAddinShmemSpace(StatCountersArrayShmemSize());
|
||||
|
||||
elog(LOG, "requesting named LWLockTranch for %s",
|
||||
STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME);
|
||||
RequestNamedLWLockTranche(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME, 1);
|
||||
}
|
||||
#endif
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = CitusStatCountersShmemInit;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusStatCountersFlushAtExit is called on backend exit to flush the local
|
||||
* stat counters to the shared memory, if there are any pending.
|
||||
*/
|
||||
void
|
||||
CitusStatCountersFlushAtExit(int code, Datum arg)
|
||||
{
|
||||
Assert(IsCitusStatCountersEnabled());
|
||||
|
||||
bool force = true;
|
||||
FlushPendingCountersIfNeeded(force);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StatCountersArrayShmemSize calculates and returns shared memory size
|
||||
* required to keep stat counters.
|
||||
*/
|
||||
Size
|
||||
StatCountersArrayShmemSize(void)
|
||||
{
|
||||
Size size = MAXALIGN(sizeof(StatCountersState));
|
||||
size = add_size(size, hash_estimate_size(STAT_COUNTERS_MAX_DATABASES,
|
||||
sizeof(StatCountersHashEntry)));
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusStatCountersRemoveDroppedDatabases removes the stat counters for the
|
||||
* databases that are dropped.
|
||||
*/
|
||||
void
|
||||
CitusStatCountersRemoveDroppedDatabases(void)
|
||||
{
|
||||
Assert(IsCitusStatCountersEnabled());
|
||||
|
||||
List *droppedDatabaseIdList = NIL;
|
||||
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
HASH_SEQ_STATUS hashSeqState = { 0 };
|
||||
hash_seq_init(&hashSeqState, CitusStatCountersSharedHash);
|
||||
|
||||
StatCountersHashEntry *dbEntry = NULL;
|
||||
while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL)
|
||||
{
|
||||
HeapTuple dbTuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbEntry->dbId));
|
||||
if (!HeapTupleIsValid(dbTuple))
|
||||
{
|
||||
droppedDatabaseIdList = lappend_oid(droppedDatabaseIdList, dbEntry->dbId);
|
||||
}
|
||||
else
|
||||
{
|
||||
ReleaseSysCache(dbTuple);
|
||||
}
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
|
||||
if (list_length(droppedDatabaseIdList) == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/* acquire exclusive lock to quickly remove the entries */
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
|
||||
|
||||
Oid droppedDatabaseId = InvalidOid;
|
||||
foreach_declared_oid(droppedDatabaseId, droppedDatabaseIdList)
|
||||
{
|
||||
hash_search(CitusStatCountersSharedHash, (void *) &droppedDatabaseId,
|
||||
HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetupStatCountersTuplestore returns a Tuplestorestate for returning the
|
||||
* stat counters and setups the provided TupleDesc.
|
||||
*/
|
||||
static Tuplestorestate *
|
||||
SetupStatCountersTuplestore(FunctionCallInfo fcinfo, TupleDesc *tupleDescriptor)
|
||||
{
|
||||
ReturnSetInfo *resultSet = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
switch (get_call_result_type(fcinfo, NULL, tupleDescriptor))
|
||||
{
|
||||
case TYPEFUNC_COMPOSITE:
|
||||
{
|
||||
/* success */
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPEFUNC_RECORD:
|
||||
{
|
||||
/* failed to determine actual type of RECORD */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("function returning record called in context "
|
||||
"that cannot accept type record")));
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
/* result type isn't composite */
|
||||
elog(ERROR, "return type must be a row type");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
MemoryContext perQueryContext = resultSet->econtext->ecxt_per_query_memory;
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(perQueryContext);
|
||||
|
||||
bool randomAccess = true;
|
||||
bool interTransactions = false;
|
||||
Tuplestorestate *tupstore = tuplestore_begin_heap(randomAccess, interTransactions,
|
||||
work_mem);
|
||||
|
||||
resultSet->returnMode = SFRM_Materialize;
|
||||
resultSet->setResult = tupstore;
|
||||
resultSet->setDesc = *tupleDescriptor;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
return tupstore;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StoreStatCountersForDbId fetches the stat counters for the specified database
|
||||
* and stores them into the given tuple store.
|
||||
*/
|
||||
static void
|
||||
StoreStatCountersForDbId(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Oid dbId)
|
||||
{
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash,
|
||||
(void *) &dbId,
|
||||
HASH_FIND, NULL);
|
||||
if (dbEntry)
|
||||
{
|
||||
StoreStatCountersFromValues(tupleStore, tupleDescriptor, dbEntry->counters,
|
||||
dbEntry->statsResetTimestamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* use a zeroed entry if the database doesn't exist */
|
||||
TimestampTz zeroedTimestamp = 0;
|
||||
|
||||
pg_atomic_uint64 zeroedCounters[N_CITUS_STAT_COUNTERS] = { 0 };
|
||||
ResetStatCounterArray(zeroedCounters);
|
||||
|
||||
StoreStatCountersFromValues(tupleStore, tupleDescriptor, zeroedCounters,
|
||||
zeroedTimestamp);
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StoreStatCountersFromValues stores the stat counters stored in given
|
||||
* counter array and the timestamp into the given tuple store.
|
||||
*
|
||||
* Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS.
|
||||
*/
|
||||
static void
|
||||
StoreStatCountersFromValues(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor,
|
||||
pg_atomic_uint64 *counters, TimestampTz statsResetTimestamp)
|
||||
{
|
||||
Datum values[N_CITUS_STAT_COUNTERS + 1] = { 0 };
|
||||
bool isNulls[N_CITUS_STAT_COUNTERS + 1] = { 0 };
|
||||
|
||||
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
|
||||
{
|
||||
values[statIdx] = UInt64GetDatum(pg_atomic_read_u64(&counters[statIdx]));
|
||||
}
|
||||
|
||||
if (statsResetTimestamp == 0)
|
||||
{
|
||||
isNulls[N_CITUS_STAT_COUNTERS] = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
values[N_CITUS_STAT_COUNTERS] = TimestampTzGetDatum(statsResetTimestamp);
|
||||
}
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetStatCountersForDbId resets the stat counters for the specified database or
|
||||
* for all databases if InvalidOid is provided.
|
||||
*/
|
||||
static void
|
||||
ResetStatCountersForDbId(Oid dbId)
|
||||
{
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
if (OidIsValid(dbId))
|
||||
{
|
||||
StatCountersHashEntry *dbEntry = hash_search(CitusStatCountersSharedHash,
|
||||
(void *) &dbId,
|
||||
HASH_FIND, NULL);
|
||||
|
||||
/* skip if we don't have an entry for this database */
|
||||
if (dbEntry)
|
||||
{
|
||||
ResetStatCounterArray(dbEntry->counters);
|
||||
dbEntry->statsResetTimestamp = GetCurrentTimestamp();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
HASH_SEQ_STATUS hashSeqState = { 0 };
|
||||
hash_seq_init(&hashSeqState, CitusStatCountersSharedHash);
|
||||
|
||||
StatCountersHashEntry *dbEntry = NULL;
|
||||
while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL)
|
||||
{
|
||||
ResetStatCounterArray(dbEntry->counters);
|
||||
dbEntry->statsResetTimestamp = GetCurrentTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetStatCounterArray resets the stat counters stored in the given
|
||||
* counter array.
|
||||
*
|
||||
* Given counter array is assumed to be of length N_CITUS_STAT_COUNTERS.
|
||||
*/
|
||||
static void
|
||||
ResetStatCounterArray(pg_atomic_uint64 *counters)
|
||||
{
|
||||
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
|
||||
{
|
||||
pg_atomic_write_u64(&counters[statIdx], 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldByPassLocalCounters returns true if we should immediately increment
|
||||
* the shared memory counters without buffering them in the local counters.
|
||||
*/
|
||||
static bool
|
||||
ShouldByPassLocalCounters(void)
|
||||
{
|
||||
Assert(IsCitusStatCountersEnabled());
|
||||
return StatCountersFlushInterval == 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IncrementSharedStatCounterForMyDb increments the stat counter for the given statId
|
||||
* of the current database in the shared memory.
|
||||
*/
|
||||
static void
|
||||
IncrementSharedStatCounterForMyDb(int statId)
|
||||
{
|
||||
Oid dbId = MyDatabaseId;
|
||||
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
/*
|
||||
* XXX: Can we cache the entry for the current database?
|
||||
*
|
||||
* From one perspective, doing so should be fine since dropping a
|
||||
* database succeeds only after all the backends connected to it
|
||||
* are disconnected, so we cannot have a backend that has a dangling
|
||||
* entry.
|
||||
*
|
||||
* On the other hand, we're not sure if a potential hash table resize
|
||||
* would invalidate the cached entry.
|
||||
*/
|
||||
StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search(
|
||||
CitusStatCountersSharedHash,
|
||||
(void *) &dbId,
|
||||
HASH_FIND,
|
||||
NULL);
|
||||
if (!dbEntry)
|
||||
{
|
||||
/* promote the lock to exclusive to insert the new entry for this database */
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
|
||||
|
||||
dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId);
|
||||
|
||||
/* downgrade the lock to shared */
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
}
|
||||
|
||||
pg_atomic_fetch_add_u64(&dbEntry->counters[statId], 1);
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FlushPendingCountersIfNeeded flushes PendingStatCounters to the shared
|
||||
* memory if StatCountersFlushInterval has elapsed since the last flush
|
||||
* or if force is true.
|
||||
*/
|
||||
static void
|
||||
FlushPendingCountersIfNeeded(bool force)
|
||||
{
|
||||
static instr_time LastFlushTime = { 0 };
|
||||
|
||||
if (!force)
|
||||
{
|
||||
if (INSTR_TIME_IS_ZERO(LastFlushTime))
|
||||
{
|
||||
/* assume that the first increment is the start of the flush interval */
|
||||
INSTR_TIME_SET_CURRENT(LastFlushTime);
|
||||
return;
|
||||
}
|
||||
|
||||
instr_time now = { 0 };
|
||||
INSTR_TIME_SET_CURRENT(now);
|
||||
INSTR_TIME_SUBTRACT(now, LastFlushTime);
|
||||
if (INSTR_TIME_GET_MILLISEC(now) < StatCountersFlushInterval)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Oid dbId = MyDatabaseId;
|
||||
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
/* XXX: Same here, can we cache the entry for the current database? */
|
||||
StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search(
|
||||
CitusStatCountersSharedHash,
|
||||
(void *) &dbId,
|
||||
HASH_FIND,
|
||||
NULL);
|
||||
if (!dbEntry)
|
||||
{
|
||||
/* promote the lock to exclusive to insert the new entry for this database */
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
|
||||
|
||||
dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId);
|
||||
|
||||
/* downgrade the lock to shared */
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
}
|
||||
|
||||
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
|
||||
{
|
||||
if (PendingStatCounters[statIdx] == 0)
|
||||
{
|
||||
/* nothing to flush for this stat, avoid unnecessary lock contention */
|
||||
continue;
|
||||
}
|
||||
|
||||
pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx],
|
||||
PendingStatCounters[statIdx]);
|
||||
|
||||
PendingStatCounters[statIdx] = 0;
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(LastFlushTime);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusStatCountersShmemInit initializes the shared memory used
|
||||
* for keeping track of stat counters and restores the stat counters from
|
||||
* the dump file.
|
||||
*/
|
||||
static void
|
||||
CitusStatCountersShmemInit(void)
|
||||
{
|
||||
if (prev_shmem_startup_hook != NULL)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
|
||||
Assert(IsCitusStatCountersEnabled());
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
bool alreadyInitialized = false;
|
||||
CitusStatCountersSharedState = ShmemInitStruct(STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME,
|
||||
sizeof(StatCountersState),
|
||||
&alreadyInitialized);
|
||||
|
||||
|
||||
if (!alreadyInitialized)
|
||||
{
|
||||
CitusStatCountersSharedState->lock = &(GetNamedLWLockTranche(
|
||||
STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME))
|
||||
->lock;
|
||||
}
|
||||
|
||||
HASHCTL hashInfo = {
|
||||
.keysize = sizeof(Oid),
|
||||
.entrysize = sizeof(StatCountersHashEntry),
|
||||
.hash = uint32_hash,
|
||||
};
|
||||
|
||||
CitusStatCountersSharedHash = ShmemInitHash("Citus stat counters Hash",
|
||||
STAT_COUNTERS_INIT_DATABASES,
|
||||
STAT_COUNTERS_MAX_DATABASES, &hashInfo,
|
||||
HASH_ELEM | HASH_FUNCTION);
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
if (!IsUnderPostmaster)
|
||||
{
|
||||
/* postmaster dumps the stat counters on shutdown */
|
||||
on_shmem_exit(CitusStatCountersShmemShutdown, (Datum) 0);
|
||||
}
|
||||
|
||||
if (alreadyInitialized)
|
||||
{
|
||||
/*
|
||||
* alreadyInitialized being true doesn't necessarily mean that the
|
||||
* backend that initialized the shared memory has completed restoring
|
||||
* the stat counters from the dump file.
|
||||
*
|
||||
* However, it's still fine to return here as commented below.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* We know that only one backend can be here at this point, so we're
|
||||
* the only one that restores the stat counters from the dump file.
|
||||
*
|
||||
* However, we don't block other backends from accessing the shared
|
||||
* memory while we're restoring the stat counters - need to be careful.
|
||||
*/
|
||||
FILE *file = AllocateFile(CITUS_STAT_COUNTERS_DUMP_FILE, PG_BINARY_R);
|
||||
if (file == NULL)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
{
|
||||
/* ignore not-found error */
|
||||
return;
|
||||
}
|
||||
goto error;
|
||||
}
|
||||
|
||||
uint32 fileVersion = 0;
|
||||
if (fread(&fileVersion, sizeof(uint32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* check if version is correct, atm we only have one */
|
||||
if (fileVersion != CITUS_STAT_COUNTERS_FILE_VERSION)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("citus_stat_counters file version mismatch: expected %u, got %u",
|
||||
CITUS_STAT_COUNTERS_FILE_VERSION, fileVersion)));
|
||||
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* get number of entries */
|
||||
int32 numEntries = 0;
|
||||
if (fread(&numEntries, sizeof(int32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numEntries; i++)
|
||||
{
|
||||
Oid dbId = InvalidOid;
|
||||
if (fread(&dbId, sizeof(Oid), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 };
|
||||
if (fread(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) !=
|
||||
N_CITUS_STAT_COUNTERS)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
StatCountersHashEntry *dbEntry = (StatCountersHashEntry *) hash_search(
|
||||
CitusStatCountersSharedHash,
|
||||
(void *) &dbId,
|
||||
HASH_FIND,
|
||||
NULL);
|
||||
if (!dbEntry)
|
||||
{
|
||||
/* promote the lock to exclusive to insert the new entry for this database */
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_EXCLUSIVE);
|
||||
|
||||
dbEntry = CitusStatCountersHashEntryAllocIfNotExists(dbId);
|
||||
|
||||
/* downgrade the lock to shared */
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
}
|
||||
|
||||
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
|
||||
{
|
||||
/*
|
||||
* If no other backend has created the entry for us, then we do that
|
||||
* via the above call made to CitusStatCountersHashEntryAllocIfNotExists()
|
||||
* and there we init the counters if we just created the entry.
|
||||
*
|
||||
* So here we always "fetch_add" instead of "write" or "init" to avoid
|
||||
* overwriting the counters-increments made by other backends so far, if
|
||||
* it's not us who created the entry.
|
||||
*/
|
||||
pg_atomic_fetch_add_u64(&dbEntry->counters[statIdx], statCounters[statIdx]);
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
}
|
||||
|
||||
FreeFile(file);
|
||||
|
||||
/*
|
||||
* Remove the file so it's not included in backups/replicas, etc. A new file will be
|
||||
* written on next shutdown.
|
||||
*/
|
||||
unlink(CITUS_STAT_COUNTERS_DUMP_FILE);
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read citus_stat_counters file \"%s\": %m",
|
||||
CITUS_STAT_COUNTERS_DUMP_FILE)));
|
||||
|
||||
if (file)
|
||||
{
|
||||
FreeFile(file);
|
||||
}
|
||||
|
||||
/* delete bogus file, don't care of errors in this case */
|
||||
unlink(CITUS_STAT_COUNTERS_DUMP_FILE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusStatCountersShmemShutdown is called on shutdown by postmaster to dump the
|
||||
* stat counters to CITUS_STAT_COUNTERS_DUMP_FILE.
|
||||
*/
|
||||
static void
|
||||
CitusStatCountersShmemShutdown(int code, Datum arg)
|
||||
{
|
||||
/* don't try to dump during a crash */
|
||||
if (code)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
Assert(IsCitusStatCountersEnabled());
|
||||
|
||||
LWLockAcquire(CitusStatCountersSharedState->lock, LW_SHARED);
|
||||
|
||||
FILE *file = AllocateFile(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, PG_BINARY_W);
|
||||
if (file == NULL)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (fwrite(&CITUS_STAT_COUNTERS_FILE_VERSION, sizeof(uint32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
int32 numEntries = hash_get_num_entries(CitusStatCountersSharedHash);
|
||||
if (fwrite(&numEntries, sizeof(int32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
StatCountersHashEntry *dbEntry = NULL;
|
||||
|
||||
HASH_SEQ_STATUS hashSeqState = { 0 };
|
||||
hash_seq_init(&hashSeqState, CitusStatCountersSharedHash);
|
||||
while ((dbEntry = hash_seq_search(&hashSeqState)) != NULL)
|
||||
{
|
||||
if (fwrite(&dbEntry->dbId, sizeof(Oid), 1, file) != 1)
|
||||
{
|
||||
/* we assume hash_seq_term won't change errno */
|
||||
hash_seq_term(&hashSeqState);
|
||||
goto error;
|
||||
}
|
||||
|
||||
uint64 statCounters[N_CITUS_STAT_COUNTERS] = { 0 };
|
||||
|
||||
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
|
||||
{
|
||||
statCounters[statIdx] = pg_atomic_read_u64(&dbEntry->counters[statIdx]);
|
||||
}
|
||||
|
||||
if (fwrite(&statCounters, sizeof(uint64), N_CITUS_STAT_COUNTERS, file) !=
|
||||
N_CITUS_STAT_COUNTERS)
|
||||
{
|
||||
/* we assume hash_seq_term won't change errno */
|
||||
hash_seq_term(&hashSeqState);
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
|
||||
if (FreeFile(file))
|
||||
{
|
||||
file = NULL;
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* rename the file inplace */
|
||||
if (rename(CITUS_STAT_COUNTERS_TMP_DUMP_FILE, CITUS_STAT_COUNTERS_DUMP_FILE) != 0)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not rename citus_stat_counters file \"%s\" to \"%s\": %m",
|
||||
CITUS_STAT_COUNTERS_TMP_DUMP_FILE,
|
||||
CITUS_STAT_COUNTERS_DUMP_FILE)));
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write citus_stat_counters file \"%s\": %m",
|
||||
CITUS_STAT_COUNTERS_TMP_DUMP_FILE)));
|
||||
|
||||
if (file)
|
||||
{
|
||||
FreeFile(file);
|
||||
}
|
||||
|
||||
LWLockRelease(CitusStatCountersSharedState->lock);
|
||||
|
||||
unlink(CITUS_STAT_COUNTERS_DUMP_FILE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CitusStatCountersHashEntryAllocIfNotExists allocates a new entry in the
|
||||
* stat counters hash table if it doesn't already exist.
|
||||
*
|
||||
* Assumes that the caller has exclusive access to the hash table.
|
||||
*/
|
||||
static StatCountersHashEntry *
|
||||
CitusStatCountersHashEntryAllocIfNotExists(Oid dbId)
|
||||
{
|
||||
bool councurrentlyInserted = false;
|
||||
StatCountersHashEntry *dbEntry =
|
||||
(StatCountersHashEntry *) hash_search(CitusStatCountersSharedHash, (void *) &dbId,
|
||||
HASH_ENTER,
|
||||
&councurrentlyInserted);
|
||||
|
||||
/*
|
||||
* Initialize the counters to 0 if someone else didn't race while we
|
||||
* were promoting the lock.
|
||||
*/
|
||||
if (!councurrentlyInserted)
|
||||
{
|
||||
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
|
||||
{
|
||||
pg_atomic_init_u64(&dbEntry->counters[statIdx], 0);
|
||||
}
|
||||
|
||||
dbEntry->statsResetTimestamp = 0;
|
||||
}
|
||||
|
||||
return dbEntry;
|
||||
}
|
|
@ -57,7 +57,6 @@
|
|||
#include "distributed/query_stats.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shard_cleaner.h"
|
||||
#include "distributed/stat_counters.h"
|
||||
#include "distributed/statistics_collection.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
@ -490,7 +489,6 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
TimestampTz lastRecoveryTime = 0;
|
||||
TimestampTz lastShardCleanTime = 0;
|
||||
TimestampTz lastStatStatementsPurgeTime = 0;
|
||||
TimestampTz lastStatCountersPurgeTime = 0;
|
||||
TimestampTz nextMetadataSyncTime = 0;
|
||||
|
||||
/* state kept for the background tasks queue monitor */
|
||||
|
@ -821,36 +819,6 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
|||
timeout = Min(timeout, (StatStatementsPurgeInterval * 1000));
|
||||
}
|
||||
|
||||
if (IsCitusStatCountersEnabled() &&
|
||||
TimestampDifferenceExceeds(lastStatCountersPurgeTime, GetCurrentTimestamp(),
|
||||
StatCountersPurgeInterval))
|
||||
{
|
||||
StartTransactionCommand();
|
||||
|
||||
if (!LockCitusExtension())
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||
"skipping to purge the stat counter "
|
||||
"shared memory entries for dropped "
|
||||
"databases")));
|
||||
}
|
||||
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||
{
|
||||
/*
|
||||
* Record last time we perform the purge to ensure we run once per
|
||||
* StatCountersPurgeInterval.
|
||||
*/
|
||||
lastStatCountersPurgeTime = GetCurrentTimestamp();
|
||||
|
||||
CitusStatCountersRemoveDroppedDatabases();
|
||||
}
|
||||
|
||||
CommitTransactionCommand();
|
||||
|
||||
/* make sure we don't wait too long */
|
||||
timeout = Min(timeout, (StatCountersPurgeInterval));
|
||||
}
|
||||
|
||||
pid_t backgroundTaskQueueWorkerPid = 0;
|
||||
BgwHandleStatus backgroundTaskQueueWorkerStatus =
|
||||
backgroundTasksQueueBgwHandle != NULL ? GetBackgroundWorkerPid(
|
||||
|
|
|
@ -334,7 +334,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
|
|||
extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection);
|
||||
extern void ClaimConnectionExclusively(MultiConnection *connection);
|
||||
extern void UnclaimConnection(MultiConnection *connection);
|
||||
extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection);
|
||||
extern void MarkConnectionConnected(MultiConnection *connection);
|
||||
|
||||
/* waiteventset utilities */
|
||||
extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* stat_counters.h
|
||||
*
|
||||
* This file contains the exported functions to track various statistic
|
||||
* counters for Citus.
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef STAT_COUNTERS_H
|
||||
#define STAT_COUNTERS_H
|
||||
|
||||
#include <port/atomics.h>
|
||||
|
||||
#include "distributed/time_constants.h"
|
||||
|
||||
#define STAT_COUNTERS_STATE_LOCK_TRANCHE_NAME "citus_stat_counters_lock_tranche"
|
||||
|
||||
#define DEFAULT_STAT_COUNTERS_FLUSH_INTERVAL (30 * MS_PER_SECOND)
|
||||
#define DISABLE_STAT_COUNTERS_FLUSH_INTERVAL -1
|
||||
|
||||
#define DEFAULT_STAT_COUNTERS_PURGE_INTERVAL (5 * MS_PER_MINUTE)
|
||||
|
||||
/*
|
||||
* Must be in the same order as the columns defined in citus_stat_counters view,
|
||||
* see src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
|
||||
STAT_CONNECTION_ESTABLISHMENT_FAILED,
|
||||
STAT_CONNECTION_REUSED,
|
||||
|
||||
STAT_QUERY_EXECUTION_SINGLE_SHARD,
|
||||
STAT_QUERY_EXECUTION_MULTI_SHARD,
|
||||
|
||||
/* do not use this and ensure it is the last entry */
|
||||
N_CITUS_STAT_COUNTERS
|
||||
} StatType;
|
||||
|
||||
extern int StatCountersFlushInterval;
|
||||
extern int StatCountersPurgeInterval;
|
||||
|
||||
extern bool IsCitusStatCountersEnabled(void);
|
||||
extern void IncrementStatCounterForMyDb(int statId);
|
||||
|
||||
extern void InitializeStatCountersArrayMem(void);
|
||||
extern void CitusStatCountersFlushAtExit(int code, Datum arg);
|
||||
extern Size StatCountersArrayShmemSize(void);
|
||||
extern void CitusStatCountersRemoveDroppedDatabases(void);
|
||||
|
||||
#endif /* STAT_COUNTERS_H */
|
|
@ -1480,11 +1480,8 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function citus_internal.update_placement_metadata(bigint,integer,integer) void
|
||||
| function citus_internal.update_relation_colocation(oid,integer) void
|
||||
| function citus_is_primary_node() boolean
|
||||
| function citus_stat_counters(oid) SETOF record
|
||||
| function citus_stat_counters_reset(oid) void
|
||||
| function citus_unmark_object_distributed(oid,oid,integer,boolean) void
|
||||
| view citus_stat_counters
|
||||
(30 rows)
|
||||
(27 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -179,8 +179,6 @@ ORDER BY 1;
|
|||
function citus_shards_on_worker()
|
||||
function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode)
|
||||
function citus_stat_activity()
|
||||
function citus_stat_counters(oid)
|
||||
function citus_stat_counters_reset(oid)
|
||||
function citus_stat_statements()
|
||||
function citus_stat_statements_reset()
|
||||
function citus_stat_tenants(boolean)
|
||||
|
@ -386,12 +384,11 @@ ORDER BY 1;
|
|||
view citus_shards
|
||||
view citus_shards_on_worker
|
||||
view citus_stat_activity
|
||||
view citus_stat_counters
|
||||
view citus_stat_statements
|
||||
view citus_stat_tenants
|
||||
view citus_stat_tenants_local
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(361 rows)
|
||||
(358 rows)
|
||||
|
||||
DROP TABLE extension_basic_types;
|
||||
|
|
Loading…
Reference in New Issue