diff --git a/hash_query.c b/hash_query.c index 6c01a52..078fa47 100644 --- a/hash_query.c +++ b/hash_query.c @@ -25,6 +25,15 @@ static HTAB *pgss_hash; static HTAB *pgss_query_hash; static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); +/* + * Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id] + * whose query ids are found in the array 'query_ids', of length 'n_queries'. + */ +static void copy_queries(unsigned char *query_buffer[], + uint64 new_bucket_id, + uint64 old_bucket_id, + uint64 *query_ids, + size_t n_queries); static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) @@ -146,7 +155,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); if (!found) { - pgss->bucket_entry[pgss->current_wbucket]++; + pgss->bucket_entry[pg_atomic_read_u64(&pgss->current_wbucket)]++; /* New entry, initialize it */ /* reset the statistics */ memset(&entry->counters, 0, sizeof(Counters)); @@ -178,47 +187,107 @@ hash_query_entryies_reset() /* - * Deallocate finished entries. + * Deallocate finished entries in new_bucket_id. + * + * Move all pending queries in query_buffer[old_bucket_id] to + * query_buffer[new_bucket_id]. * * Caller must hold an exclusive lock on pgss->lock. */ void -hash_query_entry_dealloc(int bucket, unsigned char *buf) +hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]) { HASH_SEQ_STATUS hash_seq; pgssQueryEntry *entry; - unsigned char *old_buf; pgssSharedState *pgss = pgsm_get_ss(); + /* + * Store pending query ids from the previous bucket. + * If there are more pending queries than MAX_PENDING_QUERIES then + * we try to dynamically allocate memory for them. 
+ */ +#define MAX_PENDING_QUERIES 128 + uint64 pending_query_ids[MAX_PENDING_QUERIES]; + uint64 *pending_query_ids_buf = NULL; + size_t n_pending_queries = 0; + bool out_of_memory = false; - old_buf = palloc0(pgss->query_buf_size_bucket); - memcpy(old_buf, buf, pgss->query_buf_size_bucket); - - memset(buf, 0, pgss->query_buf_size_bucket); + /* Clear all queries in the query buffer for the new bucket. */ + memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket); hash_seq_init(&hash_seq, pgss_query_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { - if (entry->key.bucket_id == bucket) + /* Remove previous finished query entries matching new bucket id. */ + if (entry->key.bucket_id == new_bucket_id) { if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR) { entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); } + } + /* Set up a list of pending query ids from the previous bucket. */ + else if (entry->key.bucket_id == old_bucket_id && + (entry->state == PGSS_PARSE || + entry->state == PGSS_PLAN || + entry->state == PGSS_EXEC)) + { + if (n_pending_queries < MAX_PENDING_QUERIES) + { + pending_query_ids[n_pending_queries] = entry->key.queryid; + ++n_pending_queries; + } else { - int len; - char query_txt[1024]; - if (read_query(old_buf, entry->key.bucket_id, entry->key.queryid, query_txt) == 0) + /* + * No. of pending queries exceeds MAX_PENDING_QUERIES. + * Try to allocate memory from heap to keep track of pending query ids. + * If allocation fails we manually copy pending query to the next query buffer. + */ + if (!out_of_memory && !pending_query_ids_buf) { - len = read_query_buffer(entry->key.bucket_id, entry->key.queryid, query_txt); - if (len != MAX_QUERY_BUFFER_BUCKET) - snprintf(query_txt, 32, "%s", ""); + /* Allocate enough room for query ids. 
*/ + pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash)); + if (pending_query_ids_buf != NULL) + memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64)); + else + out_of_memory = true; + } + + if (!out_of_memory) + { + /* Store pending query id in the dynamic buffer. */ + pending_query_ids_buf[n_pending_queries] = entry->key.queryid; + ++n_pending_queries; + } + else + { + /* No memory, manually copy query from previous buffer. */ + char query_txt[1024]; + + if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0 + || read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET) + { + SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt)); + } + else + /* There was no space available to store the pending query text. */ + elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s", + entry->key.queryid, + (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ? + "insufficient shared space for query" : + "I/O error reading query from disk"); } - SaveQueryText(entry->key.bucket_id, entry->key.queryid, buf, query_txt, strlen(query_txt)); } } } - pfree(old_buf); + + /* Copy all detected pending queries from previous bucket id to the new one. */ + if (n_pending_queries > 0) { + if (n_pending_queries < MAX_PENDING_QUERIES) + pending_query_ids_buf = pending_query_ids; + + copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries); + } } /* @@ -236,22 +305,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) pgssEntry *entry = NULL; List *pending_entries = NIL; ListCell *pending_entry; - - /* - * During transition to a new bucket id, a rare but possible race - * condition may happen while reading pgss->current_wbucket. 
If a - * different thread/process updates pgss->current_wbucket before this - * function is called, it may happen that old_bucket_id == new_bucket_id. - * If that is the case, we adjust the old bucket id here instead of using - * a lock in order to avoid the overhead. - */ - if (old_bucket_id != -1 && old_bucket_id == new_bucket_id) - { - if (old_bucket_id == 0) - old_bucket_id = PGSM_MAX_BUCKETS - 1; - else - old_bucket_id--; - } hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) @@ -290,7 +343,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id) /* Update key to use the new bucket id. */ bkp_entry->key.bucket_id = new_bucket_id; - /* Add the entry to a list of noded to be processed later. */ + /* Add the entry to a list of nodes to be processed later. */ pending_entries = lappend(pending_entries, bkp_entry); /* Finally remove the pending query from the expired bucket id. */ @@ -344,7 +397,7 @@ hash_entry_reset() { hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } - pgss->current_wbucket = 0; + pg_atomic_write_u64(&pgss->current_wbucket, 0); LWLockRelease(pgss->lock); } @@ -394,3 +447,52 @@ IsHashInitialize(void) pgss_hash != NULL); } +static void copy_queries(unsigned char *query_buffer[], + uint64 new_bucket_id, + uint64 old_bucket_id, + uint64 *query_ids, + size_t n_queries) +{ + bool found; + uint64 query_id = 0; + uint64 query_len = 0; + uint64 rlen = 0; + uint64 buf_len = 0; + unsigned char *src_buffer = query_buffer[old_bucket_id]; + size_t i; + + memcpy(&buf_len, src_buffer, sizeof (uint64)); + if (buf_len <= 0) + return; + + rlen = sizeof (uint64); /* Move forwad to skip length bytes */ + while (rlen < buf_len) + { + found = false; + memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */ + for (i = 0; i < n_queries; ++i) + { + if (query_id == query_ids[i]) + { + found = true; + break; + } + } + + rlen += sizeof (uint64); + if (buf_len <= rlen) + break; + + memcpy(&query_len, 
&src_buffer[rlen], sizeof (uint64)); /* query len */ + rlen += sizeof (uint64); + if (buf_len < rlen + query_len) + break; + + if (found) { + SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id], + (const char *)&src_buffer[rlen], query_len); + } + + rlen += query_len; + } +} \ No newline at end of file diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index b319b79..f3c8eff 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -1490,6 +1490,7 @@ pgss_store(uint64 queryid, int application_name_len = pg_get_application_name(application_name); bool reset = false; uint64 bucketid; + uint64 prev_bucket_id; uint64 userid; int con; uint64 dbid = MyDatabaseId; @@ -1511,15 +1512,14 @@ pgss_store(uint64 queryid, extract_query_comments(query, comments, sizeof(comments)); /* Safety check... */ - if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket]) + if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]) return; + prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket); bucketid = get_next_wbucket(pgss); - if (bucketid != pgss->current_wbucket) - { + + if (bucketid != prev_bucket_id) reset = true; - pgss->current_wbucket = bucketid; - } LWLockAcquire(pgss->lock, LW_EXCLUSIVE); @@ -1545,6 +1545,7 @@ pgss_store(uint64 queryid, out_of_memory = true; break; } + query_entry->state = kind; entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid); if (entry == NULL) { @@ -1990,40 +1991,74 @@ static uint64 get_next_wbucket(pgssSharedState *pgss) { struct timeval tv; - uint64 current_usec; - uint64 bucket_id; - struct tm *lt; + uint64 current_usec; + uint64 current_bucket_usec; + uint64 new_bucket_id; + uint64 prev_bucket_id; + struct tm *lt; + bool update_bucket = false; gettimeofday(&tv,NULL); current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec; + current_bucket_usec = 
pg_atomic_read_u64(&pgss->prev_bucket_usec); - if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) + /* + * If current bucket expired we loop attempting to update prev_bucket_usec. + * + * pg_atomic_compare_exchange_u64 may fail in two possible ways: + * 1. Another thread/process updated the variable before us. + * 2. A spurious failure / hardware event. + * + * In both failure cases we read prev_bucket_usec from memory again; if it was + * a spurious failure then the value of prev_bucket_usec must be the same as + * before, which will cause the while loop to execute again. + * + * If another thread updated prev_bucket_usec, then its current value will + * definitely make the while condition fail, so we can stop the loop as another + * thread has already updated prev_bucket_usec. + */ + while ((current_usec - current_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000)) + { + if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, &current_bucket_usec, current_usec)) + { + update_bucket = true; + break; + } + + current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec); + } + + if (update_bucket) { - unsigned char *buf; char file_name[1024]; int sec = 0; - bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - buf = pgss_qbuf[bucket_id]; - hash_entry_dealloc(bucket_id, pgss->current_wbucket); - hash_query_entry_dealloc(bucket_id, buf); + new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; - snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); + /* Update bucket id and retrieve the previous one. 
*/ + prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + hash_entry_dealloc(new_bucket_id, prev_bucket_id); + hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); + + snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id); unlink(file_name); LWLockRelease(pgss->lock); - pgss->prev_bucket_usec = current_usec; + lt = localtime(&tv.tv_sec); sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME); if (sec < 0) sec = 0; - snprintf(pgss->bucket_start_time[bucket_id], sizeof(pgss->bucket_start_time[bucket_id]), + snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]), "%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec); - return bucket_id; + + return new_bucket_id; } - return pgss->current_wbucket; + + return pg_atomic_read_u64(&pgss->current_wbucket); } #if PG_VERSION_NUM < 140000 @@ -3024,7 +3059,7 @@ pgss_store_query_info(uint64 bucketid, pgssStoreKind kind) { pgssSharedState *pgss = pgsm_get_ss(); - unsigned char *buf = pgss_qbuf[pgss->current_wbucket]; + unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)]; pgssQueryEntry *entry; if (query_len > PGSM_QUERY_MAX_LEN) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index e9c4f62..94a0f83 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -301,16 +301,16 @@ typedef struct pgssEntry */ typedef struct pgssSharedState { - LWLock *lock; /* protects hashtable search/modification */ - double cur_median_usage; /* current median usage in hashtable */ - slock_t mutex; /* protects following fields only: */ - Size extent; /* current extent of query file */ - int64 n_writers; /* number of active writers to query file */ - uint64 current_wbucket; - uint64 prev_bucket_usec; - uint64 bucket_entry[MAX_BUCKETS]; - int64 query_buf_size_bucket; - char bucket_start_time[MAX_BUCKETS][60]; /* start time of the 
bucket */ + LWLock *lock; /* protects hashtable search/modification */ + double cur_median_usage; /* current median usage in hashtable */ + slock_t mutex; /* protects following fields only: */ + Size extent; /* current extent of query file */ + int64 n_writers; /* number of active writers to query file */ + pg_atomic_uint64 current_wbucket; + pg_atomic_uint64 prev_bucket_usec; + uint64 bucket_entry[MAX_BUCKETS]; + int64 query_buf_size_bucket; + char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */ } pgssSharedState; #define ResetSharedState(x) \ @@ -318,8 +318,8 @@ do { \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ x->n_writers = 0; \ - x->current_wbucket = 0; \ - x->prev_bucket_usec = 0; \ + pg_atomic_init_u64(&x->current_wbucket, 0); \ + pg_atomic_init_u64(&x->prev_bucket_usec, 0); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ } while(0) @@ -379,7 +379,7 @@ HTAB *pgsm_get_plan_hash(void); void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); -void hash_query_entry_dealloc(int bucket, unsigned char *buf); +void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void);