PG-244: Fix race condition in get_next_wbucket().

The if condition bellow in geta_next_wbucket() was subject to a race
condition:
if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 *
1000))

Two or more threads/processes could easily evaluate this condition to
true, thus executing more than once the block that would calculate a
new bucket id, clear/move old entries in the pgss_query_hash and
pgss_hash hash tables.

To avoid this problem, we define prev_bucket_usec and current_wbucket
variables as atomic and execute a loop to check if another thread has
updated prev_bucket_usec before the current one.
pull/114/head
Diego Fronza 2021-09-30 17:13:27 -03:00
parent f269af3da2
commit 89743e9243
3 changed files with 70 additions and 50 deletions

View File

@ -146,7 +146,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found);
if (!found) if (!found)
{ {
pgss->bucket_entry[pgss->current_wbucket]++; pgss->bucket_entry[pg_atomic_read_u64(&pgss->current_wbucket)]++;
/* New entry, initialize it */ /* New entry, initialize it */
/* reset the statistics */ /* reset the statistics */
memset(&entry->counters, 0, sizeof(Counters)); memset(&entry->counters, 0, sizeof(Counters));
@ -237,22 +237,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
List *pending_entries = NIL; List *pending_entries = NIL;
ListCell *pending_entry; ListCell *pending_entry;
/*
* During transition to a new bucket id, a rare but possible race
* condition may happen while reading pgss->current_wbucket. If a
* different thread/process updates pgss->current_wbucket before this
* function is called, it may happen that old_bucket_id == new_bucket_id.
* If that is the case, we adjust the old bucket id here instead of using
* a lock in order to avoid the overhead.
*/
if (old_bucket_id != -1 && old_bucket_id == new_bucket_id)
{
if (old_bucket_id == 0)
old_bucket_id = PGSM_MAX_BUCKETS - 1;
else
old_bucket_id--;
}
hash_seq_init(&hash_seq, pgss_hash); hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL) while ((entry = hash_seq_search(&hash_seq)) != NULL)
{ {
@ -344,7 +328,7 @@ hash_entry_reset()
{ {
hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
} }
pgss->current_wbucket = 0; pg_atomic_write_u64(&pgss->current_wbucket, 0);
LWLockRelease(pgss->lock); LWLockRelease(pgss->lock);
} }

View File

@ -1490,6 +1490,7 @@ pgss_store(uint64 queryid,
int application_name_len = pg_get_application_name(application_name); int application_name_len = pg_get_application_name(application_name);
bool reset = false; bool reset = false;
uint64 bucketid; uint64 bucketid;
uint64 prev_bucket_id;
uint64 userid; uint64 userid;
int con; int con;
uint64 dbid = MyDatabaseId; uint64 dbid = MyDatabaseId;
@ -1511,15 +1512,14 @@ pgss_store(uint64 queryid,
extract_query_comments(query, comments, sizeof(comments)); extract_query_comments(query, comments, sizeof(comments));
/* Safety check... */ /* Safety check... */
if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket]) if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)])
return; return;
prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket);
bucketid = get_next_wbucket(pgss); bucketid = get_next_wbucket(pgss);
if (bucketid != pgss->current_wbucket)
{ if (bucketid != prev_bucket_id)
reset = true; reset = true;
pgss->current_wbucket = bucketid;
}
LWLockAcquire(pgss->lock, LW_EXCLUSIVE); LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
@ -1991,39 +1991,75 @@ get_next_wbucket(pgssSharedState *pgss)
{ {
struct timeval tv; struct timeval tv;
uint64 current_usec; uint64 current_usec;
uint64 bucket_id; uint64 current_bucket_usec;
uint64 new_bucket_id;
uint64 prev_bucket_id;
struct tm *lt; struct tm *lt;
bool update_bucket = false;
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec; current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec;
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) /*
* If current bucket expired we loop attempting to update prev_bucket_usec.
*
* pg_atomic_compare_exchange_u64 may fail in two possible ways:
* 1. Another thread/process updated the variable before us.
* 2. A spurious failure / hardware event.
*
* In both failure cases we read prev_bucket_usec from memory again, if it was
* a spurious failure then the value of prev_bucket_usec must be the same as
* before, which will cause the while loop to execute again.
*
* If another thread updated prev_bucket_usec, then its current value will
* definitely make the while condition to fail, we can stop the loop as another
* thread has already updated prev_bucket_usec.
*/
while ((current_usec - current_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000))
{
if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, &current_bucket_usec, current_usec))
{
update_bucket = true;
break;
}
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
}
if (update_bucket)
{ {
unsigned char *buf; unsigned char *buf;
char file_name[1024]; char file_name[1024];
int sec = 0; int sec = 0;
bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
buf = pgss_qbuf[bucket_id];
hash_entry_dealloc(bucket_id, pgss->current_wbucket);
hash_query_entry_dealloc(bucket_id, buf);
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); /* Update bucket id and retrieve the previous one. */
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
buf = pgss_qbuf[new_bucket_id];
hash_entry_dealloc(new_bucket_id, prev_bucket_id);
hash_query_entry_dealloc(new_bucket_id, buf);
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
unlink(file_name); unlink(file_name);
LWLockRelease(pgss->lock); LWLockRelease(pgss->lock);
pgss->prev_bucket_usec = current_usec;
lt = localtime(&tv.tv_sec); lt = localtime(&tv.tv_sec);
sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME); sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME);
if (sec < 0) if (sec < 0)
sec = 0; sec = 0;
snprintf(pgss->bucket_start_time[bucket_id], sizeof(pgss->bucket_start_time[bucket_id]), snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec); "%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec);
return bucket_id;
return new_bucket_id;
} }
return pgss->current_wbucket;
return pg_atomic_read_u64(&pgss->current_wbucket);
} }
#if PG_VERSION_NUM < 140000 #if PG_VERSION_NUM < 140000
@ -3024,7 +3060,7 @@ pgss_store_query_info(uint64 bucketid,
pgssStoreKind kind) pgssStoreKind kind)
{ {
pgssSharedState *pgss = pgsm_get_ss(); pgssSharedState *pgss = pgsm_get_ss();
unsigned char *buf = pgss_qbuf[pgss->current_wbucket]; unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)];
pgssQueryEntry *entry; pgssQueryEntry *entry;
if (query_len > PGSM_QUERY_MAX_LEN) if (query_len > PGSM_QUERY_MAX_LEN)

View File

@ -306,8 +306,8 @@ typedef struct pgssSharedState
slock_t mutex; /* protects following fields only: */ slock_t mutex; /* protects following fields only: */
Size extent; /* current extent of query file */ Size extent; /* current extent of query file */
int64 n_writers; /* number of active writers to query file */ int64 n_writers; /* number of active writers to query file */
uint64 current_wbucket; pg_atomic_uint64 current_wbucket;
uint64 prev_bucket_usec; pg_atomic_uint64 prev_bucket_usec;
uint64 bucket_entry[MAX_BUCKETS]; uint64 bucket_entry[MAX_BUCKETS];
int64 query_buf_size_bucket; int64 query_buf_size_bucket;
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
@ -318,8 +318,8 @@ do { \
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
x->n_writers = 0; \ x->n_writers = 0; \
x->current_wbucket = 0; \ pg_atomic_init_u64(&x->current_wbucket, 0); \
x->prev_bucket_usec = 0; \ pg_atomic_init_u64(&x->prev_bucket_usec, 0); \
memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \
} while(0) } while(0)