Issue (#49): Duplicates in pg_stat_monitor.

Jira: PG-139
pull/55/head
Ibrar Ahmed 2020-10-10 22:46:53 +00:00
parent db01ed546b
commit dea16f1878
3 changed files with 85 additions and 71 deletions

View File

@ -73,8 +73,12 @@ pgss_shmem_startup(void)
pgss->query_buf_size_bucket = PGSM_QUERY_BUF_SIZE / PGSM_MAX_BUCKETS;
for (i = 0; i < PGSM_MAX_BUCKETS; i++)
{
unsigned char *buf;
pgss_qbuf[i] = (unsigned char *) ShmemAlloc(pgss->query_buf_size_bucket);
buf = pgss_qbuf[i];
memset(buf, 0, sizeof (uint64));
}
pgss_hash = hash_init("pg_stat_monitor: Queries hashtable", sizeof(pgssHashKey), sizeof(pgssEntry),PGSM_MAX);
pgss_buckethash = hash_init("pg_stat_monitor: Bucket hashtable", sizeof(pgssBucketHashKey), sizeof(pgssBucketEntry), PGSM_MAX_BUCKETS);

View File

@ -655,6 +655,7 @@ static void pgss_store(const char *query, uint64 queryId,
float utime, float stime)
{
pgssHashKey key;
int bucket_id;
pgssEntry *entry;
char *norm_query = NULL;
int encoding = GetDatabaseEncoding();
@ -714,21 +715,24 @@ static void pgss_store(const char *query, uint64 queryId,
hash_dealloc_object_entry(queryId, tables_name);
len = strlen(tables_name);
bucket_id = get_next_wbucket(pgss);
if (bucket_id != pgss->current_wbucket)
{
reset = true;
pgss->current_wbucket = bucket_id;
}
/* Lookup the hash table entry with shared lock. */
LWLockAcquire(pgss->lock, LW_SHARED);
/* Set up key for hashtable search */
key.bucket_id = bucket_id;
key.userid = GetUserId();
key.dbid = MyDatabaseId;
key.queryid = queryId;
key.ip = pg_get_client_addr();
key.bucket_id = get_next_wbucket(pgss);
if (key.bucket_id != pgss->current_wbucket)
{
reset = true;
pgss->current_wbucket = key.bucket_id;
}
/* Lookup the hash table entry with shared lock. */
LWLockAcquire(pgss->lock, LW_SHARED);
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
if(!entry)
{
@ -752,17 +756,18 @@ static void pgss_store(const char *query, uint64 queryId,
LWLockRelease(pgss->lock);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
/* OK to create a new hashtable entry */
/* OK to create a new hashtable entry */
entry = hash_entry_alloc(pgss, &key, encoding);
if (entry == NULL)
{
goto exit;
}
if (PGSM_NORMALIZED_QUERY)
store_query(queryId, norm_query ? norm_query : query, query_len);
else
store_query(queryId, query, query_len);
}
if (PGSM_NORMALIZED_QUERY)
store_query(queryId, norm_query ? norm_query : query, query_len);
else
store_query(queryId, query, query_len);
/*
* Grab the spinlock while updating the counters (see comment about
* locking rules at the head of the file)
@ -1151,19 +1156,18 @@ get_next_wbucket(pgssSharedState *pgss)
if ((current_usec - pgss->prev_bucket_usec) > PGSM_BUCKET_TIME)
{
unsigned char *buf;
bucket_id = pgss->current_wbucket + 1;
if (bucket_id == PGSM_MAX_BUCKETS)
bucket_id = 0;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
buf = pgss_qbuf[bucket_id];
hash_entry_dealloc(bucket_id);
/* reset the query buffer */
pgss->query_fifo[bucket_id].head = 0;
pgss->query_fifo[bucket_id].tail = 0;
memset(buf, 0, sizeof (uint64));
LWLockRelease(pgss->lock);
pgss->prev_bucket_usec = current_usec;
pgssBucketEntries[bucket_id]->counters.current_time = GetCurrentTimestamp();
return bucket_id;
}
@ -2101,42 +2105,49 @@ array_get_datum(int arr[])
return CStringGetTextDatum(str);
}
#define FIFO_HEAD(b) pgss->query_fifo[b].head
#define FIFO_TAIL(b) pgss->query_fifo[b].tail
static uint64
locate_query(uint64 bucket_id, uint64 queryid, char * query)
{
uint64 id = 0;
uint64 len = 0;
uint64 offset = 0;
pgssSharedState *pgss = pgsm_get_ss();
bool found = false;
uint64 query_id = 0;
uint64 query_len = 0;
uint64 rlen = 0;
uint64 buf_len = 0;
unsigned char *buf = pgss_qbuf[bucket_id];
uint64 tail = FIFO_TAIL(bucket_id);
unsigned char *buf = pgss_qbuf[bucket_id];
memcpy(&buf_len, buf, sizeof (uint64));
if (buf_len <= sizeof (uint64))
return 0;
while (FIFO_HEAD(bucket_id) != tail)
rlen = sizeof (uint64); /* Move forwad to skip length bytes */
for(;;)
{
offset = 0;
memcpy(&id, &buf[tail + offset], sizeof (uint64)); /* query id */
offset += sizeof (uint64);
memcpy(&len, &buf[tail + offset], sizeof (uint64)); /* query len */
if (len == 0)
if (rlen >= buf_len)
return 0;
offset += sizeof (uint64);
if (query != NULL)
{
memcpy(query, &buf[tail + offset], len); /* Actual query */
query[len] = 0;
}
offset += len;
memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */
if (query_id == queryid)
found = true;
if (id == queryid)
return id;
tail = (tail + offset) % pgsm_get_bucket_size();
rlen += sizeof (uint64);
if (buf_len <= rlen)
continue;
memcpy(&query_len, &buf[rlen], sizeof (uint64)); /* query len */
rlen += sizeof (uint64);
if (buf_len < rlen + query_len)
return 0;
if (found)
{
if (query != NULL)
{
memcpy(query, &buf[rlen], query_len); /* Actual query */
query[query_len] = 0;
}
return query_id;
}
rlen += query_len;
}
return 0;
}
@ -2144,9 +2155,9 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query)
static void
store_query(uint64 queryid, const char *query, uint64 query_len)
{
int next;
int offset = 0;
pgssSharedState *pgss = pgsm_get_ss();
uint64 buf_len = 0;
pgssSharedState *pgss = pgsm_get_ss();
unsigned char *buf = pgss_qbuf[pgss->current_wbucket];
if (query_len > PGSM_QUERY_MAX_LEN)
query_len = PGSM_QUERY_MAX_LEN;
@ -2157,27 +2168,26 @@ store_query(uint64 queryid, const char *query, uint64 query_len)
if (locate_query(pgss->current_wbucket, queryid, NULL) == queryid)
return;
next = FIFO_HEAD(pgss->current_wbucket) + query_len + sizeof (uint64) + sizeof (uint64);
if (next >= pgsm_get_bucket_size())
next = 0;
memcpy(&buf_len, buf, sizeof (uint64));
if (buf_len == 0)
buf_len += sizeof (uint64);
/* Buffer is full */
if (next == FIFO_HEAD(pgss->current_wbucket))
{
if ((buf_len + query_len + sizeof(uint64) + sizeof(uint64)) > pgsm_get_bucket_size())
{
/* Buffer is full */
elog(INFO, "pg_stat_monitor: no space left in shared_buffer");
return;
}
offset = 0;
memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket)], &queryid, sizeof (uint64)); /* query id */
offset += sizeof (uint64);
memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */
buf_len += sizeof (uint64);
memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket) + offset], &query_len, sizeof (uint64)); /* query len */
offset += sizeof (uint64);
memcpy(&buf[buf_len], &query_len, sizeof (uint64)); /* query length */
buf_len += sizeof (uint64);
memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket) + offset], query, query_len); /* actual query */
pgss->query_fifo[pgss->current_wbucket].head = next;
memcpy(&buf[buf_len], query, query_len); /* query */
buf_len += query_len;
memcpy(buf, &buf_len, sizeof (uint64));
}
#if PG_VERSION_NUM >= 130000

View File

@ -152,9 +152,9 @@ typedef struct pgssHashKey
{
uint64 bucket_id; /* bucket number */
uint64 queryid; /* query identifier */
Oid userid; /* user OID */
Oid dbid; /* database OID */
uint32 ip; /* client ip address */
uint64 userid; /* user OID */
uint64 dbid; /* database OID */
uint64 ip; /* client ip address */
} pgssHashKey;
typedef struct QueryInfo