From 89743e924315ecd08869ca49b5808c032ceecd8f Mon Sep 17 00:00:00 2001 From: Diego Fronza Date: Thu, 30 Sep 2021 17:13:27 -0300 Subject: [PATCH 1/2] PG-244: Fix race condition in get_next_wbucket(). The if condition below in get_next_wbucket() was subject to a race condition: if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) Two or more threads/processes could easily evaluate this condition to true, thus executing more than once the block that would calculate a new bucket id, clear/move old entries in the pgss_query_hash and pgss_hash hash tables. To avoid this problem, we define prev_bucket_usec and current_wbucket variables as atomic and execute a loop to check if another thread has updated prev_bucket_usec before the current one. --- hash_query.c | 20 ++----------- pg_stat_monitor.c | 76 ++++++++++++++++++++++++++++++++++------------- pg_stat_monitor.h | 24 +++++++-------- 3 files changed, 70 insertions(+), 50 deletions(-) diff --git a/hash_query.c b/hash_query.c index 6c01a52..df39b9b 100644 --- a/hash_query.c +++ b/hash_query.c @@ -146,7 +146,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); if (!found) { - pgss->bucket_entry[pgss->current_wbucket]++; + pgss->bucket_entry[pg_atomic_read_u64(&pgss->current_wbucket)]++; /* New entry, initialize it */ /* reset the statistics */ memset(&entry->counters, 0, sizeof(Counters)); @@ -236,22 +236,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) pgssEntry *entry = NULL; List *pending_entries = NIL; ListCell *pending_entry; - - /* - * During transition to a new bucket id, a rare but possible race - * condition may happen while reading pgss->current_wbucket. If a - * different thread/process updates pgss->current_wbucket before this - * function is called, it may happen that old_bucket_id == new_bucket_id. 
- * If that is the case, we adjust the old bucket id here instead of using - * a lock in order to avoid the overhead. - */ - if (old_bucket_id != -1 && old_bucket_id == new_bucket_id) - { - if (old_bucket_id == 0) - old_bucket_id = PGSM_MAX_BUCKETS - 1; - else - old_bucket_id--; - } hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) @@ -344,7 +328,7 @@ hash_entry_reset() { hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } - pgss->current_wbucket = 0; + pg_atomic_write_u64(&pgss->current_wbucket, 0); LWLockRelease(pgss->lock); } diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index b319b79..52b9760 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1490,6 +1490,7 @@ pgss_store(uint64 queryid, int application_name_len = pg_get_application_name(application_name); bool reset = false; uint64 bucketid; + uint64 prev_bucket_id; uint64 userid; int con; uint64 dbid = MyDatabaseId; @@ -1511,15 +1512,14 @@ pgss_store(uint64 queryid, extract_query_comments(query, comments, sizeof(comments)); /* Safety check... 
*/ - if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket]) + if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]) return; + prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket); bucketid = get_next_wbucket(pgss); - if (bucketid != pgss->current_wbucket) - { + + if (bucketid != prev_bucket_id) reset = true; - pgss->current_wbucket = bucketid; - } LWLockAcquire(pgss->lock, LW_EXCLUSIVE); @@ -1990,40 +1990,76 @@ static uint64 get_next_wbucket(pgssSharedState *pgss) { struct timeval tv; - uint64 current_usec; - uint64 bucket_id; - struct tm *lt; + uint64 current_usec; + uint64 current_bucket_usec; + uint64 new_bucket_id; + uint64 prev_bucket_id; + struct tm *lt; + bool update_bucket = false; gettimeofday(&tv,NULL); current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec; + current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); - if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) + /* + * If current bucket expired we loop attempting to update prev_bucket_usec. + * + * pg_atomic_compare_exchange_u64 may fail in two possible ways: + * 1. Another thread/process updated the variable before us. + * 2. A spurious failure / hardware event. + * + * In both failure cases we read prev_bucket_usec from memory again, if it was + * a spurious failure then the value of prev_bucket_usec must be the same as + * before, which will cause the while loop to execute again. + * + * If another thread updated prev_bucket_usec, then its current value will + * definitely make the while condition to fail, we can stop the loop as another + * thread has already updated prev_bucket_usec. 
+ */ + while ((current_usec - current_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) + { + if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, ¤t_bucket_usec, current_usec)) + { + update_bucket = true; + break; + } + + current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); + } + + if (update_bucket) { unsigned char *buf; char file_name[1024]; int sec = 0; - bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - buf = pgss_qbuf[bucket_id]; - hash_entry_dealloc(bucket_id, pgss->current_wbucket); - hash_query_entry_dealloc(bucket_id, buf); + new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); + /* Update bucket id and retrieve the previous one. */ + prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + buf = pgss_qbuf[new_bucket_id]; + hash_entry_dealloc(new_bucket_id, prev_bucket_id); + hash_query_entry_dealloc(new_bucket_id, buf); + + snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id); unlink(file_name); LWLockRelease(pgss->lock); - pgss->prev_bucket_usec = current_usec; + lt = localtime(&tv.tv_sec); sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME); if (sec < 0) sec = 0; - snprintf(pgss->bucket_start_time[bucket_id], sizeof(pgss->bucket_start_time[bucket_id]), + snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]), "%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec); - return bucket_id; + + return new_bucket_id; } - return pgss->current_wbucket; + + return pg_atomic_read_u64(&pgss->current_wbucket); } #if PG_VERSION_NUM < 140000 @@ -3024,7 +3060,7 @@ pgss_store_query_info(uint64 bucketid, pgssStoreKind kind) { pgssSharedState *pgss = pgsm_get_ss(); - unsigned char *buf = pgss_qbuf[pgss->current_wbucket]; + unsigned 
char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]; pgssQueryEntry *entry; if (query_len > PGSM_QUERY_MAX_LEN) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index e9c4f62..85d5261 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -301,16 +301,16 @@ typedef struct pgssEntry */ typedef struct pgssSharedState { - LWLock *lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ - slock_t mutex; /* protects following fields only: */ - Size extent; /* current extent of query file */ - int64 n_writers; /* number of active writers to query file */ - uint64 current_wbucket; - uint64 prev_bucket_usec; - uint64 bucket_entry[MAX_BUCKETS]; - int64 query_buf_size_bucket; - char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ + LWLock *lock; /* protects hashtable search/modification */ + double cur_median_usage; /* current median usage in hashtable */ + slock_t mutex; /* protects following fields only: */ + Size extent; /* current extent of query file */ + int64 n_writers; /* number of active writers to query file */ + pg_atomic_uint64 current_wbucket; + pg_atomic_uint64 prev_bucket_usec; + uint64 bucket_entry[MAX_BUCKETS]; + int64 query_buf_size_bucket; + char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ } pgssSharedState; #define ResetSharedState(x) \ @@ -318,8 +318,8 @@ do { \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->n_writers = 0; \ - x->current_wbucket = 0; \ - x->prev_bucket_usec = 0; \ + pg_atomic_init_u64(&x->current_wbucket, 0); \ + pg_atomic_init_u64(&x->prev_bucket_usec, 0); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ } while(0) From a959acb3d5a7e490015382b86e4be70e3a32e1c2 Mon Sep 17 00:00:00 2001 From: Diego Fronza Date: Fri, 1 Oct 2021 15:27:45 -0300 Subject: [PATCH 2/2] PG-244: Move pending queries' text to new bucket after bucket expiration. 
Added code to move all pending queries' text from an expired bucket's query buffer to the next, active query buffer. The previous implementation was not very efficient, it worked like this, as soon as a query is processed and a bucket expires: 1. Allocate memory to save the contents of the next query buffer. 2. Clear the next query buffer. 3. Iterate over pgss_query_hash, then, for each entry: - If the entry's bucket id is equal to the next bucket then: -- If the query for this entry has finished or ended in error, then remove it from the hash table. -- Else, if the query is not yet finished, copy the query from the backup query buffer to the new query buffer. Now, this copy was really expensive, because it was implemented using read_query() / SaveQueryText(), and read_query() scans the whole query buffer looking for some query ID, since we do this extra lookup loop for each pending query we end up with an O(n^2) algorithm. 4. Release the backup query buffer. Since now we always move pending queries from an expired bucket to the next one, there is no need to scan the next query buffer for pending queries (the pending queries are always in the current bucket, and when it expires we move them to the next one). Taking that into consideration, the new implementation works as follows, whenever a bucket expires: 1. Clear the next query buffer (all entries). 2. Define an array to store pending query ids from the expired bucket, we use this array later on in the algorithm. 3. Iterate over pgss_query_hash, then, for each entry: - If the entry's bucket id is equal to the next bucket then: -- If the query for this entry has finished or ended in error, then remove it from the hash table. This is equal to the previous implementation. - Else, if the entry's bucket id is equal to the just expired bucket id (old bucket id) and the query state is pending (not yet finished), then add this query ID to the array of pending query IDs. 
Note: We define the array to hold up to 128 pending entries, if there are more entries than this we try to allocate memory in the heap to store them, then, if the allocation fails we manually copy every pending query past the 128th to the next query buffer, using the previous algorithm (read_query() / SaveQueryText), this should be a very rare situation. 4. Finally, if there are pending queries, copy them from the previous query buffer to the next one using copy_queries. Now, copy_queries() is better than looping through each query entry and calling read_query() / SaveQueryText() to copy each of them to the new buffer, as explained, read_query() scans the whole query buffer for every call. copy_queries(), instead, scans the query buffer only once, and for every element it checks if the current query id is in the list of queries to be copied, this is an array of uint64 that is small enough to fit in L1 cache. Another important fix in this commit is the addition of the line 1548 in pg_stat_monitor.c / pgss_store(): query_entry->state = kind; Before the addition of this line, all entries in the pgss_query_hash hash table were not having their status updated when the query entered execution / finished or ended in error, effectively leaving all entries as pending, thus whenever a bucket expired all entries were being copied from the expired bucket to the next one. --- hash_query.c | 152 ++++++++++++++++++++++++++++++++++++++++------ pg_stat_monitor.c | 5 +- pg_stat_monitor.h | 2 +- 3 files changed, 138 insertions(+), 21 deletions(-) diff --git a/hash_query.c b/hash_query.c index df39b9b..078fa47 100644 --- a/hash_query.c +++ b/hash_query.c @@ -25,6 +25,15 @@ static HTAB *pgss_hash; static HTAB *pgss_query_hash; static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); +/* + * Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id] + * whose query ids are found in the array 'query_ids', of length 'n_queries'. 
+ */ +static void copy_queries(unsigned char *query_buffer[], + uint64 new_bucket_id, + uint64 old_bucket_id, + uint64 *query_ids, + size_t n_queries); static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) @@ -178,47 +187,107 @@ hash_query_entryies_reset() /* - * Deallocate finished entries. + * Deallocate finished entries in new_bucket_id. + * + * Move all pending queries in query_buffer[old_bucket_id] to + * query_buffer[new_bucket_id]. * * Caller must hold an exclusive lock on pgss->lock. */ void -hash_query_entry_dealloc(int bucket, unsigned char *buf) +hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]) { HASH_SEQ_STATUS hash_seq; pgssQueryEntry *entry; - unsigned char *old_buf; pgssSharedState *pgss = pgsm_get_ss(); + /* + * Store pending query ids from the previous bucket. + * If there are more pending queries than MAX_PENDING_QUERIES then + * we try to dynamically allocate memory for them. + */ +#define MAX_PENDING_QUERIES 128 + uint64 pending_query_ids[MAX_PENDING_QUERIES]; + uint64 *pending_query_ids_buf = NULL; + size_t n_pending_queries = 0; + bool out_of_memory = false; - old_buf = palloc0(pgss->query_buf_size_bucket); - memcpy(old_buf, buf, pgss->query_buf_size_bucket); - - memset(buf, 0, pgss->query_buf_size_bucket); + /* Clear all queries in the query buffer for the new bucket. */ + memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket); hash_seq_init(&hash_seq, pgss_query_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { - if (entry->key.bucket_id == bucket) + /* Remove previous finished query entries matching new bucket id. */ + if (entry->key.bucket_id == new_bucket_id) { if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR) { entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); } + } + /* Set up a list of pending query ids from the previous bucket. 
*/ + else if (entry->key.bucket_id == old_bucket_id && + (entry->state == PGSS_PARSE || + entry->state == PGSS_PLAN || + entry->state == PGSS_EXEC)) + { + if (n_pending_queries < MAX_PENDING_QUERIES) + { + pending_query_ids[n_pending_queries] = entry->key.queryid; + ++n_pending_queries; + } else { - int len; - char query_txt[1024]; - if (read_query(old_buf, entry->key.bucket_id, entry->key.queryid, query_txt) == 0) + /* + * No. of pending queries exceeds MAX_PENDING_QUERIES. + * Try to allocate memory from heap to keep track of pending query ids. + * If allocation fails we manually copy pending query to the next query buffer. + */ + if (!out_of_memory && !pending_query_ids_buf) { - len = read_query_buffer(entry->key.bucket_id, entry->key.queryid, query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) - snprintf(query_txt, 32, "%s", ""); + /* Allocate enough room for query ids. */ + pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash)); + if (pending_query_ids_buf != NULL) + memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64)); + else + out_of_memory = true; + } + + if (!out_of_memory) + { + /* Store pending query id in the dynamic buffer. */ + pending_query_ids_buf[n_pending_queries] = entry->key.queryid; + ++n_pending_queries; + } + else + { + /* No memory, manually copy query from previous buffer. */ + char query_txt[1024]; + + if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0 + || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET) + { + SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt)); + } + else + /* There was no space available to store the pending query text. */ + elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s", + entry->key.queryid, + (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ? 
+ "insufficient shared space for query" : + "I/O error reading query from disk"); } - SaveQueryText(entry->key.bucket_id, entry->key.queryid, buf, query_txt, strlen(query_txt)); } } } - pfree(old_buf); + + /* Copy all detected pending queries from previous bucket id to the new one. */ + if (n_pending_queries > 0) { + if (n_pending_queries < MAX_PENDING_QUERIES) + pending_query_ids_buf = pending_query_ids; + + copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries); + } } /* @@ -274,7 +343,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) /* Update key to use the new bucket id. */ bkp_entry->key.bucket_id = new_bucket_id; - /* Add the entry to a list of noded to be processed later. */ + /* Add the entry to a list of nodes to be processed later. */ pending_entries = lappend(pending_entries, bkp_entry); /* Finally remove the pending query from the expired bucket id. */ @@ -378,3 +447,52 @@ IsHashInitialize(void) pgss_hash != NULL); } +static void copy_queries(unsigned char *query_buffer[], + uint64 new_bucket_id, + uint64 old_bucket_id, + uint64 *query_ids, + size_t n_queries) +{ + bool found; + uint64 query_id = 0; + uint64 query_len = 0; + uint64 rlen = 0; + uint64 buf_len = 0; + unsigned char *src_buffer = query_buffer[old_bucket_id]; + size_t i; + + memcpy(&buf_len, src_buffer, sizeof (uint64)); + if (buf_len <= 0) + return; + + rlen = sizeof (uint64); /* Move forwad to skip length bytes */ + while (rlen < buf_len) + { + found = false; + memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */ + for (i = 0; i < n_queries; ++i) + { + if (query_id == query_ids[i]) + { + found = true; + break; + } + } + + rlen += sizeof (uint64); + if (buf_len <= rlen) + break; + + memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */ + rlen += sizeof (uint64); + if (buf_len < rlen + query_len) + break; + + if (found) { + SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id], + (const 
char *)&src_buffer[rlen], query_len); + } + + rlen += query_len; + } +} \ No newline at end of file diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 52b9760..f3c8eff 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1545,6 +1545,7 @@ pgss_store(uint64 queryid, out_of_memory = true; break; } + query_entry->state = kind; entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid); if (entry == NULL) { @@ -2030,7 +2031,6 @@ get_next_wbucket(pgssSharedState *pgss) if (update_bucket) { - unsigned char *buf; char file_name[1024]; int sec = 0; @@ -2040,9 +2040,8 @@ get_next_wbucket(pgssSharedState *pgss) prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - buf = pgss_qbuf[new_bucket_id]; hash_entry_dealloc(new_bucket_id, prev_bucket_id); - hash_query_entry_dealloc(new_bucket_id, buf); + hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id); unlink(file_name); diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 85d5261..94a0f83 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -379,7 +379,7 @@ HTAB *pgsm_get_plan_hash(void); void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); -void hash_query_entry_dealloc(int bucket, unsigned char *buf); +void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void);