diff --git a/hash_query.c b/hash_query.c index 6c01a52..df39b9b 100644 --- a/hash_query.c +++ b/hash_query.c @@ -146,7 +146,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); if (!found) { - pgss->bucket_entry[pgss->current_wbucket]++; + pgss->bucket_entry[pg_atomic_read_u64(&pgss->current_wbucket)]++; /* New entry, initialize it */ /* reset the statistics */ memset(&entry->counters, 0, sizeof(Counters)); @@ -236,22 +236,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) pgssEntry *entry = NULL; List *pending_entries = NIL; ListCell *pending_entry; - - /* - * During transition to a new bucket id, a rare but possible race - * condition may happen while reading pgss->current_wbucket. If a - * different thread/process updates pgss->current_wbucket before this - * function is called, it may happen that old_bucket_id == new_bucket_id. - * If that is the case, we adjust the old bucket id here instead of using - * a lock in order to avoid the overhead. - */ - if (old_bucket_id != -1 && old_bucket_id == new_bucket_id) - { - if (old_bucket_id == 0) - old_bucket_id = PGSM_MAX_BUCKETS - 1; - else - old_bucket_id--; - } hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) @@ -344,7 +328,7 @@ hash_entry_reset() { hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } - pgss->current_wbucket = 0; + pg_atomic_write_u64(&pgss->current_wbucket, 0); LWLockRelease(pgss->lock); } diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index b319b79..52b9760 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1490,6 +1490,7 @@ pgss_store(uint64 queryid, int application_name_len = pg_get_application_name(application_name); bool reset = false; uint64 bucketid; + uint64 prev_bucket_id; uint64 userid; int con; uint64 dbid = MyDatabaseId; @@ -1511,15 +1512,14 @@ pgss_store(uint64 queryid, extract_query_comments(query, comments, sizeof(comments)); /* Safety check... */ - if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket]) + if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]) return; + prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket); bucketid = get_next_wbucket(pgss); - if (bucketid != pgss->current_wbucket) - { + + if (bucketid != prev_bucket_id) reset = true; - pgss->current_wbucket = bucketid; - } LWLockAcquire(pgss->lock, LW_EXCLUSIVE); @@ -1990,40 +1990,76 @@ static uint64 get_next_wbucket(pgssSharedState *pgss) { struct timeval tv; - uint64 current_usec; - uint64 bucket_id; - struct tm *lt; + uint64 current_usec; + uint64 current_bucket_usec; + uint64 new_bucket_id; + uint64 prev_bucket_id; + struct tm *lt; + bool update_bucket = false; gettimeofday(&tv,NULL); current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec; + current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); - if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) + /* + * If current bucket expired we loop attempting to update prev_bucket_usec. + * + * pg_atomic_compare_exchange_u64 may fail in two possible ways: + * 1. Another thread/process updated the variable before us. + * 2. A spurious failure / hardware event. + * + * In both failure cases we read prev_bucket_usec from memory again, if it was + * a spurious failure then the value of prev_bucket_usec must be the same as + * before, which will cause the while loop to execute again. + * + * If another thread updated prev_bucket_usec, then its current value will + * definitely make the while condition to fail, we can stop the loop as another + * thread has already updated prev_bucket_usec. + */ + while ((current_usec - current_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) + { + if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, ¤t_bucket_usec, current_usec)) + { + update_bucket = true; + break; + } + + current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); + } + + if (update_bucket) { unsigned char *buf; char file_name[1024]; int sec = 0; - bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - buf = pgss_qbuf[bucket_id]; - hash_entry_dealloc(bucket_id, pgss->current_wbucket); - hash_query_entry_dealloc(bucket_id, buf); + new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); + /* Update bucket id and retrieve the previous one. */ + prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + buf = pgss_qbuf[new_bucket_id]; + hash_entry_dealloc(new_bucket_id, prev_bucket_id); + hash_query_entry_dealloc(new_bucket_id, buf); + + snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id); unlink(file_name); LWLockRelease(pgss->lock); - pgss->prev_bucket_usec = current_usec; + lt = localtime(&tv.tv_sec); sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME); if (sec < 0) sec = 0; - snprintf(pgss->bucket_start_time[bucket_id], sizeof(pgss->bucket_start_time[bucket_id]), + snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]), "%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec); - return bucket_id; + + return new_bucket_id; } - return pgss->current_wbucket; + + return pg_atomic_read_u64(&pgss->current_wbucket); } #if PG_VERSION_NUM < 140000 @@ -3024,7 +3060,7 @@ pgss_store_query_info(uint64 bucketid, pgssStoreKind kind) { pgssSharedState *pgss = pgsm_get_ss(); - unsigned char *buf = pgss_qbuf[pgss->current_wbucket]; + unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]; pgssQueryEntry *entry; if (query_len > PGSM_QUERY_MAX_LEN) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index e9c4f62..85d5261 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -301,16 +301,16 @@ typedef struct pgssEntry */ typedef struct pgssSharedState { - LWLock *lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ - slock_t mutex; /* protects following fields only: */ - Size extent; /* current extent of query file */ - int64 n_writers; /* number of active writers to query file */ - uint64 current_wbucket; - uint64 prev_bucket_usec; - uint64 bucket_entry[MAX_BUCKETS]; - int64 query_buf_size_bucket; - char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ + LWLock *lock; /* protects hashtable search/modification */ + double cur_median_usage; /* current median usage in hashtable */ + slock_t mutex; /* protects following fields only: */ + Size extent; /* current extent of query file */ + int64 n_writers; /* number of active writers to query file */ + pg_atomic_uint64 current_wbucket; + pg_atomic_uint64 prev_bucket_usec; + uint64 bucket_entry[MAX_BUCKETS]; + int64 query_buf_size_bucket; + char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ } pgssSharedState; #define ResetSharedState(x) \ @@ -318,8 +318,8 @@ do { \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->n_writers = 0; \ - x->current_wbucket = 0; \ - x->prev_bucket_usec = 0; \ + pg_atomic_init_u64(&x->current_wbucket, 0); \ + pg_atomic_init_u64(&x->prev_bucket_usec, 0); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ } while(0)