diff --git a/hash_query.c b/hash_query.c index 5333d9b..5a61df1 100644 --- a/hash_query.c +++ b/hash_query.c @@ -73,8 +73,12 @@ pgss_shmem_startup(void) pgss->query_buf_size_bucket = PGSM_QUERY_BUF_SIZE / PGSM_MAX_BUCKETS; for (i = 0; i < PGSM_MAX_BUCKETS; i++) + { + unsigned char *buf; pgss_qbuf[i] = (unsigned char *) ShmemAlloc(pgss->query_buf_size_bucket); - + buf = pgss_qbuf[i]; + memset(buf, 0, sizeof (uint64)); + } pgss_hash = hash_init("pg_stat_monitor: Queries hashtable", sizeof(pgssHashKey), sizeof(pgssEntry),PGSM_MAX); pgss_buckethash = hash_init("pg_stat_monitor: Bucket hashtable", sizeof(pgssBucketHashKey), sizeof(pgssBucketEntry), PGSM_MAX_BUCKETS); diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index d854ece..c1da2de 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -655,6 +655,7 @@ static void pgss_store(const char *query, uint64 queryId, float utime, float stime) { pgssHashKey key; + int bucket_id; pgssEntry *entry; char *norm_query = NULL; int encoding = GetDatabaseEncoding(); @@ -665,7 +666,7 @@ static void pgss_store(const char *query, uint64 queryId, pgssSharedState *pgss = pgsm_get_ss(); pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket(); HTAB *pgss_hash = pgsm_get_hash(); - + Assert(query != NULL); Assert(PGSM_ENABLED); @@ -713,22 +714,25 @@ static void pgss_store(const char *query, uint64 queryId, hash_dealloc_object_entry(queryId, tables_name); len = strlen(tables_name); - + + bucket_id = get_next_wbucket(pgss); + + if (bucket_id != pgss->current_wbucket) + { + reset = true; + pgss->current_wbucket = bucket_id; + } + + /* Lookup the hash table entry with shared lock. */ + LWLockAcquire(pgss->lock, LW_SHARED); + /* Set up key for hashtable search */ + key.bucket_id = bucket_id; key.userid = GetUserId(); key.dbid = MyDatabaseId; key.queryid = queryId; key.ip = pg_get_client_addr(); - key.bucket_id = get_next_wbucket(pgss); - - if (key.bucket_id != pgss->current_wbucket) - { - reset = true; - pgss->current_wbucket = key.bucket_id; - } - /* Lookup the hash table entry with shared lock. */ - LWLockAcquire(pgss->lock, LW_SHARED); entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); if(!entry) { @@ -752,17 +756,18 @@ static void pgss_store(const char *query, uint64 queryId, LWLockRelease(pgss->lock); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - /* OK to create a new hashtable entry */ + /* OK to create a new hashtable entry */ entry = hash_entry_alloc(pgss, &key, encoding); if (entry == NULL) + { goto exit; + } + if (PGSM_NORMALIZED_QUERY) + store_query(queryId, norm_query ? norm_query : query, query_len); + else + store_query(queryId, query, query_len); } - if (PGSM_NORMALIZED_QUERY) - store_query(queryId, norm_query ? norm_query : query, query_len); - else - store_query(queryId, query, query_len); - /* * Grab the spinlock while updating the counters (see comment about * locking rules at the head of the file) @@ -903,7 +908,7 @@ pg_stat_wait_events(PG_FUNCTION_ARGS) "allowed in this context"))); query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); - + /* Switch into long-lived context to construct returned data structures */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); @@ -996,7 +1001,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket_entries(); query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN); - + /* Superusers or members of pg_read_all_stats members are allowed */ is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); @@ -1151,19 +1156,18 @@ get_next_wbucket(pgssSharedState *pgss) if ((current_usec - pgss->prev_bucket_usec) > PGSM_BUCKET_TIME) { + unsigned char *buf; bucket_id = pgss->current_wbucket + 1; if (bucket_id == PGSM_MAX_BUCKETS) bucket_id = 0; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + buf = pgss_qbuf[bucket_id]; hash_entry_dealloc(bucket_id); /* reset the query buffer */ - pgss->query_fifo[bucket_id].head = 0; - pgss->query_fifo[bucket_id].tail = 0; + memset(buf, 0, sizeof (uint64)); LWLockRelease(pgss->lock); - pgss->prev_bucket_usec = current_usec; - pgssBucketEntries[bucket_id]->counters.current_time = GetCurrentTimestamp(); return bucket_id; } @@ -2101,42 +2105,49 @@ array_get_datum(int arr[]) return CStringGetTextDatum(str); } -#define FIFO_HEAD(b) pgss->query_fifo[b].head -#define FIFO_TAIL(b) pgss->query_fifo[b].tail - static uint64 locate_query(uint64 bucket_id, uint64 queryid, char * query) { - uint64 id = 0; - uint64 len = 0; - uint64 offset = 0; - pgssSharedState *pgss = pgsm_get_ss(); + bool found = false; + uint64 query_id = 0; + uint64 query_len = 0; + uint64 rlen = 0; + uint64 buf_len = 0; + unsigned char *buf = pgss_qbuf[bucket_id]; - uint64 tail = FIFO_TAIL(bucket_id); - unsigned char *buf = pgss_qbuf[bucket_id]; + memcpy(&buf_len, buf, sizeof (uint64)); + if (buf_len <= sizeof (uint64)) + return 0; - while (FIFO_HEAD(bucket_id) != tail) + rlen = sizeof (uint64); /* Move forwad to skip length bytes */ + for(;;) { - offset = 0; - memcpy(&id, &buf[tail + offset], sizeof (uint64)); /* query id */ - - offset += sizeof (uint64); - memcpy(&len, &buf[tail + offset], sizeof (uint64)); /* query len */ - - if (len == 0) + if (rlen >= buf_len) return 0; - offset += sizeof (uint64); - if (query != NULL) - { - memcpy(query, &buf[tail + offset], len); /* Actual query */ - query[len] = 0; - } - offset += len; + memcpy(&query_id, &buf[rlen], sizeof (uint64)); /* query id */ + if (query_id == queryid) + found = true; - if (id == queryid) - return id; - tail = (tail + offset) % pgsm_get_bucket_size(); + rlen += sizeof (uint64); + if (buf_len <= rlen) + continue; + + memcpy(&query_len, &buf[rlen], sizeof (uint64)); /* query len */ + rlen += sizeof (uint64); + if (buf_len < rlen + query_len) + return 0; + + if (found) + { + if (query != NULL) + { + memcpy(query, &buf[rlen], query_len); /* Actual query */ + query[query_len] = 0; + } + return query_id; + } + rlen += query_len; } return 0; } @@ -2144,9 +2155,9 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query) static void store_query(uint64 queryid, const char *query, uint64 query_len) { - int next; - int offset = 0; - pgssSharedState *pgss = pgsm_get_ss(); + uint64 buf_len = 0; + pgssSharedState *pgss = pgsm_get_ss(); + unsigned char *buf = pgss_qbuf[pgss->current_wbucket]; if (query_len > PGSM_QUERY_MAX_LEN) query_len = PGSM_QUERY_MAX_LEN; @@ -2157,27 +2168,26 @@ store_query(uint64 queryid, const char *query, uint64 query_len) if (locate_query(pgss->current_wbucket, queryid, NULL) == queryid) return; - next = FIFO_HEAD(pgss->current_wbucket) + query_len + sizeof (uint64) + sizeof (uint64); - if (next >= pgsm_get_bucket_size()) - next = 0; + memcpy(&buf_len, buf, sizeof (uint64)); + if (buf_len == 0) + buf_len += sizeof (uint64); - /* Buffer is full */ - if (next == FIFO_HEAD(pgss->current_wbucket)) - { + if ((buf_len + query_len + sizeof(uint64) + sizeof(uint64)) > pgsm_get_bucket_size()) + { + /* Buffer is full */ elog(INFO, "pg_stat_monitor: no space left in shared_buffer"); return; } - offset = 0; - memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket)], &queryid, sizeof (uint64)); /* query id */ - offset += sizeof (uint64); + memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */ + buf_len += sizeof (uint64); - memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket) + offset], &query_len, sizeof (uint64)); /* query len */ - offset += sizeof (uint64); + memcpy(&buf[buf_len], &query_len, sizeof (uint64)); /* query length */ + buf_len += sizeof (uint64); - memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket) + offset], query, query_len); /* actual query */ - - pgss->query_fifo[pgss->current_wbucket].head = next; + memcpy(&buf[buf_len], query, query_len); /* query */ + buf_len += query_len; + memcpy(buf, &buf_len, sizeof (uint64)); } #if PG_VERSION_NUM >= 130000 diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 351a227..cd95227 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -152,9 +152,9 @@ typedef struct pgssHashKey { uint64 bucket_id; /* bucket number */ uint64 queryid; /* query identifier */ - Oid userid; /* user OID */ - Oid dbid; /* database OID */ - uint32 ip; /* client ip address */ + uint64 userid; /* user OID */ + uint64 dbid; /* database OID */ + uint64 ip; /* client ip address */ } pgssHashKey; typedef struct QueryInfo