refactor: separate common logic to helpers

stat-counters-conn-failure-target-node
Onur Tirtir 2025-10-13 15:05:32 +03:00
parent 76320723ef
commit f43dcd4614
1 changed files with 90 additions and 68 deletions

View File

@ -215,6 +215,14 @@ static void ResetSavedBackendStats(Oid databaseId, bool force);
static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryCreateIfNotExists(Oid static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryCreateIfNotExists(Oid
databaseId); databaseId);
/* helper functions for StatCounters and AtomicStatCounters */
static void AccumulateStatCounters(StatCounters *sourceCounters,
StatCounters *targetCounters);
static void AccumulateAtomicStatCounters(AtomicStatCounters *sourceCounters,
StatCounters *targetCounters);
static void ResetStatCounters(StatCounters *counters);
static void ResetAtomicStatCounters(AtomicStatCounters *counters);
/* sql exports */ /* sql exports */
PG_FUNCTION_INFO_V1(citus_stat_counters); PG_FUNCTION_INFO_V1(citus_stat_counters);
@ -551,23 +559,15 @@ SaveBackendStatsIntoSavedBackendStatsHash(void)
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) AccumulateAtomicStatCounters(&myBackendStatsSlot->counters,
{ &dbSavedBackendStatsEntry->counters);
dbSavedBackendStatsEntry->counters.simpleCounters[statIdx] +=
pg_atomic_read_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx]);
/* /*
* Given that this function is only called when a backend exits, later on * Given that this function is only called when a backend exits, later on
* another backend might be assigned to the same slot. So, we reset each * another backend might be assigned to the same slot. So, we reset each
* stat counter of this slot to 0 after saving it. * stat counter of this slot to 0 after saving it.
*/ */
pg_atomic_write_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx], 0); ResetAtomicStatCounters(&myBackendStatsSlot->counters);
}
dbSavedBackendStatsEntry->counters.connEstabFailedCounter +=
pg_atomic_read_u64(&myBackendStatsSlot->counters.connEstabFailedCounter);
pg_atomic_write_u64(&myBackendStatsSlot->counters.connEstabFailedCounter, 0);
SpinLockRelease(&dbSavedBackendStatsEntry->mutex); SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
@ -683,7 +683,6 @@ CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx) for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
{ {
PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx); PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);
if (backendProc->pid == 0) if (backendProc->pid == 0)
{ {
/* unused slot */ /* unused slot */
@ -709,17 +708,9 @@ CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
DatabaseStatsHashEntry *dbStatsEntry = DatabaseStatsHashEntry *dbStatsEntry =
DatabaseStatsHashEntryFindOrCreate(procDatabaseId, databaseStats); DatabaseStatsHashEntryFindOrCreate(procDatabaseId, databaseStats);
BackendStatsSlot *backendStatsSlot = BackendStatsSlot *backendStatsSlot = &SharedBackendStatsSlotArray[backendSlotIdx];
&SharedBackendStatsSlotArray[backendSlotIdx]; AccumulateAtomicStatCounters(&backendStatsSlot->counters,
&dbStatsEntry->counters);
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
{
dbStatsEntry->counters.simpleCounters[statIdx] +=
pg_atomic_read_u64(&backendStatsSlot->counters.simpleCounters[statIdx]);
}
dbStatsEntry->counters.connEstabFailedCounter +=
pg_atomic_read_u64(&backendStatsSlot->counters.connEstabFailedCounter);
} }
} }
@ -757,17 +748,9 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) AccumulateStatCounters(&dbSavedBackendStatsEntry->counters,
{ &dbStatsEntry->counters);
dbStatsEntry->counters.simpleCounters[statIdx] += dbStatsEntry->resetTimestamp = dbSavedBackendStatsEntry->resetTimestamp;
dbSavedBackendStatsEntry->counters.simpleCounters[statIdx];
}
dbStatsEntry->counters.connEstabFailedCounter +=
dbSavedBackendStatsEntry->counters.connEstabFailedCounter;
dbStatsEntry->resetTimestamp =
dbSavedBackendStatsEntry->resetTimestamp;
SpinLockRelease(&dbSavedBackendStatsEntry->mutex); SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
} }
@ -786,17 +769,9 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++) AccumulateStatCounters(&dbSavedBackendStatsEntry->counters,
{ &dbStatsEntry->counters);
dbStatsEntry->counters.simpleCounters[statIdx] += dbStatsEntry->resetTimestamp = dbSavedBackendStatsEntry->resetTimestamp;
dbSavedBackendStatsEntry->counters.simpleCounters[statIdx];
}
dbStatsEntry->counters.connEstabFailedCounter +=
dbSavedBackendStatsEntry->counters.connEstabFailedCounter;
dbStatsEntry->resetTimestamp =
dbSavedBackendStatsEntry->resetTimestamp;
SpinLockRelease(&dbSavedBackendStatsEntry->mutex); SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
} }
@ -821,8 +796,7 @@ DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB *databaseStats)
if (!found) if (!found)
{ {
MemSet(dbStatsEntry->counters.simpleCounters, 0, sizeof(dbStatsEntry->counters.simpleCounters)); ResetStatCounters(&dbStatsEntry->counters);
dbStatsEntry->counters.connEstabFailedCounter = 0;
dbStatsEntry->resetTimestamp = 0; dbStatsEntry->resetTimestamp = 0;
} }
@ -929,13 +903,7 @@ ResetActiveBackendStats(Oid databaseId)
BackendStatsSlot *backendStatsSlot = BackendStatsSlot *backendStatsSlot =
&SharedBackendStatsSlotArray[backendSlotIdx]; &SharedBackendStatsSlotArray[backendSlotIdx];
ResetAtomicStatCounters(&backendStatsSlot->counters);
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
{
pg_atomic_write_u64(&backendStatsSlot->counters.simpleCounters[statIdx], 0);
}
pg_atomic_write_u64(&backendStatsSlot->counters.connEstabFailedCounter, 0);
} }
return foundAny; return foundAny;
@ -997,9 +965,7 @@ ResetSavedBackendStats(Oid databaseId, bool force)
{ {
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex); SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0, ResetStatCounters(&dbSavedBackendStatsEntry->counters);
sizeof(dbSavedBackendStatsEntry->counters.simpleCounters));
dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0;
dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp(); dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp();
SpinLockRelease(&dbSavedBackendStatsEntry->mutex); SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
@ -1041,15 +1007,71 @@ SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId)
if (!found) if (!found)
{ {
memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0, ResetStatCounters(&dbSavedBackendStatsEntry->counters);
sizeof(dbSavedBackendStatsEntry->counters.simpleCounters));
dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0;
dbSavedBackendStatsEntry->resetTimestamp = 0; dbSavedBackendStatsEntry->resetTimestamp = 0;
SpinLockInit(&dbSavedBackendStatsEntry->mutex); SpinLockInit(&dbSavedBackendStatsEntry->mutex);
} }
return dbSavedBackendStatsEntry; return dbSavedBackendStatsEntry;
} }
/*
* AccumulateStatCounters accumulates source StatCounters into target StatCounters.
*/
static void
AccumulateStatCounters(StatCounters *sourceCounters, StatCounters *targetCounters)
{
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
{
targetCounters->simpleCounters[statIdx] +=
sourceCounters->simpleCounters[statIdx];
}
targetCounters->connEstabFailedCounter += sourceCounters->connEstabFailedCounter;
}
/*
* AccumulateAtomicStatCounters accumulates source AtomicStatCounters into
* target StatCounters.
*/
static void
AccumulateAtomicStatCounters(AtomicStatCounters *sourceCounters,
StatCounters *targetCounters)
{
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
{
targetCounters->simpleCounters[statIdx] +=
pg_atomic_read_u64(&sourceCounters->simpleCounters[statIdx]);
}
targetCounters->connEstabFailedCounter +=
pg_atomic_read_u64(&sourceCounters->connEstabFailedCounter);
}
/*
* ResetStatCounters resets all counters in given StatCounters to 0.
*/
static void
ResetStatCounters(StatCounters *counters)
{
memset(counters->simpleCounters, 0, sizeof(counters->simpleCounters));
counters->connEstabFailedCounter = 0;
}
/*
* ResetAtomicStatCounters resets all atomic counters in given AtomicStatCounters to 0.
*/
static void
ResetAtomicStatCounters(AtomicStatCounters *counters)
{
for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
{
pg_atomic_write_u64(&counters->simpleCounters[statIdx], 0);
}
pg_atomic_write_u64(&counters->connEstabFailedCounter, 0);
}