Merge pull request #114 from darkfronza/PG-244_move_pending_queries_to_next_query_buffer

PG-244: Move pending queries to the next query buffer
pull/116/head
Ibrar Ahmed 2021-10-04 23:59:36 +05:00 committed by GitHub
commit 8e39b02bce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 206 additions and 69 deletions

View File

@ -25,6 +25,15 @@ static HTAB *pgss_hash;
static HTAB *pgss_query_hash;
static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
/*
* Copy all queries from query_buffer[old_bucket_id] to query_buffer[new_bucket_id]
* whose query ids are found in the array 'query_ids', of length 'n_queries'.
*/
static void copy_queries(unsigned char *query_buffer[],
uint64 new_bucket_id,
uint64 old_bucket_id,
uint64 *query_ids,
size_t n_queries);
static HTAB*
hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
@ -146,7 +155,7 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found);
if (!found)
{
pgss->bucket_entry[pgss->current_wbucket]++;
pgss->bucket_entry[pg_atomic_read_u64(&pgss->current_wbucket)]++;
/* New entry, initialize it */
/* reset the statistics */
memset(&entry->counters, 0, sizeof(Counters));
@ -178,47 +187,107 @@ hash_query_entryies_reset()
/*
* Deallocate finished entries.
* Deallocate finished entries in new_bucket_id.
*
* Move all pending queries in query_buffer[old_bucket_id] to
* query_buffer[new_bucket_id].
*
* Caller must hold an exclusive lock on pgss->lock.
*/
void
hash_query_entry_dealloc(int bucket, unsigned char *buf)
hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
{
HASH_SEQ_STATUS hash_seq;
pgssQueryEntry *entry;
unsigned char *old_buf;
pgssSharedState *pgss = pgsm_get_ss();
/*
* Store pending query ids from the previous bucket.
* If there are more pending queries than MAX_PENDING_QUERIES then
* we try to dynamically allocate memory for them.
*/
#define MAX_PENDING_QUERIES 128
uint64 pending_query_ids[MAX_PENDING_QUERIES];
uint64 *pending_query_ids_buf = NULL;
size_t n_pending_queries = 0;
bool out_of_memory = false;
old_buf = palloc0(pgss->query_buf_size_bucket);
memcpy(old_buf, buf, pgss->query_buf_size_bucket);
memset(buf, 0, pgss->query_buf_size_bucket);
/* Clear all queries in the query buffer for the new bucket. */
memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
hash_seq_init(&hash_seq, pgss_query_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
if (entry->key.bucket_id == bucket)
/* Remove previous finished query entries matching new bucket id. */
if (entry->key.bucket_id == new_bucket_id)
{
if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR)
{
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
}
}
/* Set up a list of pending query ids from the previous bucket. */
else if (entry->key.bucket_id == old_bucket_id &&
(entry->state == PGSS_PARSE ||
entry->state == PGSS_PLAN ||
entry->state == PGSS_EXEC))
{
if (n_pending_queries < MAX_PENDING_QUERIES)
{
pending_query_ids[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
int len;
char query_txt[1024];
if (read_query(old_buf, entry->key.bucket_id, entry->key.queryid, query_txt) == 0)
/*
* No. of pending queries exceeds MAX_PENDING_QUERIES.
* Try to allocate memory from heap to keep track of pending query ids.
* If allocation fails we manually copy pending query to the next query buffer.
*/
if (!out_of_memory && !pending_query_ids_buf)
{
len = read_query_buffer(entry->key.bucket_id, entry->key.queryid, query_txt);
if (len != MAX_QUERY_BUFFER_BUCKET)
snprintf(query_txt, 32, "%s", "<insufficient disk/shared space>");
/* Allocate enough room for query ids. */
pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash));
if (pending_query_ids_buf != NULL)
memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
else
out_of_memory = true;
}
if (!out_of_memory)
{
/* Store pending query id in the dynamic buffer. */
pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/* No memory, manually copy query from previous buffer. */
char query_txt[1024];
if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
|| read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
{
SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
}
else
/* There was no space available to store the pending query text. */
elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s",
entry->key.queryid,
(PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
"insufficient shared space for query" :
"I/O error reading query from disk");
}
SaveQueryText(entry->key.bucket_id, entry->key.queryid, buf, query_txt, strlen(query_txt));
}
}
}
pfree(old_buf);
/* Copy all detected pending queries from previous bucket id to the new one. */
if (n_pending_queries > 0) {
if (n_pending_queries < MAX_PENDING_QUERIES)
pending_query_ids_buf = pending_query_ids;
copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
}
}
/*
@ -236,22 +305,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
pgssEntry *entry = NULL;
List *pending_entries = NIL;
ListCell *pending_entry;
/*
* During transition to a new bucket id, a rare but possible race
* condition may happen while reading pgss->current_wbucket. If a
* different thread/process updates pgss->current_wbucket before this
* function is called, it may happen that old_bucket_id == new_bucket_id.
* If that is the case, we adjust the old bucket id here instead of using
* a lock in order to avoid the overhead.
*/
if (old_bucket_id != -1 && old_bucket_id == new_bucket_id)
{
if (old_bucket_id == 0)
old_bucket_id = PGSM_MAX_BUCKETS - 1;
else
old_bucket_id--;
}
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
@ -290,7 +343,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
/* Update key to use the new bucket id. */
bkp_entry->key.bucket_id = new_bucket_id;
/* Add the entry to a list of noded to be processed later. */
/* Add the entry to a list of nodes to be processed later. */
pending_entries = lappend(pending_entries, bkp_entry);
/* Finally remove the pending query from the expired bucket id. */
@ -344,7 +397,7 @@ hash_entry_reset()
{
hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
}
pgss->current_wbucket = 0;
pg_atomic_write_u64(&pgss->current_wbucket, 0);
LWLockRelease(pgss->lock);
}
@ -394,3 +447,52 @@ IsHashInitialize(void)
pgss_hash != NULL);
}
/*
 * Copy selected queries from query_buffer[old_bucket_id] to
 * query_buffer[new_bucket_id].
 *
 * Only queries whose query id appears in the array 'query_ids' (of length
 * 'n_queries') are copied; everything else in the old buffer is skipped.
 *
 * Buffer layout assumed by this parser (and by SaveQueryText):
 *   [uint64 total_used_bytes][uint64 query_id][uint64 query_len][query text]...
 * i.e. a leading 8-byte length header followed by variable-size records.
 * NOTE(review): layout inferred from the reads below — confirm against
 * SaveQueryText's writer side.
 */
static void copy_queries(unsigned char *query_buffer[],
uint64 new_bucket_id,
uint64 old_bucket_id,
uint64 *query_ids,
size_t n_queries)
{
bool found;
uint64 query_id = 0;
uint64 query_len = 0;
uint64 rlen = 0;        /* read offset into src_buffer */
uint64 buf_len = 0;     /* total used bytes, from the buffer header */
unsigned char *src_buffer = query_buffer[old_bucket_id];
size_t i;
/* First 8 bytes of the buffer hold the number of bytes in use. */
memcpy(&buf_len, src_buffer, sizeof (uint64));
if (buf_len <= 0)
return;
rlen = sizeof (uint64); /* Move forward to skip length bytes */
while (rlen < buf_len)
{
found = false;
memcpy(&query_id, &src_buffer[rlen], sizeof (uint64)); /* query id */
/* Linear scan: is this query one of the pending ids we must keep? */
for (i = 0; i < n_queries; ++i)
{
if (query_id == query_ids[i])
{
found = true;
break;
}
}
rlen += sizeof (uint64);
/* Truncated record (id present but no length field): stop parsing. */
if (buf_len <= rlen)
break;
memcpy(&query_len, &src_buffer[rlen], sizeof (uint64)); /* query len */
rlen += sizeof (uint64);
/* Truncated record (length field exceeds remaining bytes): stop. */
if (buf_len < rlen + query_len)
break;
if (found) {
/* Append the query text into the new bucket's buffer. */
SaveQueryText(new_bucket_id, query_id, query_buffer[new_bucket_id],
(const char *)&src_buffer[rlen], query_len);
}
rlen += query_len; /* advance past the query text to the next record */
}
}

View File

@ -1490,6 +1490,7 @@ pgss_store(uint64 queryid,
int application_name_len = pg_get_application_name(application_name);
bool reset = false;
uint64 bucketid;
uint64 prev_bucket_id;
uint64 userid;
int con;
uint64 dbid = MyDatabaseId;
@ -1511,15 +1512,14 @@ pgss_store(uint64 queryid,
extract_query_comments(query, comments, sizeof(comments));
/* Safety check... */
if (!IsSystemInitialized() || !pgss_qbuf[pgss->current_wbucket])
if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)])
return;
prev_bucket_id = pg_atomic_read_u64(&pgss->current_wbucket);
bucketid = get_next_wbucket(pgss);
if (bucketid != pgss->current_wbucket)
{
if (bucketid != prev_bucket_id)
reset = true;
pgss->current_wbucket = bucketid;
}
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
@ -1545,6 +1545,7 @@ pgss_store(uint64 queryid,
out_of_memory = true;
break;
}
query_entry->state = kind;
entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid);
if (entry == NULL)
{
@ -1990,40 +1991,74 @@ static uint64
get_next_wbucket(pgssSharedState *pgss)
{
struct timeval tv;
uint64 current_usec;
uint64 bucket_id;
struct tm *lt;
uint64 current_usec;
uint64 current_bucket_usec;
uint64 new_bucket_id;
uint64 prev_bucket_id;
struct tm *lt;
bool update_bucket = false;
gettimeofday(&tv,NULL);
current_usec = (TimestampTz) tv.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
current_usec = (current_usec * USECS_PER_SEC) + tv.tv_usec;
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
if ((current_usec - pgss->prev_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000))
/*
* If current bucket expired we loop attempting to update prev_bucket_usec.
*
* pg_atomic_compare_exchange_u64 may fail in two possible ways:
* 1. Another thread/process updated the variable before us.
* 2. A spurious failure / hardware event.
*
* In both failure cases we read prev_bucket_usec from memory again, if it was
* a spurious failure then the value of prev_bucket_usec must be the same as
* before, which will cause the while loop to execute again.
*
* If another thread updated prev_bucket_usec, then its current value will
* definitely make the while condition to fail, we can stop the loop as another
* thread has already updated prev_bucket_usec.
*/
while ((current_usec - current_bucket_usec) > (PGSM_BUCKET_TIME * 1000 * 1000))
{
if (pg_atomic_compare_exchange_u64(&pgss->prev_bucket_usec, &current_bucket_usec, current_usec))
{
update_bucket = true;
break;
}
current_bucket_usec = pg_atomic_read_u64(&pgss->prev_bucket_usec);
}
if (update_bucket)
{
unsigned char *buf;
char file_name[1024];
int sec = 0;
bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
buf = pgss_qbuf[bucket_id];
hash_entry_dealloc(bucket_id, pgss->current_wbucket);
hash_query_entry_dealloc(bucket_id, buf);
new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS;
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
/* Update bucket id and retrieve the previous one. */
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_entry_dealloc(new_bucket_id, prev_bucket_id);
hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
unlink(file_name);
LWLockRelease(pgss->lock);
pgss->prev_bucket_usec = current_usec;
lt = localtime(&tv.tv_sec);
sec = lt->tm_sec - (lt->tm_sec % PGSM_BUCKET_TIME);
if (sec < 0)
sec = 0;
snprintf(pgss->bucket_start_time[bucket_id], sizeof(pgss->bucket_start_time[bucket_id]),
snprintf(pgss->bucket_start_time[new_bucket_id], sizeof(pgss->bucket_start_time[new_bucket_id]),
"%04d-%02d-%02d %02d:%02d:%02d", lt->tm_year + 1900, lt->tm_mon + 1, lt->tm_mday, lt->tm_hour, lt->tm_min, sec);
return bucket_id;
return new_bucket_id;
}
return pgss->current_wbucket;
return pg_atomic_read_u64(&pgss->current_wbucket);
}
#if PG_VERSION_NUM < 140000
@ -3024,7 +3059,7 @@ pgss_store_query_info(uint64 bucketid,
pgssStoreKind kind)
{
pgssSharedState *pgss = pgsm_get_ss();
unsigned char *buf = pgss_qbuf[pgss->current_wbucket];
unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)];
pgssQueryEntry *entry;
if (query_len > PGSM_QUERY_MAX_LEN)

View File

@ -301,16 +301,16 @@ typedef struct pgssEntry
*/
typedef struct pgssSharedState
{
LWLock *lock; /* protects hashtable search/modification */
double cur_median_usage; /* current median usage in hashtable */
slock_t mutex; /* protects following fields only: */
Size extent; /* current extent of query file */
int64 n_writers; /* number of active writers to query file */
uint64 current_wbucket;
uint64 prev_bucket_usec;
uint64 bucket_entry[MAX_BUCKETS];
int64 query_buf_size_bucket;
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
LWLock *lock; /* protects hashtable search/modification */
double cur_median_usage; /* current median usage in hashtable */
slock_t mutex; /* protects following fields only: */
Size extent; /* current extent of query file */
int64 n_writers; /* number of active writers to query file */
pg_atomic_uint64 current_wbucket;
pg_atomic_uint64 prev_bucket_usec;
uint64 bucket_entry[MAX_BUCKETS];
int64 query_buf_size_bucket;
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
} pgssSharedState;
#define ResetSharedState(x) \
@ -318,8 +318,8 @@ do { \
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
x->cur_median_usage = ASSUMED_MEDIAN_INIT; \
x->n_writers = 0; \
x->current_wbucket = 0; \
x->prev_bucket_usec = 0; \
pg_atomic_init_u64(&x->current_wbucket, 0); \
pg_atomic_init_u64(&x->prev_bucket_usec, 0); \
memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \
} while(0)
@ -379,7 +379,7 @@ HTAB *pgsm_get_plan_hash(void);
void hash_entry_reset(void);
void hash_query_entryies_reset(void);
void hash_query_entries();
void hash_query_entry_dealloc(int bucket, unsigned char *buf);
void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
Size hash_memsize(void);