PG-190: Does not show query, if query elapsed time is greater than bucket time.
parent
e8bfff127b
commit
89614e442b
58
hash_query.c
58
hash_query.c
|
@ -160,24 +160,65 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding)
|
||||||
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
|
elog(DEBUG1, "%s", "pg_stat_monitor: out of memory");
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Deallocate least-used entries.
|
* Reset all the entries.
|
||||||
*
|
*
|
||||||
* Caller must hold an exclusive lock on pgss->lock.
|
* Caller must hold an exclusive lock on pgss->lock.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
hash_query_entry_dealloc(int bucket)
|
hash_query_entryies_reset()
|
||||||
{
|
{
|
||||||
HASH_SEQ_STATUS hash_seq;
|
HASH_SEQ_STATUS hash_seq;
|
||||||
pgssQueryEntry *entry;
|
pgssQueryEntry *entry;
|
||||||
|
|
||||||
|
hash_seq_init(&hash_seq, pgss_query_hash);
|
||||||
|
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||||
|
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Deallocate finished entries.
|
||||||
|
*
|
||||||
|
* Caller must hold an exclusive lock on pgss->lock.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
hash_query_entry_dealloc(int bucket, unsigned char *buf)
|
||||||
|
{
|
||||||
|
HASH_SEQ_STATUS hash_seq;
|
||||||
|
pgssQueryEntry *entry;
|
||||||
|
unsigned char *old_buf;
|
||||||
|
pgssSharedState *pgss = pgsm_get_ss();
|
||||||
|
|
||||||
|
old_buf = palloc0(pgss->query_buf_size_bucket);
|
||||||
|
memcpy(old_buf, buf, pgss->query_buf_size_bucket);
|
||||||
|
|
||||||
|
memset(buf, 0, pgss->query_buf_size_bucket);
|
||||||
|
|
||||||
hash_seq_init(&hash_seq, pgss_query_hash);
|
hash_seq_init(&hash_seq, pgss_query_hash);
|
||||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||||
{
|
{
|
||||||
if (entry->key.bucket_id == bucket || bucket < 0)
|
if (entry->key.bucket_id == bucket)
|
||||||
|
{
|
||||||
|
if (entry->state == PGSS_FINISHED || entry->state == PGSS_ERROR)
|
||||||
|
{
|
||||||
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
|
entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int len;
|
||||||
|
char query_txt[1024];
|
||||||
|
if (read_query(old_buf, entry->key.bucket_id, entry->key.queryid, query_txt) == 0)
|
||||||
|
{
|
||||||
|
len = read_query_buffer(entry->key.bucket_id, entry->key.queryid, query_txt);
|
||||||
|
if (len != MAX_QUERY_BUFFER_BUCKET)
|
||||||
|
snprintf(query_txt, 32, "%s", "<insufficient disk/shared space>");
|
||||||
|
}
|
||||||
|
SaveQueryText(entry->key.bucket_id, entry->key.queryid, buf, query_txt, strlen(query_txt));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pfree(old_buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -185,20 +226,23 @@ hash_query_entry_dealloc(int bucket)
|
||||||
*
|
*
|
||||||
* Caller must hold an exclusive lock on pgss->lock.
|
* Caller must hold an exclusive lock on pgss->lock.
|
||||||
*/
|
*/
|
||||||
void
|
bool
|
||||||
hash_entry_dealloc(int bucket)
|
hash_entry_dealloc(int bucket)
|
||||||
{
|
{
|
||||||
HASH_SEQ_STATUS hash_seq;
|
HASH_SEQ_STATUS hash_seq;
|
||||||
pgssEntry *entry;
|
pgssEntry *entry = NULL;
|
||||||
|
|
||||||
hash_seq_init(&hash_seq, pgss_hash);
|
hash_seq_init(&hash_seq, pgss_hash);
|
||||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||||
{
|
{
|
||||||
if (entry->key.bucket_id == bucket || bucket < 0)
|
if (bucket < 0 ||
|
||||||
|
(entry->key.bucket_id == bucket &&
|
||||||
|
(entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR)))
|
||||||
{
|
{
|
||||||
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -188,8 +188,6 @@ pgss_store_query(uint64 queryid,
|
||||||
int query_len,
|
int query_len,
|
||||||
pgssJumbleState *jstate,
|
pgssJumbleState *jstate,
|
||||||
pgssStoreKind kind);
|
pgssStoreKind kind);
|
||||||
static uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query);
|
|
||||||
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
|
|
||||||
|
|
||||||
static uint64 get_query_id(pgssJumbleState *jstate, Query *query);
|
static uint64 get_query_id(pgssJumbleState *jstate, Query *query);
|
||||||
|
|
||||||
|
@ -1292,7 +1290,7 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
|
||||||
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
|
||||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||||
hash_entry_dealloc(-1);
|
hash_entry_dealloc(-1);
|
||||||
hash_query_entry_dealloc(-1);
|
hash_query_entryies_reset();
|
||||||
LWLockRelease(pgss->lock);
|
LWLockRelease(pgss->lock);
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -1398,8 +1396,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||||
char *query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
|
char *query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
|
||||||
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
|
bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
|
||||||
|
|
||||||
if (!IsBucketValid(bucketid))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip);
|
query_entry = hash_find_query_entry(bucketid, queryid, dbid, userid, ip);
|
||||||
if (query_entry == NULL)
|
if (query_entry == NULL)
|
||||||
|
@ -1420,6 +1416,11 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||||
tmp = e->counters;
|
tmp = e->counters;
|
||||||
SpinLockRelease(&e->mutex);
|
SpinLockRelease(&e->mutex);
|
||||||
}
|
}
|
||||||
|
if (!IsBucketValid(bucketid))
|
||||||
|
{
|
||||||
|
if (tmp.state == PGSS_FINISHED)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
/* bucketid at column number 0 */
|
/* bucketid at column number 0 */
|
||||||
values[i++] = Int64GetDatumFast(bucketid);
|
values[i++] = Int64GetDatumFast(bucketid);
|
||||||
|
|
||||||
|
@ -1689,12 +1690,11 @@ get_next_wbucket(pgssSharedState *pgss)
|
||||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||||
buf = pgss_qbuf[bucket_id];
|
buf = pgss_qbuf[bucket_id];
|
||||||
hash_entry_dealloc(bucket_id);
|
hash_entry_dealloc(bucket_id);
|
||||||
hash_query_entry_dealloc(bucket_id);
|
hash_query_entry_dealloc(bucket_id, buf);
|
||||||
|
|
||||||
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
|
snprintf(file_name, 1024, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
|
||||||
unlink(file_name);
|
unlink(file_name);
|
||||||
|
|
||||||
/* reset the query buffer */
|
|
||||||
memset(buf, 0, sizeof (uint64));
|
|
||||||
LWLockRelease(pgss->lock);
|
LWLockRelease(pgss->lock);
|
||||||
pgss->prev_bucket_usec = current_usec;
|
pgss->prev_bucket_usec = current_usec;
|
||||||
lt = localtime(&tv.tv_sec);
|
lt = localtime(&tv.tv_sec);
|
||||||
|
@ -2642,7 +2642,7 @@ intarray_get_datum(int32 arr[], int len)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64
|
uint64
|
||||||
read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
|
read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
|
||||||
{
|
{
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
@ -2673,7 +2673,6 @@ read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query)
|
||||||
rlen += sizeof (uint64);
|
rlen += sizeof (uint64);
|
||||||
if (buf_len < rlen + query_len)
|
if (buf_len < rlen + query_len)
|
||||||
goto exit;
|
goto exit;
|
||||||
|
|
||||||
if (found)
|
if (found)
|
||||||
{
|
{
|
||||||
if (query != NULL)
|
if (query != NULL)
|
||||||
|
@ -2704,7 +2703,6 @@ pgss_store_query_info(uint64 bucketid,
|
||||||
uint64 query_len,
|
uint64 query_len,
|
||||||
pgssStoreKind kind)
|
pgssStoreKind kind)
|
||||||
{
|
{
|
||||||
uint64 buf_len = 0;
|
|
||||||
pgssSharedState *pgss = pgsm_get_ss();
|
pgssSharedState *pgss = pgsm_get_ss();
|
||||||
unsigned char *buf = pgss_qbuf[pgss->current_wbucket];
|
unsigned char *buf = pgss_qbuf[pgss->current_wbucket];
|
||||||
pgssQueryEntry *entry;
|
pgssQueryEntry *entry;
|
||||||
|
@ -2722,6 +2720,18 @@ pgss_store_query_info(uint64 bucketid,
|
||||||
entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip);
|
entry = hash_create_query_entry(bucketid, queryid, dbid, userid, ip);
|
||||||
if (!entry)
|
if (!entry)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
entry->state = kind;
|
||||||
|
|
||||||
|
if(!SaveQueryText(bucketid, queryid, buf, query, query_len))
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len)
|
||||||
|
{
|
||||||
|
uint64 buf_len = 0;
|
||||||
|
|
||||||
memcpy(&buf_len, buf, sizeof (uint64));
|
memcpy(&buf_len, buf, sizeof (uint64));
|
||||||
if (buf_len == 0)
|
if (buf_len == 0)
|
||||||
|
@ -2732,7 +2742,7 @@ pgss_store_query_info(uint64 bucketid,
|
||||||
switch(PGSM_OVERFLOW_TARGET)
|
switch(PGSM_OVERFLOW_TARGET)
|
||||||
{
|
{
|
||||||
case OVERFLOW_TARGET_NONE:
|
case OVERFLOW_TARGET_NONE:
|
||||||
return NULL;
|
return false;
|
||||||
case OVERFLOW_TARGET_DISK:
|
case OVERFLOW_TARGET_DISK:
|
||||||
{
|
{
|
||||||
dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET);
|
dump_queries_buffer(bucketid, buf, MAX_QUERY_BUFFER_BUCKET);
|
||||||
|
@ -2753,7 +2763,7 @@ pgss_store_query_info(uint64 bucketid,
|
||||||
memcpy(&buf[buf_len], query, query_len); /* query */
|
memcpy(&buf[buf_len], query, query_len); /* query */
|
||||||
buf_len += query_len;
|
buf_len += query_len;
|
||||||
memcpy(buf, &buf_len, sizeof (uint64));
|
memcpy(buf, &buf_len, sizeof (uint64));
|
||||||
return entry;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64
|
static uint64
|
||||||
|
|
|
@ -183,6 +183,7 @@ typedef struct pgssQueryEntry
|
||||||
{
|
{
|
||||||
pgssQueryHashKey key; /* hash key of entry - MUST BE FIRST */
|
pgssQueryHashKey key; /* hash key of entry - MUST BE FIRST */
|
||||||
uint64 pos; /* bucket number */
|
uint64 pos; /* bucket number */
|
||||||
|
uint64 state;
|
||||||
} pgssQueryEntry;
|
} pgssQueryEntry;
|
||||||
|
|
||||||
typedef struct PlanInfo
|
typedef struct PlanInfo
|
||||||
|
@ -356,6 +357,8 @@ typedef struct pgssJumbleState
|
||||||
|
|
||||||
/* Links to shared memory state */
|
/* Links to shared memory state */
|
||||||
|
|
||||||
|
bool SaveQueryText(uint64 bucketid, uint64 queryid, unsigned char *buf, const char *query, uint64 query_len);
|
||||||
|
|
||||||
/* guc.c */
|
/* guc.c */
|
||||||
void init_guc(void);
|
void init_guc(void);
|
||||||
GucVariable *get_conf(int i);
|
GucVariable *get_conf(int i);
|
||||||
|
@ -372,11 +375,15 @@ HTAB *pgsm_get_hash(void);
|
||||||
HTAB *pgsm_get_plan_hash(void);
|
HTAB *pgsm_get_plan_hash(void);
|
||||||
HTAB* pgsm_get_query_hash(void);
|
HTAB* pgsm_get_query_hash(void);
|
||||||
void hash_entry_reset(void);
|
void hash_entry_reset(void);
|
||||||
void hash_query_entry_dealloc(int bucket);
|
void hash_query_entryies_reset(void);
|
||||||
void hash_entry_dealloc(int bucket);
|
void hash_query_entries();
|
||||||
|
void hash_query_entry_dealloc(int bucket, unsigned char *buf);
|
||||||
|
bool hash_entry_dealloc(int bucket);
|
||||||
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding);
|
||||||
Size hash_memsize(void);
|
Size hash_memsize(void);
|
||||||
|
|
||||||
|
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
|
||||||
|
uint64 read_query(unsigned char *buf, uint64 bucketid, uint64 queryid, char * query);
|
||||||
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip);
|
pgssQueryEntry* hash_find_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip);
|
||||||
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip);
|
pgssQueryEntry* hash_create_query_entry(uint64 bucket_id, uint64 queryid, uint64 dbid, uint64 userid, uint64 ip);
|
||||||
void pgss_startup(void);
|
void pgss_startup(void);
|
||||||
|
|
Loading…
Reference in New Issue