From bedc2ffc78feb3d12c3f8da92a7cdb897913b4c3 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Sun, 29 Nov 2020 20:11:23 +0000 Subject: [PATCH] PG-154: Add backup option in case of no space left in the bucket. --- guc.c | 37 ++++++----- hash_query.c | 110 +++++++++++++++++++++------------ pg_stat_monitor.c | 154 +++++++++++++++++++++++++++++++++++++++------- pg_stat_monitor.h | 33 +++++++++- 4 files changed, 251 insertions(+), 83 deletions(-) diff --git a/guc.c b/guc.c index 7d76701..d80f1a9 100644 --- a/guc.c +++ b/guc.c @@ -12,7 +12,7 @@ #include "postgres.h" #include "pg_stat_monitor.h" - + GucVariable conf[12]; /* @@ -24,10 +24,10 @@ init_guc(void) int i = 0; conf[i++] = (GucVariable) { .guc_name = "pg_stat_monitor.pgsm_max", - .guc_desc = "Sets the maximum number of statements tracked by pg_stat_monitor.", - .guc_default = 5000, - .guc_min = 5000, - .guc_max = INT_MAX, + .guc_desc = "Sets the maximum size of shared memory in (MB) used for statement's metadata tracked by pg_stat_monitor.", + .guc_default = 100, + .guc_min = 1, + .guc_max = 1000, .guc_restart = true }; @@ -101,10 +101,10 @@ init_guc(void) conf[i++] = (GucVariable) { .guc_name = "pg_stat_monitor.pgsm_query_shared_buffer", - .guc_desc = "Sets the query shared_buffer size.", - .guc_default = 500000, - .guc_min = 500000, - .guc_max = INT_MAX, + .guc_desc = "Sets the maximum size of shared memory in (MB) used for query tracked by pg_stat_monitor.", + .guc_default = 20, + .guc_min = 1, + .guc_max = 10000, .guc_restart = true }; #if PG_VERSION_NUM >= 130000 @@ -119,14 +119,14 @@ init_guc(void) #endif DefineCustomIntVariable("pg_stat_monitor.pgsm_max", - "Sets the maximum number of statements tracked by pg_stat_monitor.", + "Sets the maximum size of shared memory in (MB) used for statement's metadata tracked by pg_stat_monitor.", NULL, &PGSM_MAX, - 5000, - 5000, - INT_MAX, + 100, + 1, + 1000, PGC_POSTMASTER, - 0, + GUC_UNIT_MB, NULL, NULL, NULL); @@ -231,12 +231,12 @@ init_guc(void) NULL); 
DefineCustomIntVariable("pg_stat_monitor.pgsm_query_shared_buffer", - "Sets the query shared_buffer size", + "Sets the maximum size of shared memory in (MB) used for query tracked by pg_stat_monitor.", NULL, &PGSM_QUERY_BUF_SIZE, - 500000, - 500000, - INT_MAX, + 20, + 1, + 10000, PGC_POSTMASTER, 0, NULL, @@ -253,7 +253,6 @@ init_guc(void) NULL, NULL, NULL); - } GucVariable* diff --git a/hash_query.c b/hash_query.c index 02b4896..a34447b 100644 --- a/hash_query.c +++ b/hash_query.c @@ -16,6 +16,9 @@ static pgssSharedState *pgss; static HTAB *pgss_hash; +static HTAB *pgss_query_hash; + + static HTAB* hash_init(const char *hash_name, int key_size, int entry_size, int hash_size); @@ -39,6 +42,7 @@ pgss_startup(void) pgss = NULL; pgss_hash = NULL; + pgss_query_hash = NULL; /* * Create or attach to the shared memory state, including hash table @@ -54,7 +58,7 @@ pgss_startup(void) ResetSharedState(pgss); } - pgss->query_buf_size_bucket = PGSM_QUERY_BUF_SIZE / PGSM_MAX_BUCKETS; + pgss->query_buf_size_bucket = MAX_QUERY_BUF / PGSM_MAX_BUCKETS; for (i = 0; i < PGSM_MAX_BUCKETS; i++) { @@ -63,7 +67,8 @@ pgss_startup(void) memset(buf, 0, sizeof (uint64)); } - pgss_hash = hash_init("pg_stat_monitor: Queries hashtable", sizeof(pgssHashKey), sizeof(pgssEntry),PGSM_MAX); + pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES); + pgss_query_hash = hash_init("pg_stat_monitor: query hashtable", sizeof(pgssQueryHashKey), sizeof(pgssQueryEntry),500000); LWLockRelease(AddinShmemInitLock); @@ -80,16 +85,24 @@ pgsm_get_bucket_size(void) return pgss->query_buf_size_bucket; } -pgssSharedState* pgsm_get_ss(void) +pgssSharedState* +pgsm_get_ss(void) { return pgss; } -HTAB* pgsm_get_hash(void) +HTAB* +pgsm_get_hash(void) { return pgss_hash; } +HTAB* +pgsm_get_query_hash(void) +{ + return pgss_query_hash; +} + /* * shmem_shutdown hook: Dump statistics into file. 
* @@ -115,24 +128,21 @@ hash_memsize(void) Size size; size = MAXALIGN(sizeof(pgssSharedState)); - size = add_size(size, hash_estimate_size(PGSM_MAX, sizeof(pgssEntry))); + size = MAXALIGN(MAX_QUERY_BUF); + size = add_size(size, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry))); + size = add_size(size, hash_estimate_size(500000, sizeof(pgssQueryEntry))); return size; } + pgssEntry * hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding) { pgssEntry *entry = NULL; bool found = false; - if (pgss->bucket_entry[pgss->current_wbucket] >= (PGSM_MAX / PGSM_MAX_BUCKETS)) - { - pgss->bucket_overflow[pgss->current_wbucket]++; - return NULL; - } - - if (hash_get_num_entries(pgss_hash) >= PGSM_MAX) + if (hash_get_num_entries(pgss_hash) >= MAX_BUCKET_ENTRIES) return NULL; /* Find or create an entry with desired hash code */ @@ -150,9 +160,32 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key,int encoding) /* ... and don't forget the query text metadata */ entry->encoding = encoding; } + if (entry == NULL) + elog(FATAL, "%s", "Ibrar"); /* NOTE(review): FATAL with a placeholder message when entry is NULL; presumably leftover debugging - confirm before merging */ return entry; } +/* + * Remove every pgss_query_hash entry whose key.bucket_id equals bucket. + * + * Caller must hold an exclusive lock on pgss->lock. + */ +void +hash_query_entry_dealloc(int bucket) +{ + HASH_SEQ_STATUS hash_seq; + pgssQueryEntry *entry; + + printf ("--%d--", hash_get_num_entries(pgss_hash)); /* NOTE(review): debug output on stdout; reports the size of pgss_hash, not pgss_query_hash */ + hash_seq_init(&hash_seq, pgss_query_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + if (entry->key.bucket_id == bucket) + entry = hash_search(pgss_query_hash, &entry->key, HASH_REMOVE, NULL); + } + printf ("--%d", hash_get_num_entries(pgss_hash)); +} + /* * Deallocate least-used entries. 
* @@ -164,13 +197,13 @@ hash_entry_dealloc(int bucket) HASH_SEQ_STATUS hash_seq; pgssEntry *entry; - pgss->bucket_entry[bucket] = 0; - hash_seq_init(&hash_seq, pgss_hash); while ((entry = hash_seq_search(&hash_seq)) != NULL) { if (entry->key.bucket_id == bucket || bucket < 0) + { entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + } } } @@ -180,6 +213,7 @@ hash_entry_dealloc(int bucket) void hash_entry_reset() { + pgssSharedState *pgss = pgsm_get_ss(); HASH_SEQ_STATUS hash_seq; pgssEntry *entry; @@ -194,35 +228,35 @@ hash_entry_reset() LWLockRelease(pgss->lock); } -pgssEntry* -hash_create_query_entry(unsigned int queryid, - unsigned int userid, - unsigned int dbid, - unsigned int bucket_id, - unsigned int ip) +/* Caller must acquire lock */ +bool +hash_create_query_entry(uint64 bucket_id, uint64 queryid) { - pgssHashKey key; - pgssEntry *entry = NULL; - int encoding = GetDatabaseEncoding(); + pgssQueryHashKey key; + pgssQueryEntry *entry = NULL; + bool found; key.queryid = queryid; - key.userid = userid; - key.dbid = dbid; - key.bucket_id = bucket_id; - key.ip = ip; + key.bucket_id = bucket_id; + + entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_ENTER, &found); + return (entry != NULL); +} + +/* Caller must acquire lock */ +bool +hash_find_query_entry(uint64 bucket_id, uint64 queryid) +{ + pgssQueryHashKey key; + pgssQueryEntry *entry = NULL; + bool found; + + key.queryid = queryid; + key.bucket_id = bucket_id; /* Lookup the hash table entry with shared lock. 
*/ - LWLockAcquire(pgss->lock, LW_SHARED); - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); - if(!entry) - { - LWLockRelease(pgss->lock); - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - - /* OK to create a new hashtable entry */ - entry = hash_entry_alloc(pgss, &key, encoding); - } - return entry; + entry = (pgssQueryEntry *) hash_search(pgss_query_hash, &key, HASH_FIND, &found); + return ((entry != NULL) && found); } bool diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 9f72d97..231d221 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -16,7 +16,9 @@ PG_MODULE_MAGIC; -#define BUILD_VERSION "0.6.0" +#define BUILD_VERSION "0.6.0" +#define PG_STAT_STATEMENTS_COLS 41 /* maximum of above */ +#define PGSM_TEXT_FILE "/tmp/pg_stat_monitor_query" /* base path of the per-bucket query dump files; the bucket number is appended as a suffix */ /*---- Initicalization Function Declarations ----*/ void _PG_init(void); @@ -32,13 +34,14 @@ static int plan_nested_level = 0; static int exec_nested_level = 0; #endif +FILE *qfile; /* NOTE(review): not referenced anywhere else in this patch - confirm it is needed */ static bool system_init = false; static struct rusage rusage_start; static struct rusage rusage_end; static unsigned char *pgss_qbuf[MAX_BUCKETS]; - static bool IsSystemInitialized(void); +static void dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len); /* Saved hook values in case of unload */ static planner_hook_type planner_hook_next = NULL; @@ -53,7 +56,6 @@ void pgsm_emit_log_hook(ErrorData *edata); static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static ExecutorCheckPerms_hook_type prev_ExecutorCheckPerms_hook = NULL; - PG_FUNCTION_INFO_V1(pg_stat_monitor_version); PG_FUNCTION_INFO_V1(pg_stat_monitor_reset); PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2); @@ -130,8 +132,9 @@ static int comp_location(const void *a, const void *b); static uint64 get_next_wbucket(pgssSharedState *pgss); -static void store_query(uint64 queryid, 
const char *query, uint64 query_len); -static uint64 locate_query(uint64 bucket_id, uint64 queryid, char * query); +static void store_query(int bucket_id, uint64 
queryid, const char *query, uint64 query_len); +static uint64 read_query(unsigned char *buf, uint64 queryid, char * query); +int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt); static uint64 get_query_id(pgssJumbleState *jstate, Query *query); @@ -141,6 +144,8 @@ static uint64 get_query_id(pgssJumbleState *jstate, Query *query); void _PG_init(void) { + int i; + elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__); /* * In order to create our shared memory area, we have to be loaded via @@ -156,6 +161,13 @@ _PG_init(void) /* Inilize the GUC variables */ init_guc(); + for (i = 0; i < PGSM_MAX_BUCKETS; i++) + { + char file_name[1024]; + sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, i); + unlink(file_name); /* delete any existing per-bucket dump file at load time; the result of unlink is not checked */ + } + EmitWarningsOnPlaceholders("pg_stat_monitor"); /* @@ -233,7 +245,6 @@ pg_stat_monitor_version(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(BUILD_VERSION)); } -#define PG_STAT_STATEMENTS_COLS 41 /* maximum of above */ /* * Post-parse-analysis hook: mark query with a queryId @@ -660,7 +671,7 @@ pgss_hash_string(const char *str, int len) static PgBackendStatus* pg_get_backend_status(void) { - LocalPgBackendStatus *local_beentry; + LocalPgBackendStatus *local_beentry; int num_backends = pgstat_fetch_stat_numbackends(); int i; @@ -861,9 +872,9 @@ static void pgss_store(uint64 queryId, goto exit; } if (PGSM_NORMALIZED_QUERY) - store_query(queryId, norm_query ? norm_query : query, query_len); + store_query(key.bucket_id, queryId, norm_query ? 
norm_query : query, query_len); else - store_query(queryId, query, query_len); + store_query(key.bucket_id, queryId, query, query_len); } /* @@ -1090,9 +1101,20 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); - if(locate_query(entry->key.bucket_id, queryid, query_txt) == 0) - query_txt = NULL; - + if (!hash_find_query_entry(entry->key.bucket_id, queryid)) + { + sprintf(query_txt, "%s", "pg_stat_monitor: queryid not found in hash and in temporay file"); + } + else + { + unsigned char *buf = pgss_qbuf[entry->key.bucket_id]; + if(read_query(buf, queryid, query_txt) == 0) + { + len = read_query_buffer(entry->key.bucket_id, queryid, query_txt); + if (len != MAX_QUERY_BUFFER_BUCKET) + sprintf(query_txt, "%s", "pg_stat_monitor: query not found either in hash nor in temporay file"); + } + } if (query_txt) sprintf(queryid_txt, "%08lX", queryid); else @@ -1213,8 +1235,9 @@ static uint64 get_next_wbucket(pgssSharedState *pgss) { struct timeval tv; - uint64 current_usec; - uint64 bucket_id; + uint64 current_usec; + uint64 bucket_id; + char file_name[1024]; gettimeofday(&tv,NULL); current_usec = tv.tv_sec; @@ -1229,6 +1252,11 @@ get_next_wbucket(pgssSharedState *pgss) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); buf = pgss_qbuf[bucket_id]; hash_entry_dealloc(bucket_id); + hash_query_entry_dealloc(bucket_id); + sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id); + unlink(file_name); + printf("\nRemove file %s\n", file_name); + /* reset the query buffer */ memset(buf, 0, sizeof (uint64)); LWLockRelease(pgss->lock); @@ -2199,17 +2227,16 @@ intarray_get_datum(int32 arr[], int len) } static uint64 -locate_query(uint64 bucket_id, uint64 queryid, char * query) +read_query(unsigned char *buf, uint64 queryid, char * query) { bool found = false; uint64 query_id = 0; uint64 query_len = 0; uint64 rlen = 0; uint64 buf_len = 0; - unsigned char *buf = pgss_qbuf[bucket_id]; memcpy(&buf_len, buf, sizeof (uint64)); - 
if (buf_len <= sizeof (uint64)) + if (buf_len <= 0) /* buf_len is a uint64, so this is equivalent to buf_len == 0 */ return 0; rlen = sizeof (uint64); /* Move forwad to skip length bytes */ @@ -2246,7 +2273,7 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query) } static void -store_query(uint64 queryid, const char *query, uint64 query_len) +store_query(int bucket_id, uint64 queryid, const char *query, uint64 query_len) { uint64 buf_len = 0; pgssSharedState *pgss = pgsm_get_ss(); @@ -2258,18 +2285,20 @@ store_query(uint64 queryid, const char *query, uint64 query_len) /* Already have query in the shared buffer, there * is no need to add that again. */ - if (locate_query(pgss->current_wbucket, queryid, NULL) == queryid) + if (hash_find_query_entry(bucket_id, queryid)) + return; + + if (!hash_create_query_entry(bucket_id, queryid)) return; memcpy(&buf_len, buf, sizeof (uint64)); if (buf_len == 0) buf_len += sizeof (uint64); - if ((buf_len + query_len + sizeof(uint64) + sizeof(uint64)) > pgsm_get_bucket_size()) + if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) { - /* Buffer is full */ - elog(INFO, "pg_stat_monitor: no space left in shared_buffer"); - return; + dump_queries_buffer(bucket_id, buf, MAX_QUERY_BUFFER_BUCKET); /* bucket buffer is full: append the whole buffer to the dump file of this bucket and restart at the first slot after the length header */ + buf_len = sizeof (uint64); + } memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */ @@ -2417,7 +2446,11 @@ pg_stat_monitor_settings(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); +#if PG_VERSION_NUM >= 130000 for(i = 0; i < 11; i++) +#else + for(i = 0; i < 10; i++) +#endif { Datum values[7]; bool nulls[7]; @@ -2490,3 +2523,78 @@ IsSystemInitialized(void) return (system_init && IsHashInitialize()); } +static void +dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) +{ /* Append buf_len bytes of buf to the dump file of bucket_id; failures are only logged. 
*/ + int fd = 0; + char file_name[1024]; + + sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, bucket_id); + printf("\nWriting to %s\n", file_name); + fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY); + if (fd < 0) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file 
\"%s\": %m", + file_name))); /* NOTE(review): no early return here, so write() below still runs with fd < 0 */ + + if (write(fd, buf, buf_len) != buf_len) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + file_name))); + if (fd > 0) + CloseTransientFile(fd); +} + +int +read_query_buffer(int bucket_id, uint64 queryid, char *query_txt) +{ /* Scan the dump file of bucket_id in MAX_QUERY_BUFFER_BUCKET sized chunks, handing each chunk to read_query() until it reports a match; returns the byte count of the last read(). */ + int fd = 0; + int buf_len; + char file_name[1024]; + unsigned char *buf = NULL; + int off = 0; + + sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, bucket_id); + fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY); + if (fd < 0) + goto exit; /* NOTE(review): buf_len has not been assigned yet on this path, but the exit label returns it */ + + buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET); + for(;;) + { + if (lseek(fd, off, SEEK_SET) != off) + goto exit; + + buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET); + if (buf_len != MAX_QUERY_BUFFER_BUCKET) + { + if (errno != ENOENT) + goto exit; + + if (buf_len == 0) + break; + } + off += buf_len; + if (read_query(buf, queryid, query_txt)) + break; + } + if (fd > 0) + CloseTransientFile(fd); + if (buf) + pfree(buf); + return buf_len; + +exit: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + file_name))); + if (fd > 0) + CloseTransientFile(fd); + if (buf) + pfree(buf); + return buf_len; +} + diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 5821695..274dfbc 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -57,6 +57,7 @@ #define JUMBLE_SIZE 1024 /* query serialization buffer size */ #define MAX_RESPONSE_BUCKET 10 +#define INVALID_BUCKET_ID -1 #define MAX_REL_LEN 255 #define MAX_BUCKETS 10 #define TEXT_LEN 255 @@ -65,6 +66,18 @@ #define CMD_LST 10 #define CMD_LEN 20 #define APPLICATIONNAME_LEN 100 +#define PGSM_OVER_FLOW_MAX 
10 + + +#define MAX_QUERY_BUF (PGSM_QUERY_BUF_SIZE * 1024 * 1024) /* PGSM_QUERY_BUF_SIZE is configured in MB; this is the same size in bytes */ +#define MAX_BUCKETS_MEM (PGSM_MAX * 1024 * 1024) +#define BUCKETS_MEM_OVERFLOW() ((hash_get_num_entries(pgss_hash) * sizeof(pgssEntry)) >= MAX_BUCKETS_MEM) +//#define MAX_QUERY_BUFFER_BUCKET 200 +#define MAX_QUERY_BUFFER_BUCKET MAX_QUERY_BUF / PGSM_MAX_BUCKETS /* NOTE(review): expansion is not parenthesized */ +#define 
MAX_BUCKET_ENTRIES (MAX_BUCKETS_MEM / sizeof(pgssEntry)) +#define QUERY_BUFFER_OVERFLOW(x,y) ((x + y + sizeof(uint64) + sizeof(uint64)) > MAX_QUERY_BUFFER_BUCKET) +#define QUERY_MARGIN 100 +#define MIN_QUERY_LEN 10 typedef struct GucVariables { @@ -106,6 +119,18 @@ typedef enum AGG_KEY #define MAX_QUERY_LEN 1024 /* shared nenory storage for the query */ +typedef struct pgssQueryHashKey +{ + uint64 queryid; /* query identifier */ + uint64 bucket_id; /* bucket number */ +} pgssQueryHashKey; + +typedef struct pgssQueryEntry +{ + pgssQueryHashKey key; /* hash key of entry - MUST BE FIRST */ + uint64 pos; /* NOTE(review): was labelled bucket number, which duplicates key.bucket_id; pos is never read or written in this patch - confirm its meaning */ +} pgssQueryEntry; + typedef struct pgssHashKey { uint64 bucket_id; /* bucket number */ @@ -213,7 +238,6 @@ typedef struct pgssSharedState int64 n_writers; /* number of active writers to query file */ uint64 current_wbucket; uint64 prev_bucket_usec; - uint64 bucket_overflow[MAX_BUCKETS]; uint64 bucket_entry[MAX_BUCKETS]; int64 query_buf_size_bucket; int32 relations[REL_LST]; @@ -228,7 +252,6 @@ do { \ x->n_writers = 0; \ x->current_wbucket = 0; \ x->prev_bucket_usec = 0; \ - memset(&x->bucket_overflow, 0, MAX_BUCKETS * sizeof(uint64)); \ memset(&x->bucket_entry, 0, MAX_BUCKETS * sizeof(uint64)); \ } while(0) @@ -282,11 +305,15 @@ void pgss_shmem_shutdown(int code, Datum arg); int pgsm_get_bucket_size(void); pgssSharedState* pgsm_get_ss(void); HTAB* pgsm_get_hash(void); +HTAB* pgsm_get_query_hash(void); void hash_entry_reset(void); +void hash_query_entry_dealloc(int bucket); void hash_entry_dealloc(int bucket); pgssEntry* hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); Size hash_memsize(void); -pgssEntry* hash_create_query_entry(unsigned int queryid, unsigned int userid, unsigned int dbid, unsigned int bucket_id, unsigned int ip); + +bool hash_find_query_entry(uint64 bucket_id, uint64 queryid); +bool 
hash_create_query_entry(uint64 bucket_id, uint64 queryid); /* hash_query.c */ void pgss_startup(void);