PG-254: Removal of pgss_query_hash hash table.

There was no need for a separate hash table to keep track of queries,
as all the necessary information is already available in the pgss_hash
table.
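
For reference, the per-entry key of pgss_hash already identifies a query
event on its own. A sketch of its layout (the field list is taken from the
key assignments visible in the diff below; the uint64 member types are an
assumption):

    typedef struct pgssHashKey
    {
        uint64 bucket_id;   /* bucket the entry belongs to */
        uint64 userid;      /* user id */
        uint64 dbid;        /* database id */
        uint64 queryid;     /* query identifier */
        uint64 ip;          /* client address */
        uint64 planid;      /* plan identifier */
        uint64 appid;       /* application_name hash */
    } pgssHashKey;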

The code in hash_query_entry_dealloc that moved pending query texts was
merged into hash_entry_dealloc.
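
At the bucket switch in get_next_wbucket() this turns two cleanup passes
into one (call sites as they appear in the diff below):

    /* Before: one pass per hash table. */
    hash_entry_dealloc(new_bucket_id, prev_bucket_id);
    hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);

    /* After: a single pass over pgss_hash handles both jobs. */
    hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);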

A couple of functions were no longer necessary and were removed:
- hash_create_query_entry
- pgss_store_query_info
- pgss_get_entry (this logic was added directly into pgss_store).

The logic in pgss_store was simplified; it now works as follows (see the
sketch after the list):
1. Create a key (bucketid, queryid, etc.) for the query event.
2. Look up the key in pgss_hash; if no entry exists for the key, create a
   new one and save the query text into the current query buffer.
3. If jstate == NULL, update the stats counters for the entry.
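
A minimal sketch of that flow (locking and the shared-to-exclusive lock
promotion on a hash miss are omitted, and the hypothetical update_counters()
stands in for the real pgss_update_entry() call with its long parameter
list):

    static void
    pgss_store_sketch(uint64 bucketid, uint64 queryid,
                      const char *query, JumbleState *jstate)
    {
        pgssSharedState *pgss = pgsm_get_ss();
        HTAB            *pgss_hash = pgsm_get_hash();
        pgssHashKey      key = {0};
        pgssEntry       *entry;

        /* 1. Build the key identifying this query event. */
        key.bucket_id = bucketid;
        key.queryid   = queryid;
        key.dbid      = MyDatabaseId;
        /* ... userid, ip, planid and appid are filled the same way ... */

        /* 2. Look the key up in pgss_hash. */
        entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
        if (!entry)
        {
            /* Miss: save the query text, then create a new entry. */
            if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid],
                               query, strlen(query)))
                return;         /* insufficient shared space for query */

            entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
            if (entry == NULL)
                return;         /* out of memory */
        }

        /* 3. Update the stats counters unless we are only jumbling. */
        if (jstate == NULL)
            update_counters(entry); /* hypothetical stand-in */
    }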
pull/115/head
Diego Fronza 2021-10-06 14:57:15 -03:00
parent fc9e630497
commit fcb70ffed1
3 changed files with 179 additions and 317 deletions


@@ -22,7 +22,6 @@
static pgssSharedState *pgss;
static HTAB *pgss_hash;
static HTAB *pgss_query_hash;
static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
/*
@@ -55,7 +54,6 @@ pgss_startup(void)
pgss = NULL;
pgss_hash = NULL;
pgss_query_hash = NULL;
/*
* Create or attach to the shared memory state, including hash table
@@ -85,7 +83,6 @@ pgss_startup(void)
}
pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES);
pgss_query_hash = hash_init("pg_stat_monitor: query hashtable", sizeof(pgssQueryHashKey), sizeof(pgssQueryEntry), MAX_BUCKET_ENTRIES);
LWLockRelease(AddinShmemInitLock);
@@ -169,146 +166,64 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
return entry;
}
/*
* Reset all the entries.
* Prepare resources for using the new bucket:
* - Deallocate finished hash table entries in new_bucket_id (entries whose
* state is PGSS_FINISHED or PGSS_ERROR).
* - Clear query buffer for new_bucket_id.
* - If old_bucket_id != -1, move all pending hash table entries in
* old_bucket_id to the new bucket id, also move pending queries from the
* previous query buffer (query_buffer[old_bucket_id]) to the new one
* (query_buffer[new_bucket_id]).
*
* Caller must hold an exclusive lock on pgss->lock.
*/
void
hash_query_entryies_reset()
{
HASH_SEQ_STATUS hash_seq;
pgssQueryEntry *entry;
hash_seq_init(&hash_seq, pgss_query_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
}
/*
* Deallocate finished entries in new_bucket_id.
*
* Move all pending queries in query_buffer[old_bucket_id] to
* query_buffer[new_bucket_id].
*
* Caller must hold an exclusive lock on pgss->lock.
*/
void
hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
{
HASH_SEQ_STATUS hash_seq;
pgssQueryEntry *entry;
pgssSharedState *pgss = pgsm_get_ss();
/*
* Store pending query ids from the previous bucket.
* If there are more pending queries than MAX_PENDING_QUERIES then
* we try to dynamically allocate memory for them.
*/
#define MAX_PENDING_QUERIES 128
uint64 pending_query_ids[MAX_PENDING_QUERIES];
uint64 *pending_query_ids_buf = NULL;
size_t n_pending_queries = 0;
bool out_of_memory = false;
/* Clear all queries in the query buffer for the new bucket. */
memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
hash_seq_init(&hash_seq, pgss_query_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
/* Remove previous finished query entries matching new bucket id. */
if (entry->key.bucket_id == new_bucket_id)
{
if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR)
{
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
}
}
/* Set up a list of pending query ids from the previous bucket. */
else if (entry->key.bucket_id == old_bucket_id &&
(entry->state == PGSS_PARSE ||
entry->state == PGSS_PLAN ||
entry->state == PGSS_EXEC))
{
if (n_pending_queries < MAX_PENDING_QUERIES)
{
pending_query_ids[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/*
* No. of pending queries exceeds MAX_PENDING_QUERIES.
* Try to allocate memory from heap to keep track of pending query ids.
* If allocation fails we manually copy the pending query to the next query buffer.
*/
if (!out_of_memory && !pending_query_ids_buf)
{
/* Allocate enough room for query ids. */
pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_query_hash));
if (pending_query_ids_buf != NULL)
memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
else
out_of_memory = true;
}
if (!out_of_memory)
{
/* Store pending query id in the dynamic buffer. */
pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/* No memory, manually copy query from previous buffer. */
char query_txt[1024];
if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
|| read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
{
SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
}
else
/* There was no space available to store the pending query text. */
elog(WARNING, "hash_query_entry_dealloc: Failed to move pending query %lX, %s",
entry->key.queryid,
(PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
"insufficient shared space for query" :
"I/O error reading query from disk");
}
}
}
}
/* Copy all detected pending queries from previous bucket id to the new one. */
if (n_pending_queries > 0) {
if (n_pending_queries < MAX_PENDING_QUERIES)
pending_query_ids_buf = pending_query_ids;
copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
}
}
/*
* Deallocate least-used entries.
*
* If old_bucket_id != -1, move all pending queries in old_bucket_id
* to the new bucket id.
*
* Caller must hold an exclusive lock on pgss->lock.
*/
bool
hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
{
HASH_SEQ_STATUS hash_seq;
pgssEntry *entry = NULL;
List *pending_entries = NIL;
ListCell *pending_entry;
pgssSharedState *pgss = pgsm_get_ss();
#define MAX_PENDING_QUERIES 128
/*
* Variables used to store pending queries from the previous bucket.
*
* We use a linked list to keep a full copy of entries from the hash table
* that must be moved to the new bucket.
*
* We use an array to keep a list of pending query IDs only; the array is
* used in copy_queries() as a filter for which queries to copy.
* The reason we keep the pending query IDs in a separate array is that it
* is faster to iterate than the linked list: chasing pointers through a
* list almost always makes poor use of the CPU cache, while a small array
* of uint64 is a good candidate to stay in the L1 cache.
*
* If there are more pending queries than MAX_PENDING_QUERIES then
* we try to dynamically allocate memory for them.
*/
List *pending_entries = NIL;
ListCell *pending_entry;
uint64 pending_query_ids[MAX_PENDING_QUERIES];
uint64 *pending_query_ids_buf = NULL;
size_t n_pending_queries = 0;
bool out_of_memory = false;
if (new_bucket_id != -1)
{
/* Clear all queries in the query buffer for the new bucket. */
memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
}
/* Iterate over the hash table. */
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
/*
* Remove all entries if new_bucket_id == -1.
* Otherwise remove entry in new_bucket_id if it's finished already.
*/
if (new_bucket_id < 0 ||
(entry->key.bucket_id == new_bucket_id &&
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
@@ -333,6 +248,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
if (!bkp_entry)
{
/* No memory, remove pending query entry from the previous bucket. */
elog(ERROR, "hash_entry_dealloc: out of memory");
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
continue;
}
@@ -346,12 +262,63 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
/* Add the entry to a list of nodes to be processed later. */
pending_entries = lappend(pending_entries, bkp_entry);
/* Add pending query ID to the array. */
if (n_pending_queries < MAX_PENDING_QUERIES)
{
pending_query_ids[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/*
* No. of pending queries exceeds MAX_PENDING_QUERIES.
* Try to dynamically allocate memory to keep track of pending query ids.
* If allocation fails we manually copy the pending query to the next query buffer.
*/
if (!out_of_memory && !pending_query_ids_buf)
{
/* Allocate enough room for query ids. */
pending_query_ids_buf = malloc(sizeof(uint64) * hash_get_num_entries(pgss_hash));
if (pending_query_ids_buf != NULL)
memcpy(pending_query_ids_buf, pending_query_ids, n_pending_queries * sizeof(uint64));
else
out_of_memory = true;
}
if (!out_of_memory)
{
/* Store pending query id in the dynamic buffer. */
pending_query_ids_buf[n_pending_queries] = entry->key.queryid;
++n_pending_queries;
}
else
{
/* No memory, manually copy query from previous buffer. */
char query_txt[1024];
if (read_query(query_buffer[old_bucket_id], old_bucket_id, entry->key.queryid, query_txt) != 0
|| read_query_buffer(old_bucket_id, entry->key.queryid, query_txt) == MAX_QUERY_BUFFER_BUCKET)
{
SaveQueryText(new_bucket_id, entry->key.queryid, query_buffer[new_bucket_id], query_txt, strlen(query_txt));
}
else
/* There was no space available to store the pending query text. */
elog(ERROR, "hash_entry_dealloc: Failed to move pending query %lX, %s",
entry->key.queryid,
(PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) ?
"insufficient shared space for query" :
"I/O error reading query from disk");
}
}
/* Finally remove the pending query from the expired bucket id. */
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
}
}
}
Assert(list_length(pending_entries) == n_pending_queries);
/*
* Iterate over the list of pending queries in order
* to add them back to the hash table with the updated bucket id.
@@ -375,9 +342,15 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id)
free(old_entry);
}
list_free(pending_entries);
/* Copy all detected pending queries from previous bucket id to the new one. */
if (n_pending_queries > 0) {
if (n_pending_queries < MAX_PENDING_QUERIES)
pending_query_ids_buf = pending_query_ids;
return true;
copy_queries(query_buffer, new_bucket_id, old_bucket_id, pending_query_ids_buf, n_pending_queries);
}
list_free(pending_entries);
}
/*
@@ -401,45 +374,6 @@ hash_entry_reset()
LWLockRelease(pgss->lock);
}
/* Caller must acquire a lock */
pgssQueryEntry*
hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid)
{
pgssQueryHashKey key;
pgssQueryEntry *entry;
bool found;
key.queryid = queryid;
key.bucket_id = bucket_id;
key.dbid = dbid;
key.userid = userid;
key.ip = ip;
key.appid = appid;
entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_ENTER_NULL, &found);
return entry;
}
/* Caller must acquire a lock */
pgssQueryEntry*
hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid)
{
pgssQueryHashKey key;
pgssQueryEntry *entry;
bool found;
key.queryid = queryid;
key.bucket_id = bucket_id;
key.dbid = dbid;
key.userid = userid;
key.ip = ip;
key.appid = appid;
/* Lookup the hash table entry with shared lock. */
entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_FIND, &found);
return entry;
}
bool
IsHashInitialize(void)
{


@@ -147,15 +147,6 @@ static uint64 pgss_hash_string(const char *str, int len);
char *unpack_sql_state(int sql_state);
static void pgss_store_error(uint64 queryid, const char * query, ErrorData *edata);
static pgssQueryEntry *pgss_store_query_info(uint64 bucketid,
uint64 queryid,
uint64 dbid,
uint64 userid,
uint64 ip,
uint64 appid,
const char *query,
uint64 query_len,
pgssStoreKind kind);
static void pgss_store_utility(const char *query,
double total_time,
@@ -1314,40 +1305,6 @@ pgss_update_entry(pgssEntry *entry,
}
}
static pgssEntry*
pgss_get_entry(uint64 bucket_id,
uint64 userid,
uint64 dbid,
uint64 queryid,
uint64 ip,
uint64 planid,
uint64 appid)
{
pgssEntry *entry;
pgssHashKey key;
HTAB *pgss_hash = pgsm_get_hash();
pgssSharedState *pgss = pgsm_get_ss();
key.bucket_id = bucket_id;
key.userid = userid;
key.dbid = MyDatabaseId;
key.queryid = queryid;
key.ip = pg_get_client_addr();
key.planid = planid;
key.appid = appid;
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
if(!entry)
{
/* OK to create a new hashtable entry */
entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
if (entry == NULL)
return NULL;
}
Assert(entry);
return entry;
}
static void
pgss_store_query(uint64 queryid,
const char * query,
@@ -1486,6 +1443,8 @@ pgss_store(uint64 queryid,
JumbleState *jstate,
pgssStoreKind kind)
{
HTAB *pgss_hash;
pgssHashKey key;
pgssEntry *entry;
pgssSharedState *pgss = pgsm_get_ss();
char application_name[APPLICATIONNAME_LEN];
@@ -1495,12 +1454,10 @@
uint64 prev_bucket_id;
uint64 userid;
int con;
uint64 dbid;
uint64 ip;
uint64 planid;
uint64 appid;
char comments[512] = "";
bool out_of_memory = false;
size_t query_len;
/* Monitoring is disabled */
if (!PGSM_ENABLED)
@@ -1516,9 +1473,7 @@
else
userid = GetUserId();
dbid = MyDatabaseId;
application_name_len = pg_get_application_name(application_name);
ip = pg_get_client_addr();
planid = plan_info ? plan_info->planid: 0;
appid = djb2_hash((unsigned char *)application_name, application_name_len);
@@ -1530,63 +1485,74 @@
if (bucketid != prev_bucket_id)
reset = true;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
key.bucket_id = bucketid;
key.userid = userid;
key.dbid = MyDatabaseId;
key.queryid = queryid;
key.ip = pg_get_client_addr();
key.planid = planid;
key.appid = appid;
switch (kind)
pgss_hash = pgsm_get_hash();
LWLockAcquire(pgss->lock, LW_SHARED);
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
if (!entry)
{
case PGSS_PARSE:
case PGSS_PLAN:
{
pgssQueryEntry *query_entry;
query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind);
if (query_entry == NULL)
out_of_memory = true;
break;
}
case PGSS_ERROR:
case PGSS_EXEC:
case PGSS_FINISHED:
{
pgssQueryEntry *query_entry;
query_entry = pgss_store_query_info(bucketid, queryid, dbid, userid, ip, appid, query, strlen(query), kind);
if (query_entry == NULL)
{
out_of_memory = true;
break;
}
query_entry->state = kind;
entry = pgss_get_entry(bucketid, userid, dbid, queryid, ip, planid, appid);
if (entry == NULL)
{
out_of_memory = true;
break;
}
uint64 prev_qbuf_len;
if (jstate == NULL)
pgss_update_entry(entry, /* entry */
bucketid, /* bucketid */
queryid, /* queryid */
query, /* query */
comments, /* comments */
plan_info, /* PlanInfo */
cmd_type, /* CmdType */
sys_info, /* SysInfo */
error_info, /* ErrorInfo */
total_time, /* total_time */
rows, /* rows */
bufusage, /* bufusage */
walusage, /* walusage */
reset, /* reset */
kind); /* kind */
query_len = strlen(query);
if (query_len > PGSM_QUERY_MAX_LEN)
query_len = PGSM_QUERY_MAX_LEN;
/* Need exclusive lock to make a new hashtable entry - promote */
LWLockRelease(pgss->lock);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
/*
* Save the current query buffer length; if we fail to add a new
* entry to the hash table then we must restore the
* original length.
*/
memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len));
if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len))
{
LWLockRelease(pgss->lock);
elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query.");
return;
}
/* OK to create a new hashtable entry */
entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
if (entry == NULL)
{
/* Restore previous query buffer length. */
memcpy(pgss_qbuf[bucketid], &prev_qbuf_len, sizeof(prev_qbuf_len));
LWLockRelease(pgss->lock);
elog(DEBUG1, "pg_stat_monitor: out of memory");
return;
}
break;
case PGSS_NUMKIND:
case PGSS_INVALID:
break;
}
if (jstate == NULL)
pgss_update_entry(entry, /* entry */
bucketid, /* bucketid */
queryid, /* queryid */
query, /* query */
comments, /* comments */
plan_info, /* PlanInfo */
cmd_type, /* CmdType */
sys_info, /* SysInfo */
error_info, /* ErrorInfo */
total_time, /* total_time */
rows, /* rows */
bufusage, /* bufusage */
walusage, /* walusage */
reset, /* reset */
kind); /* kind */
LWLockRelease(pgss->lock);
if (out_of_memory)
elog(DEBUG1, "pg_stat_monitor: out of memory");
}
/*
* Reset all statement statistics.
@@ -1601,8 +1567,12 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_entry_dealloc(-1, -1);
hash_query_entryies_reset();
hash_entry_dealloc(-1, -1, NULL);
/* Reset query buffers. */
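/* A buffer's first uint64 holds its used length (see the prev_qbuf_len
* handling in pgss_store), so zeroing it empties the buffer. */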
for (size_t i = 0; i < MAX_BUCKETS; ++i)
{
*(uint64 *)pgss_qbuf[i] = 0;
}
#ifdef BENCHMARK
for (int i = STATS_START; i < STATS_END; ++i) {
pg_hook_stats[i].min_time = 0;
@@ -1653,7 +1623,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
MemoryContext oldcontext;
HASH_SEQ_STATUS hash_seq;
pgssEntry *entry;
pgssQueryEntry *query_entry;
char parentid_txt[32];
pgssSharedState *pgss = pgsm_get_ss();
HTAB *pgss_hash = pgsm_get_hash();
@@ -1713,16 +1682,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
uint64 userid = entry->key.userid;
uint64 ip = entry->key.ip;
uint64 planid = entry->key.planid;
uint64 appid = entry->key.appid;
unsigned char *buf = pgss_qbuf[bucketid];
#if PG_VERSION_NUM < 140000
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
#else
bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS);
#endif
query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid);
if (query_entry == NULL)
continue;
if (read_query(buf, bucketid, queryid, query_txt) == 0)
{
@@ -2063,8 +2028,7 @@ get_next_wbucket(pgssSharedState *pgss)
prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_entry_dealloc(new_bucket_id, prev_bucket_id);
hash_query_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf);
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)new_bucket_id);
unlink(file_name);
@@ -3069,42 +3033,6 @@ exit:
return 0;
}
static pgssQueryEntry*
pgss_store_query_info(uint64 bucketid,
uint64 queryid,
uint64 dbid,
uint64 userid,
uint64 ip,
uint64 appid,
const char *query,
uint64 query_len,
pgssStoreKind kind)
{
pgssSharedState *pgss = pgsm_get_ss();
unsigned char *buf = pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)];
pgssQueryEntry *entry;
if (query_len > PGSM_QUERY_MAX_LEN)
query_len = PGSM_QUERY_MAX_LEN;
/* The query is already in the shared buffer, there
* is no need to add it again.
*/
entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip, appid);
if (entry)
return entry;
entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip, appid);
if (!entry)
return NULL;
entry->state = kind;
if(!SaveQueryText(bucketid, queryid, buf, query, query_len))
return NULL;
return entry;
}
bool
SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len)
{


@@ -380,7 +380,7 @@ void hash_entry_reset(void);
void hash_query_entryies_reset(void);
void hash_query_entries();
void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
bool hash_entry_dealloc(int new_bucket_id, int old_bucket_id);
void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
Size hash_memsize(void);