diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 79dc4719a..fba228c2b 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2622,11 +2622,11 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) 0; if (copiedShardCount <= 1) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } } @@ -3174,11 +3174,11 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag) if (list_length(shardIntervalList) <= 1) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } table_close(distributedRelation, AccessShareLock); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 407de776b..06e5d3728 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -364,7 +364,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, */ if (PQstatus(connection->pgConn) == CONNECTION_OK) { - IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED); + IncrementSimpleStatCounterForMyDb(STAT_CONNECTION_REUSED); } return connection; @@ -1007,7 +1007,7 @@ FinishConnectionListEstablishment(List *multiConnectionList) * Here we count the connections establishments that failed and that * we won't wait anymore. 
*/ - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); } } @@ -1110,7 +1110,7 @@ FinishConnectionListEstablishment(List *multiConnectionList) eventMask, NULL); if (!success) { - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("connection establishment for node %s:%d " "failed", connection->hostname, @@ -1220,7 +1220,7 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) /* close connection, otherwise we take up resource on the other side */ CitusPQFinish(connection); - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); } } @@ -1644,7 +1644,7 @@ MarkConnectionConnected(MultiConnection *connection, bool newConnection) if (newConnection) { - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED); + IncrementSimpleStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED); } } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 677535591..1c115c346 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2036,7 +2036,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution) else { connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); } @@ -2854,7 +2854,7 @@ MarkEstablishingSessionsTimedOut(WorkerPool *workerPool) connection->connectionState == MULTI_CONNECTION_INITIAL) { connection->connectionState = MULTI_CONNECTION_TIMED_OUT; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); } } } @@ -3039,7 +3039,7 @@ ConnectionStateMachine(WorkerSession 
*session) else if (status == CONNECTION_BAD) { connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); break; } @@ -3055,7 +3055,7 @@ ConnectionStateMachine(WorkerSession *session) if (pollMode == PGRES_POLLING_FAILED) { connection->connectionState = MULTI_CONNECTION_FAILED; - IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED); + IncrementConnEstabFailedStatCounterForMyDb(); } else if (pollMode == PGRES_POLLING_READING) { diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 53b0ccb0f..e5e26ada6 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -269,11 +269,11 @@ CitusExecScan(CustomScanState *node) if (isMultiTaskPlan) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } scanState->finishedRemoteScan = true; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 58c172c66..a58849d95 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -189,11 +189,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node) * MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this * here. 
*/ - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } /* @@ -218,11 +218,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node) if (list_length(taskList) <= 1) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } executorState->es_processed = rowsInserted; @@ -302,11 +302,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node) if (list_length(prunedTaskList) <= 1) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } } else diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index 56bde62bc..3f35e07af 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -176,11 +176,11 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) * most probably choose to use ExecuteSourceAtCoordAndRedistribution(), * but we still keep this here. 
*/ - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } ereport(DEBUG1, (errmsg("Executing final MERGE on workers using " @@ -213,11 +213,11 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) if (list_length(taskList) <= 1) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } executorState->es_processed = rowsMerged; @@ -318,7 +318,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) * No task to execute, but we still increment STAT_QUERY_EXECUTION_SINGLE_SHARD * as per our convention. */ - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); return; } @@ -341,11 +341,11 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) if (list_length(prunedTaskList) == 1) { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } else { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); } executorState->es_processed = rowsMerged; diff --git a/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql b/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql index 815d4e794..ba66ddfc7 100644 --- a/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql +++ b/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql @@ -3,3 +3,4 @@ #include "udfs/citus_prepare_pg_upgrade/14.0-1.sql" #include "udfs/citus_finish_pg_upgrade/14.0-1.sql" +#include 
"udfs/citus_stat_counters/14.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql b/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql index 199030339..9c5a0d5f3 100644 --- a/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql @@ -3,3 +3,7 @@ #include "../udfs/citus_prepare_pg_upgrade/13.0-1.sql" #include "../udfs/citus_finish_pg_upgrade/13.2-1.sql" + +DROP VIEW pg_catalog.citus_stat_counters; +DROP FUNCTION pg_catalog.citus_stat_counters(oid); +#include "../udfs/citus_stat_counters/13.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters/14.0-1.sql b/src/backend/distributed/sql/udfs/citus_stat_counters/14.0-1.sql new file mode 100644 index 000000000..d1aad9221 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_counters/14.0-1.sql @@ -0,0 +1,53 @@ +DROP VIEW pg_catalog.citus_stat_counters; +DROP FUNCTION pg_catalog.citus_stat_counters(oid); + +-- See the comments for the function in +-- src/backend/distributed/stats/stat_counters.c for more details. 
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( + database_id oid DEFAULT 0, + + -- must always be the first column or you should accordingly update + -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c + OUT database_id oid, + + -- Following stat counter columns must be in the same order as the + -- SimpleStatType enum defined in src/include/distributed/stats/stat_counters.h + OUT connection_establishment_succeeded bigint, + OUT connection_reused bigint, + OUT query_execution_single_shard bigint, + OUT query_execution_multi_shard bigint, + + OUT connection_establishment_failed bigint, + + -- must always be the last column or you should accordingly update + -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c + OUT stats_reset timestamp with time zone +) +RETURNS SETOF RECORD +LANGUAGE C STRICT VOLATILE PARALLEL SAFE +AS 'MODULE_PATHNAME', $$citus_stat_counters$$; +COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.'; + +-- returns the stat counters for all the databases in local node +CREATE OR REPLACE VIEW citus.citus_stat_counters AS +SELECT pg_database.oid, + pg_database.datname as name, + + -- We always COALESCE the counters to 0 because the LEFT JOIN + -- will bring the databases that have never been connected to + -- since the last restart with NULL counters, but we want to + -- show them with 0 counters in the view. 
+ COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded, + COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed, + COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused, + COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard, + COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard, + + citus_stat_counters.stats_reset +FROM pg_catalog.pg_database +LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters +ON (oid = database_id); + +ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog; + +GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql index 3792bf7c8..d1aad9221 100644 --- a/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql @@ -1,3 +1,6 @@ +DROP VIEW pg_catalog.citus_stat_counters; +DROP FUNCTION pg_catalog.citus_stat_counters(oid); + -- See the comments for the function in -- src/backend/distributed/stats/stat_counters.c for more details. 
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( @@ -10,11 +13,12 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters( -- Following stat counter columns must be in the same order as the -- StatType enum defined in src/include/distributed/stats/stat_counters.h OUT connection_establishment_succeeded bigint, - OUT connection_establishment_failed bigint, OUT connection_reused bigint, OUT query_execution_single_shard bigint, OUT query_execution_multi_shard bigint, + OUT connection_establishment_failed bigint, + -- must always be the last column or you should accordingly update -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c OUT stats_reset timestamp with time zone @@ -25,7 +29,7 @@ AS 'MODULE_PATHNAME', $$citus_stat_counters$$; COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.'; -- returns the stat counters for all the databases in local node -CREATE VIEW citus.citus_stat_counters AS +CREATE OR REPLACE VIEW citus.citus_stat_counters AS SELECT pg_database.oid, pg_database.datname as name, diff --git a/src/backend/distributed/stats/stat_counters.c b/src/backend/distributed/stats/stat_counters.c index 03151befd..857af3a6c 100644 --- a/src/backend/distributed/stats/stat_counters.c +++ b/src/backend/distributed/stats/stat_counters.c @@ -7,7 +7,7 @@ * * We create an array of "BackendStatsSlot"s in shared memory, one for * each backend. Each backend increments its own stat counters in its - * own slot via IncrementStatCounterForMyDb(). And when a backend exits, + * own slot via IncrementSimpleStatCounterForMyDb(). 
And when a backend exits, * it saves its stat counters from its slot via * SaveBackendStatsIntoSavedBackendStatsHash() into a hash table in * shared memory, whose entries are "SavedBackendStatsHashEntry"s and @@ -48,7 +48,7 @@ * There is chance that citus_stat_counters_reset() might race with a * backend that is trying to increment one of the counters in its slot * and as a result it can effectively fail to reset that counter due to - * the reasons documented in IncrementStatCounterForMyDb() function. + * the reasons documented in IncrementSimpleStatCounterForMyDb() function. * However, this should be a very rare case and we can live with that * for now. * @@ -97,8 +97,17 @@ /* fixed size array types to store the stat counters */ -typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS]; -typedef uint64 StatCounters[N_CITUS_STAT_COUNTERS]; +typedef struct AtomicStatCounters +{ + pg_atomic_uint64 simpleCounters[N_SIMPLE_CITUS_STAT_COUNTERS]; + pg_atomic_uint64 connEstabFailedCounter; +} AtomicStatCounters; + +typedef struct StatCounters +{ + uint64 simpleCounters[N_SIMPLE_CITUS_STAT_COUNTERS]; + uint64 connEstabFailedCounter; +} StatCounters; /* * saved backend stats - hash entry definition @@ -153,7 +162,7 @@ typedef struct BackendStatsSlot * GUC variable * * This only controls whether we track the stat counters or not, via - * IncrementStatCounterForMyDb() and + * IncrementSimpleStatCounterForMyDb() and * SaveBackendStatsIntoSavedBackendStatsHash(). In other words, even * when the GUC is disabled, we still allocate the shared memory * structures etc. and citus_stat_counters() / citus_stat_counters_reset() @@ -394,11 +403,11 @@ StatCountersShmemSize(void) /* - * IncrementStatCounterForMyDb increments the stat counter for the given statId + * IncrementSimpleStatCounterForMyDb increments the stat counter for the given statId * for this backend. 
*/ void -IncrementStatCounterForMyDb(int statId) +IncrementSimpleStatCounterForMyDb(int statId) { if (!EnableStatCounters) { @@ -430,7 +439,45 @@ IncrementStatCounterForMyDb(int statId) * But this should be a rare case and we can live with that, for the * sake of lock-free implementation of this function. */ - pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters[statId]; + pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters.simpleCounters[statId]; + pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1); +} + + +void +IncrementConnEstabFailedStatCounterForMyDb(void) +{ + if (!EnableStatCounters) + { + return; + } + + /* just to be on the safe side */ + if (!EnsureStatCountersShmemInitDone()) + { + return; + } + + int myBackendSlotIdx = getProcNo_compat(MyProc); + BackendStatsSlot *myBackendStatsSlot = + &SharedBackendStatsSlotArray[myBackendSlotIdx]; + + /* + * When there cannot be any other writers, incrementing an atomic + * counter via pg_atomic_read_u64() and pg_atomic_write_u64() is + * same as incrementing it via pg_atomic_fetch_add_u64(). Plus, the + * former is cheaper than the latter because the latter has to do + * extra work to deal with concurrent writers. + * + * In our case, the only concurrent writer could be the backend that + * is executing citus_stat_counters_reset(). So, there is chance that + * we read the counter value, then it gets reset by a concurrent call + * made to citus_stat_counters_reset() and then we write the + * incremented value back, by effectively overriding the reset value. + * But this should be a rare case and we can live with that, for the + * sake of lock-free implementation of this function. 
+ */ + pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters.connEstabFailedCounter; pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1); } @@ -504,19 +551,24 @@ SaveBackendStatsIntoSavedBackendStatsHash(void) SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - dbSavedBackendStatsEntry->counters[statIdx] += - pg_atomic_read_u64(&myBackendStatsSlot->counters[statIdx]); + dbSavedBackendStatsEntry->counters.simpleCounters[statIdx] += + pg_atomic_read_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx]); /* * Given that this function is only called when a backend exits, later on * another backend might be assigned to the same slot. So, we reset each * stat counter of this slot to 0 after saving it. */ - pg_atomic_write_u64(&myBackendStatsSlot->counters[statIdx], 0); + pg_atomic_write_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx], 0); } + dbSavedBackendStatsEntry->counters.connEstabFailedCounter += + pg_atomic_read_u64(&myBackendStatsSlot->counters.connEstabFailedCounter); + + pg_atomic_write_u64(&myBackendStatsSlot->counters.connEstabFailedCounter, 0); + SpinLockRelease(&dbSavedBackendStatsEntry->mutex); LWLockRelease(*SharedSavedBackendStatsHashLock); @@ -570,10 +622,13 @@ StatCountersShmemInit(void) BackendStatsSlot *backendStatsSlot = &SharedBackendStatsSlotArray[backendSlotIdx]; - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - pg_atomic_init_u64(&backendStatsSlot->counters[statIdx], 0); + pg_atomic_init_u64(&backendStatsSlot->counters.simpleCounters[statIdx], + 0); } + + pg_atomic_init_u64(&backendStatsSlot->counters.connEstabFailedCounter, 0); } *SharedSavedBackendStatsHashLock = &( @@ -657,11 +712,14 @@ CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) BackendStatsSlot 
*backendStatsSlot = &SharedBackendStatsSlotArray[backendSlotIdx]; - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - dbStatsEntry->counters[statIdx] += - pg_atomic_read_u64(&backendStatsSlot->counters[statIdx]); + dbStatsEntry->counters.simpleCounters[statIdx] += + pg_atomic_read_u64(&backendStatsSlot->counters.simpleCounters[statIdx]); } + + dbStatsEntry->counters.connEstabFailedCounter += + pg_atomic_read_u64(&backendStatsSlot->counters.connEstabFailedCounter); } } @@ -699,12 +757,15 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - dbStatsEntry->counters[statIdx] += - dbSavedBackendStatsEntry->counters[statIdx]; + dbStatsEntry->counters.simpleCounters[statIdx] += + dbSavedBackendStatsEntry->counters.simpleCounters[statIdx]; } + dbStatsEntry->counters.connEstabFailedCounter += + dbSavedBackendStatsEntry->counters.connEstabFailedCounter; + dbStatsEntry->resetTimestamp = dbSavedBackendStatsEntry->resetTimestamp; @@ -725,12 +786,15 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - dbStatsEntry->counters[statIdx] += - dbSavedBackendStatsEntry->counters[statIdx]; + dbStatsEntry->counters.simpleCounters[statIdx] += + dbSavedBackendStatsEntry->counters.simpleCounters[statIdx]; } + dbStatsEntry->counters.connEstabFailedCounter += + dbSavedBackendStatsEntry->counters.connEstabFailedCounter; + dbStatsEntry->resetTimestamp = dbSavedBackendStatsEntry->resetTimestamp; @@ -757,7 +821,8 @@ DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB 
*databaseStats) if (!found) { - MemSet(dbStatsEntry->counters, 0, sizeof(StatCounters)); + MemSet(dbStatsEntry->counters.simpleCounters, 0, sizeof(dbStatsEntry->counters.simpleCounters)); + dbStatsEntry->counters.connEstabFailedCounter = 0; dbStatsEntry->resetTimestamp = 0; } @@ -779,29 +844,39 @@ StoreDatabaseStatsIntoTupStore(HTAB *databaseStats, Tuplestorestate *tupleStore, DatabaseStatsHashEntry *dbStatsEntry = NULL; while ((dbStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL) { - /* +2 for database_id (first) and the stats_reset (last) column */ - Datum values[N_CITUS_STAT_COUNTERS + 2] = { 0 }; - bool isNulls[N_CITUS_STAT_COUNTERS + 2] = { 0 }; + /* + * +3 for: + * database_id (first) + * conn_estab_failed_count (after the simple counters) + * the stats_reset (last) + */ + Datum values[N_SIMPLE_CITUS_STAT_COUNTERS + 3] = { 0 }; + bool isNulls[N_SIMPLE_CITUS_STAT_COUNTERS + 3] = { 0 }; - values[0] = ObjectIdGetDatum(dbStatsEntry->databaseId); + int writeIdx = 0; + values[writeIdx++] = ObjectIdGetDatum(dbStatsEntry->databaseId); - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - uint64 statCounter = dbStatsEntry->counters[statIdx]; - values[statIdx + 1] = UInt64GetDatum(statCounter); + uint64 statCounter = dbStatsEntry->counters.simpleCounters[statIdx]; + values[writeIdx++] = UInt64GetDatum(statCounter); } + values[writeIdx++] = UInt64GetDatum( + dbStatsEntry->counters.connEstabFailedCounter); + /* set stats_reset column to NULL if it was never reset */ if (dbStatsEntry->resetTimestamp == 0) { - isNulls[N_CITUS_STAT_COUNTERS + 1] = true; + isNulls[writeIdx++] = true; } else { - values[N_CITUS_STAT_COUNTERS + 1] = - TimestampTzGetDatum(dbStatsEntry->resetTimestamp); + values[writeIdx++] = TimestampTzGetDatum(dbStatsEntry->resetTimestamp); } + Assert(writeIdx == lengthof(values)); + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } } @@ -855,10 
+930,12 @@ ResetActiveBackendStats(Oid databaseId) BackendStatsSlot *backendStatsSlot = &SharedBackendStatsSlotArray[backendSlotIdx]; - for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++) + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) { - pg_atomic_write_u64(&backendStatsSlot->counters[statIdx], 0); + pg_atomic_write_u64(&backendStatsSlot->counters.simpleCounters[statIdx], 0); } + + pg_atomic_write_u64(&backendStatsSlot->counters.connEstabFailedCounter, 0); } return foundAny; @@ -920,7 +997,9 @@ ResetSavedBackendStats(Oid databaseId, bool force) { SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters)); + memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0, + sizeof(dbSavedBackendStatsEntry->counters.simpleCounters)); + dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0; dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp(); SpinLockRelease(&dbSavedBackendStatsEntry->mutex); @@ -962,7 +1041,10 @@ SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId) if (!found) { - memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters)); + memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0, + sizeof(dbSavedBackendStatsEntry->counters.simpleCounters)); + + dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0; dbSavedBackendStatsEntry->resetTimestamp = 0; diff --git a/src/include/distributed/stats/stat_counters.h b/src/include/distributed/stats/stat_counters.h index c673c062c..43f409d27 100644 --- a/src/include/distributed/stats/stat_counters.h +++ b/src/include/distributed/stats/stat_counters.h @@ -31,7 +31,6 @@ typedef enum * adaptive_executor.c. 
*/ STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED, - STAT_CONNECTION_ESTABLISHMENT_FAILED, STAT_CONNECTION_REUSED, /* @@ -48,8 +47,8 @@ typedef enum STAT_QUERY_EXECUTION_MULTI_SHARD, /* do not use this and ensure it is the last entry */ - N_CITUS_STAT_COUNTERS -} StatType; + N_SIMPLE_CITUS_STAT_COUNTERS +} SimpleStatType; /* GUC variable */ @@ -60,8 +59,9 @@ extern bool EnableStatCounters; extern void InitializeStatCountersShmem(void); extern Size StatCountersShmemSize(void); -/* main entry point for the callers who want to increment the stat counters */ -extern void IncrementStatCounterForMyDb(int statId); +/* main entry point for the callers who want to increment the simple stat counters */ +extern void IncrementSimpleStatCounterForMyDb(int statId); +extern void IncrementConnEstabFailedStatCounterForMyDb(void); /* * Exported to define a before_shmem_exit() callback that saves