diff --git a/src/backend/distributed/stats/stat_counters.c b/src/backend/distributed/stats/stat_counters.c index 857af3a6c..5136b3684 100644 --- a/src/backend/distributed/stats/stat_counters.c +++ b/src/backend/distributed/stats/stat_counters.c @@ -215,6 +215,14 @@ static void ResetSavedBackendStats(Oid databaseId, bool force); static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId); +/* helper functions for StatCounters and AtomicStatCounters */ +static void AccumulateStatCounters(StatCounters *sourceCounters, + StatCounters *targetCounters); +static void AccumulateAtomicStatCounters(AtomicStatCounters *sourceCounters, + StatCounters *targetCounters); +static void ResetStatCounters(StatCounters *counters); +static void ResetAtomicStatCounters(AtomicStatCounters *counters); + /* sql exports */ PG_FUNCTION_INFO_V1(citus_stat_counters); @@ -551,23 +559,15 @@ SaveBackendStatsIntoSavedBackendStatsHash(void) SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) - { - dbSavedBackendStatsEntry->counters.simpleCounters[statIdx] += - pg_atomic_read_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx]); + AccumulateAtomicStatCounters(&myBackendStatsSlot->counters, + &dbSavedBackendStatsEntry->counters); - /* - * Given that this function is only called when a backend exits, later on - * another backend might be assigned to the same slot. So, we reset each - * stat counter of this slot to 0 after saving it. - */ - pg_atomic_write_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx], 0); - } - - dbSavedBackendStatsEntry->counters.connEstabFailedCounter += - pg_atomic_read_u64(&myBackendStatsSlot->counters.connEstabFailedCounter); - - pg_atomic_write_u64(&myBackendStatsSlot->counters.connEstabFailedCounter, 0); + /* + * Given that this function is only called when a backend exits, later on + * another backend might be assigned to the same slot. So, we reset each + * stat counter of this slot to 0 after saving it. + */ + ResetAtomicStatCounters(&myBackendStatsSlot->counters); SpinLockRelease(&dbSavedBackendStatsEntry->mutex); @@ -683,7 +683,6 @@ CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx) { PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx); - if (backendProc->pid == 0) { /* unused slot */ @@ -709,17 +708,9 @@ CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) DatabaseStatsHashEntry *dbStatsEntry = DatabaseStatsHashEntryFindOrCreate(procDatabaseId, databaseStats); - BackendStatsSlot *backendStatsSlot = - &SharedBackendStatsSlotArray[backendSlotIdx]; - - for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) - { - dbStatsEntry->counters.simpleCounters[statIdx] += - pg_atomic_read_u64(&backendStatsSlot->counters.simpleCounters[statIdx]); - } - - dbStatsEntry->counters.connEstabFailedCounter += - pg_atomic_read_u64(&backendStatsSlot->counters.connEstabFailedCounter); + BackendStatsSlot *backendStatsSlot = &SharedBackendStatsSlotArray[backendSlotIdx]; + AccumulateAtomicStatCounters(&backendStatsSlot->counters, + &dbStatsEntry->counters); } } @@ -757,17 +748,9 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) - { - dbStatsEntry->counters.simpleCounters[statIdx] += - dbSavedBackendStatsEntry->counters.simpleCounters[statIdx]; - } - - dbStatsEntry->counters.connEstabFailedCounter += - dbSavedBackendStatsEntry->counters.connEstabFailedCounter; - - dbStatsEntry->resetTimestamp = - dbSavedBackendStatsEntry->resetTimestamp; + AccumulateStatCounters(&dbSavedBackendStatsEntry->counters, + &dbStatsEntry->counters); + dbStatsEntry->resetTimestamp = dbSavedBackendStatsEntry->resetTimestamp; SpinLockRelease(&dbSavedBackendStatsEntry->mutex); } @@ -786,17 +769,9 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats) SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) - { - dbStatsEntry->counters.simpleCounters[statIdx] += - dbSavedBackendStatsEntry->counters.simpleCounters[statIdx]; - } - - dbStatsEntry->counters.connEstabFailedCounter += - dbSavedBackendStatsEntry->counters.connEstabFailedCounter; - - dbStatsEntry->resetTimestamp = - dbSavedBackendStatsEntry->resetTimestamp; + AccumulateStatCounters(&dbSavedBackendStatsEntry->counters, + &dbStatsEntry->counters); + dbStatsEntry->resetTimestamp = dbSavedBackendStatsEntry->resetTimestamp; SpinLockRelease(&dbSavedBackendStatsEntry->mutex); } @@ -821,8 +796,7 @@ DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB *databaseStats) if (!found) { - MemSet(dbStatsEntry->counters.simpleCounters, 0, sizeof(dbStatsEntry->counters.simpleCounters)); - dbStatsEntry->counters.connEstabFailedCounter = 0; + ResetStatCounters(&dbStatsEntry->counters); dbStatsEntry->resetTimestamp = 0; } @@ -929,13 +903,7 @@ ResetActiveBackendStats(Oid databaseId) BackendStatsSlot *backendStatsSlot = &SharedBackendStatsSlotArray[backendSlotIdx]; - - for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) - { - pg_atomic_write_u64(&backendStatsSlot->counters.simpleCounters[statIdx], 0); - } - - pg_atomic_write_u64(&backendStatsSlot->counters.connEstabFailedCounter, 0); + ResetAtomicStatCounters(&backendStatsSlot->counters); } return foundAny; @@ -997,9 +965,7 @@ ResetSavedBackendStats(Oid databaseId, bool force) { SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); - memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0, - sizeof(dbSavedBackendStatsEntry->counters.simpleCounters)); - dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0; + ResetStatCounters(&dbSavedBackendStatsEntry->counters); dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp(); SpinLockRelease(&dbSavedBackendStatsEntry->mutex); @@ -1041,15 +1007,71 @@ SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId) if (!found) { - memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0, - sizeof(dbSavedBackendStatsEntry->counters.simpleCounters)); - - dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0; - + ResetStatCounters(&dbSavedBackendStatsEntry->counters); dbSavedBackendStatsEntry->resetTimestamp = 0; - SpinLockInit(&dbSavedBackendStatsEntry->mutex); } return dbSavedBackendStatsEntry; } + + +/* + * AccumulateStatCounters accumulates source StatCounters into target StatCounters. + */ +static void +AccumulateStatCounters(StatCounters *sourceCounters, StatCounters *targetCounters) +{ + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) + { + targetCounters->simpleCounters[statIdx] += + sourceCounters->simpleCounters[statIdx]; + } + + targetCounters->connEstabFailedCounter += sourceCounters->connEstabFailedCounter; +} + + +/* + * AccumulateAtomicStatCounters accumulates source AtomicStatCounters into + * target StatCounters. + */ +static void +AccumulateAtomicStatCounters(AtomicStatCounters *sourceCounters, + StatCounters *targetCounters) +{ + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) + { + targetCounters->simpleCounters[statIdx] += + pg_atomic_read_u64(&sourceCounters->simpleCounters[statIdx]); + } + + targetCounters->connEstabFailedCounter += + pg_atomic_read_u64(&sourceCounters->connEstabFailedCounter); +} + + +/* + * ResetStatCounters resets all counters in given StatCounters to 0. + */ +static void +ResetStatCounters(StatCounters *counters) +{ + memset(counters->simpleCounters, 0, sizeof(counters->simpleCounters)); + counters->connEstabFailedCounter = 0; +} + + +/* + * ResetAtomicStatCounters resets all atomic counters in given AtomicStatCounters to 0. + */ +static void +ResetAtomicStatCounters(AtomicStatCounters *counters) +{ + for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) + { + pg_atomic_write_u64(&counters->simpleCounters[statIdx], 0); + } + + pg_atomic_write_u64(&counters->connEstabFailedCounter, 0); +}