PG-286: Avoid duplicate queries in text buffer.

The memory area reserved for query text (pgsm_query_shared_buffer) was
divided evenly among the buckets. This allowed the same query,
e.g. "SELECT 1", to be duplicated in different buckets, thus wasting space.

This commit fixes the query text duplication by adding a new hash table
whose only purpose is to check whether a given query has already been
added to the buffer (using the queryID as the key).

This allows different buckets that share the same query to point to a
unique entry in the query buffer (pgss_qbuf).

This also saves some CPU time: when pg_stat_monitor moves to a new bucket
id, it no longer adds a query that already exists in the buffer.
pull/184/head
Diego Fronza 2021-11-29 11:09:47 -03:00 committed by Hamid Akhtar
parent b3c7ba8c60
commit c21a3de00d
3 changed files with 93 additions and 141 deletions

View File

@ -22,19 +22,8 @@
static pgssSharedState *pgss;
static HTAB *pgss_hash;
static HTAB *pgss_query_hash;
static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size);
/*
* Copy query from src_buffer to dst_buff.
* Use query_id and query_pos to fast locate query in source buffer.
* Store updated query position in the destination buffer into param query_pos.
*/
static bool copy_query(uint64 bucket_id,
uint64 query_id,
uint64 query_pos,
unsigned char *dst_buf,
unsigned char *src_buf,
size_t *new_query_pos);
static HTAB*
hash_init(const char *hash_name, int key_size, int entry_size, int hash_size)
@ -50,7 +39,6 @@ void
pgss_startup(void)
{
bool found = false;
int32 i;
/* reset in case this is a restart within the postmaster */
@ -75,16 +63,10 @@ pgss_startup(void)
init_hook_stats();
#endif
pgss->query_buf_size_bucket = MAX_QUERY_BUF / PGSM_MAX_BUCKETS;
for (i = 0; i < PGSM_MAX_BUCKETS; i++)
{
unsigned char *buf = (unsigned char *)ShmemAlloc(pgss->query_buf_size_bucket);
set_qbuf(i, buf);
memset(buf, 0, sizeof (uint64));
}
set_qbuf((unsigned char *)ShmemAlloc(MAX_QUERY_BUF));
pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES);
pgss_query_hash = hash_init("pg_stat_monitor: queryID hashtable", sizeof(uint64), sizeof(pgssQueryEntry), MAX_BUCKET_ENTRIES);
LWLockRelease(AddinShmemInitLock);
@ -107,6 +89,12 @@ pgsm_get_hash(void)
return pgss_hash;
}
HTAB*
pgsm_get_query_hash(void)
{
return pgss_query_hash;
}
/*
* shmem_shutdown hook: Dump statistics into file.
*
@ -182,22 +170,15 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
* Caller must hold an exclusive lock on pgss->lock.
*/
void
hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[])
hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer)
{
HASH_SEQ_STATUS hash_seq;
pgssEntry *entry = NULL;
pgssSharedState *pgss = pgsm_get_ss();
/* Store pending query ids from the previous bucket. */
List *pending_entries = NIL;
ListCell *pending_entry;
if (new_bucket_id != -1)
{
/* Clear all queries in the query buffer for the new bucket. */
memset(query_buffer[new_bucket_id], 0, pgss->query_buf_size_bucket);
}
/* Iterate over the hash table. */
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
@ -210,6 +191,11 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
(entry->key.bucket_id == new_bucket_id &&
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
{
if (new_bucket_id == -1) {
/* pg_stat_monitor_reset(), remove entry from query hash table too. */
hash_search(pgss_query_hash, &(entry->key.queryid), HASH_REMOVE, NULL);
}
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
}
@ -268,13 +254,6 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu
new_entry->counters = old_entry->counters;
SpinLockInit(&new_entry->mutex);
new_entry->encoding = old_entry->encoding;
/* copy query's text from previous bucket to the new one. */
copy_query(new_bucket_id,
new_entry->key.queryid, /* query id */
old_entry->query_pos, /* query position in buffer */
query_buffer[new_bucket_id], /* destination query buffer */
query_buffer[old_bucket_id], /* source query buffer */
&new_entry->query_pos); /* position in which query was inserted into destination buffer */
}
free(old_entry);
@ -310,39 +289,3 @@ IsHashInitialize(void)
return (pgss != NULL &&
pgss_hash != NULL);
}
/*
 * Copy one query's text from src_buf into dst_buf.
 *
 * The first uint64 of src_buf is read as the used length of that buffer.
 * query_pos is the offset in src_buf where the record for query_id is
 * expected to start; the record is read as
 *     [uint64 query id][uint64 query length][query text].
 *
 * On a successful lookup the text is handed to SaveQueryText(), which
 * receives bucket_id, query_id, dst_buf and new_query_pos unchanged, and
 * its result is returned.
 *
 * Returns false without touching dst_buf when:
 *   - the source buffer's stored length is 0;
 *   - query_pos is 0, or there is no room for a record header after it;
 *   - the id stored at query_pos differs from query_id;
 *   - the stored query length would run past the buffer's used length.
 */
static bool copy_query(uint64 bucket_id,
                       uint64 query_id,
                       uint64 query_pos,
                       unsigned char *dst_buf,
                       unsigned char *src_buf,
                       size_t *new_query_pos)
{
    const uint64 header_len = sizeof(uint64) + sizeof(uint64);
    uint64 stored_id = 0;
    uint64 query_len = 0;
    uint64 buf_len = 0;

    memcpy(&buf_len, src_buf, sizeof(uint64));
    if (buf_len == 0)
        return false;

    /*
     * The query can only be located directly through query_pos.
     * Written in subtraction form so a bogus query_pos cannot wrap around
     * and pass the check; equivalent to query_pos + header_len < buf_len.
     */
    if (query_pos == 0 || buf_len <= header_len || query_pos >= buf_len - header_len)
        return false;

    /*
     * Records follow variable-length text, so query_pos is not guaranteed
     * to be 8-byte aligned: read the id with memcpy rather than through a
     * casted pointer.
     */
    memcpy(&stored_id, &src_buf[query_pos], sizeof(uint64));
    if (stored_id != query_id)
        return false;
    query_pos += sizeof(uint64);

    memcpy(&query_len, &src_buf[query_pos], sizeof(uint64)); /* query len */
    query_pos += sizeof(uint64);

    /* Avoid reading past the buffer's length (query_pos < buf_len here). */
    if (query_len > buf_len - query_pos)
        return false;

    return SaveQueryText(bucket_id, query_id, dst_buf,
                         (const char *)&src_buf[query_pos],
                         query_len, new_query_pos);
}

View File

@ -67,7 +67,8 @@ static int num_relations; /* Number of relation in the query */
static bool system_init = false;
static struct rusage rusage_start;
static struct rusage rusage_end;
static unsigned char *pgss_qbuf[MAX_BUCKETS];
/* Query buffer, store queries' text. */
static unsigned char *pgss_qbuf = NULL;
static char *pgss_explain(QueryDesc *queryDesc);
#ifdef BENCHMARK
static struct pg_hook_stats_t *pg_hook_stats;
@ -1311,7 +1312,6 @@ pgss_update_entry(pgssEntry *entry,
e->counters.blocks.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time);
}
e->counters.calls.usage += USAGE_EXEC(total_time);
e->counters.info.host = pg_get_client_addr();
if (sys_info)
{
e->counters.sysinfo.utime = sys_info->utime;
@ -1478,14 +1478,13 @@ pgss_store(uint64 queryid,
uint64 planid;
uint64 appid;
char comments[512] = "";
size_t query_len;
/* Monitoring is disabled */
if (!PGSM_ENABLED)
return;
/* Safety check... */
if (!IsSystemInitialized() || !pgss_qbuf[pg_atomic_read_u64(&pgss->current_wbucket)])
if (!IsSystemInitialized())
return;
Assert(query != NULL);
@ -1528,42 +1527,63 @@ pgss_store(uint64 queryid,
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
if (!entry)
{
uint64 prev_qbuf_len;
/* position in which the query's text was inserted into the query buffer. */
size_t qpos = 0;
pgssQueryEntry *query_entry;
size_t query_len = 0;
bool query_found = false;
uint64 prev_qbuf_len = 0;
HTAB *pgss_query_hash;
query_len = strlen(query);
if (query_len > PGSM_QUERY_MAX_LEN)
query_len = PGSM_QUERY_MAX_LEN;
pgss_query_hash = pgsm_get_query_hash();
query_entry = hash_search(pgss_query_hash, &queryid, HASH_ENTER_NULL, &query_found);
if (query_entry == NULL)
{
LWLockRelease(pgss->lock);
pgsm_log_error("pgss_store: out of memory (pgss_query_hash).");
return;
}
else if (!query_found)
{
/* New query, must add it to the buffer, calculate its length. */
query_len = strlen(query);
if (query_len > PGSM_QUERY_MAX_LEN)
query_len = PGSM_QUERY_MAX_LEN;
}
/* Need exclusive lock to make a new hashtable entry - promote */
LWLockRelease(pgss->lock);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
/*
* Save current query buffer length, if we fail to add a new
* new entry to the hash table then we must restore the
* original length.
*/
memcpy(&prev_qbuf_len, pgss_qbuf[bucketid], sizeof(prev_qbuf_len));
if (!SaveQueryText(bucketid, queryid, pgss_qbuf[bucketid], query, query_len, &qpos))
if (!query_found)
{
LWLockRelease(pgss->lock);
elog(DEBUG1, "pg_stat_monitor: insufficient shared space for query.");
return;
if (!SaveQueryText(bucketid, queryid, pgss_qbuf, query, query_len, &query_entry->query_pos))
{
LWLockRelease(pgss->lock);
pgsm_log_error("pgss_store: insufficient shared space for query.");
return;
}
/*
* Save current query buffer length, if we fail to add a new
* new entry to the hash table then we must restore the
* original length.
*/
memcpy(&prev_qbuf_len, pgss_qbuf, sizeof(prev_qbuf_len));
}
/* OK to create a new hashtable entry */
entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding());
if (entry == NULL)
{
/* Restore previous query buffer length. */
memcpy(pgss_qbuf[bucketid], &prev_qbuf_len, sizeof(prev_qbuf_len));
if (!query_found)
{
/* Restore previous query buffer length. */
memcpy(pgss_qbuf, &prev_qbuf_len, sizeof(prev_qbuf_len));
}
LWLockRelease(pgss->lock);
elog(DEBUG1, "pg_stat_monitor: out of memory");
return;
}
entry->query_pos = qpos;
entry->query_pos = query_entry->query_pos;
}
if (jstate == NULL)
@ -1599,11 +1619,10 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_entry_dealloc(-1, -1, NULL);
/* Reset query buffers. */
for (size_t i = 0; i < PGSM_MAX_BUCKETS; ++i)
{
*(uint64 *)pgss_qbuf[i] = 0;
}
/* Reset query buffer. */
*(uint64 *)pgss_qbuf = 0;
#ifdef BENCHMARK
for (int i = STATS_START; i < STATS_END; ++i) {
pg_hook_stats[i].min_time = 0;
@ -1713,7 +1732,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
uint64 userid = entry->key.userid;
uint64 ip = entry->key.ip;
uint64 planid = entry->key.planid;
unsigned char *buf = pgss_qbuf[bucketid];
#if PG_VERSION_NUM < 140000
bool toplevel = 1;
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
@ -1722,7 +1740,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
bool toplevel = entry->key.toplevel;
#endif
if (read_query(buf, queryid, query_txt, entry->query_pos) == 0)
if (read_query(pgss_qbuf, queryid, query_txt, entry->query_pos) == 0)
{
int rc;
rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos);
@ -1749,8 +1767,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
if (tmp.info.parentid != UINT64CONST(0))
{
int rc = 0;
if (read_query(buf, tmp.info.parentid, parent_query_txt, 0) == 0)
if (read_query(pgss_qbuf, tmp.info.parentid, parent_query_txt, 0) == 0)
{
rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0);
if (rc != 1)
@ -1802,7 +1819,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
if (enc != query_txt)
pfree(enc);
/* plan at column number 7 */
if (planid && strlen(tmp.planinfo.plan_text) > 0)
if (planid && tmp.planinfo.plan_text[0])
values[i++] = CStringGetTextDatum(tmp.planinfo.plan_text);
else
nulls[i++] = true;
@ -3120,17 +3137,17 @@ SaveQueryText(uint64 bucketid,
/*
* If the query buffer is empty, there is nothing to dump, this also
* means that the current query length exceeds MAX_QUERY_BUFFER_BUCKET.
* means that the current query length exceeds MAX_QUERY_BUF.
*/
if (buf_len <= sizeof (uint64))
return false;
dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET);
dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUF);
buf_len = sizeof (uint64);
/*
* We must check for overflow again, as the query length may
* exceed the size allocated to the buffer (MAX_QUERY_BUFFER_BUCKET).
* exceed the total size allocated to the buffer (MAX_QUERY_BUF).
*/
if (QUERY_BUFFER_OVERFLOW(buf_len, query_len))
{
@ -3293,9 +3310,10 @@ pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS)
}
void
set_qbuf(int i, unsigned char *buf)
set_qbuf(unsigned char *buf)
{
pgss_qbuf[i] = buf;
pgss_qbuf = buf;
*(uint64 *)pgss_qbuf = 0;
}
#ifdef BENCHMARK
@ -3413,13 +3431,13 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
if (fd < 0)
goto exit;
buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET);
buf = (unsigned char*) palloc(MAX_QUERY_BUF);
while (!done)
{
off = 0;
/* read a chunck of MAX_QUERY_BUFFER_BUCKET size. */
/* read a chunck of MAX_QUERY_BUF size. */
do {
nread = read(fd, buf + off, MAX_QUERY_BUFFER_BUCKET - off);
nread = read(fd, buf + off, MAX_QUERY_BUF - off);
if (nread == -1)
{
if (errno == EINTR && tries++ < 3) /* read() was interrupted, attempt to read again (max attempts=3) */
@ -3434,9 +3452,9 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
}
off += nread;
} while (off < MAX_QUERY_BUFFER_BUCKET);
} while (off < MAX_QUERY_BUF);
if (off == MAX_QUERY_BUFFER_BUCKET)
if (off == MAX_QUERY_BUF)
{
/* we have a chunck, scan it looking for queryid. */
if (read_query(buf, queryid, query_txt, pos) != 0)
@ -3449,7 +3467,7 @@ read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos)
}
else
/*
* Either done=true or file has a size not multiple of MAX_QUERY_BUFFER_BUCKET.
* Either done=true or file has a size not multiple of MAX_QUERY_BUF.
* It is safe to assume that the file was truncated or corrupted.
*/
break;

View File

@ -87,9 +87,8 @@
#define MAX_QUERY_BUF (PGSM_QUERY_SHARED_BUFFER * 1024 * 1024)
#define MAX_BUCKETS_MEM (PGSM_MAX * 1024 * 1024)
#define BUCKETS_MEM_OVERFLOW() ((hash_get_num_entries(pgss_hash) * sizeof(pgssEntry)) >= MAX_BUCKETS_MEM)
#define MAX_QUERY_BUFFER_BUCKET MAX_QUERY_BUF / PGSM_MAX_BUCKETS
#define MAX_BUCKET_ENTRIES (MAX_BUCKETS_MEM / sizeof(pgssEntry))
#define QUERY_BUFFER_OVERFLOW(x,y) ((x + y + sizeof(uint64) + sizeof(uint64)) > MAX_QUERY_BUFFER_BUCKET)
#define QUERY_BUFFER_OVERFLOW(x,y) ((x + y + sizeof(uint64) + sizeof(uint64)) > MAX_QUERY_BUF)
#define QUERY_MARGIN 100
#define MIN_QUERY_LEN 10
#define SQLCODE_LEN 20
@ -161,7 +160,7 @@ typedef enum AGG_KEY
#define MAX_QUERY_LEN 1024
/* shared nenory storage for the query */
/* shared memory storage for the query */
typedef struct CallTime
{
double total_time; /* total execution time, in msec */
@ -171,21 +170,19 @@ typedef struct CallTime
double sum_var_time; /* sum of variances in execution time in msec */
} CallTime;
/*
 * Composite hash key identifying a query entry.
 * Every component is stored as a uint64.
 */
typedef struct pgssQueryHashKey
{
uint64 bucket_id; /* bucket number */
uint64 queryid; /* query identifier */
uint64 userid; /* user OID */
uint64 dbid; /* database OID */
uint64 ip; /* client ip address */
uint64 appid; /* hash of application name */
} pgssQueryHashKey;
/*
* Entry type for queries hash table (query ID).
*
* We use a hash table to keep track of query IDs that have their
* corresponding query text added to the query buffer (pgsm_query_shared_buffer).
*
* This allow us to avoid adding duplicated queries to the buffer, therefore
* leaving more space for other queries and saving some CPU.
*/
typedef struct pgssQueryEntry
{
pgssQueryHashKey key; /* hash key of entry - MUST BE FIRST */
uint64 pos; /* bucket number */
uint64 state;
uint64 queryid; /* query identifier, also the key. */
size_t query_pos; /* query location within query buffer */
} pgssQueryEntry;
typedef struct PlanInfo
@ -208,10 +205,6 @@ typedef struct pgssHashKey
typedef struct QueryInfo
{
uint64 queryid; /* query identifier */
Oid userid; /* user OID */
Oid dbid; /* database OID */
uint host; /* client IP */
uint64 parentid; /* parent queryid of current query*/
int64 type; /* type of query, options are query, info, warning, error, fatal */
char application_name[APPLICATIONNAME_LEN];
@ -311,7 +304,6 @@ typedef struct pgssSharedState
pg_atomic_uint64 current_wbucket;
pg_atomic_uint64 prev_bucket_usec;
uint64 bucket_entry[MAX_BUCKETS];
int64 query_buf_size_bucket;
char bucket_start_time[MAX_BUCKETS][60]; /* start time of the bucket */
} pgssSharedState;
@ -382,21 +374,20 @@ int pgsm_get_bucket_size(void);
pgssSharedState* pgsm_get_ss(void);
HTAB *pgsm_get_plan_hash(void);
HTAB *pgsm_get_hash(void);
HTAB *pgsm_get_query_hash(void);
HTAB *pgsm_get_plan_hash(void);
void hash_entry_reset(void);
void hash_query_entryies_reset(void);
void hash_query_entries();
void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]);
void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer);
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
Size hash_memsize(void);
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos);
uint64 read_query(unsigned char *buf, uint64 queryid, char * query, size_t pos);
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip, uint64 appid);
void pgss_startup(void);
void set_qbuf(int i, unsigned char *);
void set_qbuf(unsigned char *);
/* hash_query.c */
void pgss_startup(void);