PG-501: Missing Buckets and incorrect calls count. (#298)

prev_bucket_sec holds the actual time at which the previous bucket was created
and it is used to compute if the previous bucket time has elapsed and when is
the time to create a new one. But since the bucket start time is rounded down
to logical time window start, that makes the prev_bucket_sec and bucket start
time out of sync with each other, and depending on the query arrival time there
is a high probability that a bucket gets missed especially when the last bucket
was created around the end of the bucket time window.

Solution is to keep the prev_bucket_sec and bucket start time in-sync.

Moreover, we are using the unint64 for storing the prev_bucket_sec which is kind
of an overkill and a simple uint should be good enough for the purpose. But that
change can be taken up as part of the create-bucket function refactoring task.
pull/299/head
Muhammad Usama 2022-08-29 17:31:01 +05:00 committed by GitHub
parent e2d4603a13
commit bb5e6a4e88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 5 additions and 4 deletions

View File

@ -2146,7 +2146,6 @@ static uint64
get_next_wbucket(pgssSharedState *pgss)
{
struct timeval tv;
uint64 current_sec;
uint64 current_bucket_sec;
uint64 new_bucket_id;
uint64 prev_bucket_id;
@ -2154,7 +2153,6 @@ get_next_wbucket(pgssSharedState *pgss)
bool update_bucket = false;
gettimeofday(&tv, NULL);
current_sec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
current_bucket_sec = pg_atomic_read_u64(&pgss->prev_bucket_sec);
/*
@ -2172,9 +2170,9 @@ get_next_wbucket(pgssSharedState *pgss)
* definitely make the while condition to fail, we can stop the loop as
* another thread has already updated prev_bucket_sec.
*/
while ((current_sec - current_bucket_sec) > ((uint64)PGSM_BUCKET_TIME))
while ((tv.tv_sec - (uint)current_bucket_sec) > ((uint)PGSM_BUCKET_TIME))
{
if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_sec, &current_bucket_sec, current_sec))
if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_sec, &current_bucket_sec, (uint64)tv.tv_sec))
{
update_bucket = true;
break;
@ -2217,6 +2215,9 @@ get_next_wbucket(pgssSharedState *pgss)
tv.tv_sec = (tv.tv_sec) - (tv.tv_sec % PGSM_BUCKET_TIME);
lt = localtime(&tv.tv_sec);
/* Allign the value in prev_bucket_sec to the bucket start time */
pg_atomic_exchange_u64(&pgss->prev_bucket_sec, (uint64)tv.tv_sec);
snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec);