From 802774a2a771ef2912b04f4fee8bafcca61b0a3b Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Thu, 22 Dec 2022 19:15:14 +0500 Subject: [PATCH] PG-488: Revert pg_stat_monitor: Overflow management. (#338) PG-488: Revert pg_stat_monitor: Overflow management. This patch does not work for < PostgreSQL - 15. More work required. --- hash_query.c | 225 +++++++-------------- pg_stat_monitor.c | 490 ++++++++++++++++++++++++++++++++++++++-------- pg_stat_monitor.h | 44 +++-- 3 files changed, 513 insertions(+), 246 deletions(-) diff --git a/hash_query.c b/hash_query.c index 48e9532..3139716 100644 --- a/hash_query.c +++ b/hash_query.c @@ -16,93 +16,59 @@ */ #include "postgres.h" #include "nodes/pg_list.h" + #include "pg_stat_monitor.h" -static pgsmLocalState pgsmStateLocal; -/* parameter for the shared hash */ - static dshash_parameters dsh_params = { - sizeof(pgssHashKey), - sizeof(pgssEntry), - dshash_memcmp, - dshash_memhash - }; -static void pgsm_proc_exit(int code, Datum arg); +static pgssSharedState *pgss; +static HTAB *pgss_hash; +static HTAB *pgss_query_hash; -static Size -pgsm_query_area_size(void) + +static HTAB * +hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) { - Size sz = MAXALIGN(MAX_QUERY_BUF); - return MAXALIGN(sz); -} + HASHCTL info; -Size -pgsm_ShmemSize(void) - { - Size sz = MAXALIGN(sizeof(pgssSharedState)); - sz = add_size(sz, pgsm_query_area_size()); - sz = add_size(sz, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry))); - return sz; - } + memset(&info, 0, sizeof(info)); + info.keysize = key_size; + info.entrysize = entry_size; + return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS); +} void pgss_startup(void) { bool found = false; - pgssSharedState *pgss; + /* reset in case this is a restart within the postmaster */ - pgsmStateLocal.dsa = NULL; - pgsmStateLocal.shared_hash = NULL; - pgsmStateLocal.shared_pgssState = NULL; + + pgss = NULL; + pgss_hash = NULL; /* * Create or attach to 
the shared memory state, including hash table */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - pgss = ShmemInitStruct("pg_stat_monitor", pgsm_ShmemSize(), &found); + pgss = ShmemInitStruct("pg_stat_monitor", sizeof(pgssSharedState), &found); if (!found) { /* First time through ... */ - dsa_area *dsa; - dshash_table *dsh; - char *p = (char *) pgss; - pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock; SpinLockInit(&pgss->mutex); ResetSharedState(pgss); - /* the allocation of pgssSharedState itself */ - p += MAXALIGN(sizeof(pgssSharedState)); - pgss->raw_dsa_area = p; - dsa = dsa_create_in_place(pgss->raw_dsa_area, - pgsm_query_area_size(), - LWLockNewTrancheId(), 0); - dsa_pin(dsa); - dsa_set_size_limit(dsa, pgsm_query_area_size()); - - pgss->hash_tranche_id = LWLockNewTrancheId(); - - dsh_params.tranche_id = pgss->hash_tranche_id; - dsh = dshash_create(dsa, &dsh_params, 0); - - pgss->hash_handle = dshash_get_hash_table_handle(dsh); - - if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_DISK) - dsa_set_size_limit(dsa, -1); - - pgsmStateLocal.shared_pgssState = pgss; - /* - * Postmaster will never access these again, thus free the local - * dsa/dshash references. 
- */ - dshash_detach(dsh); - dsa_detach(dsa); } #ifdef BENCHMARK init_hook_stats(); #endif + set_qbuf((unsigned char *) ShmemAlloc(MAX_QUERY_BUF)); + + pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES); + pgss_query_hash = hash_init("pg_stat_monitor: queryID hashtable", sizeof(uint64), sizeof(pgssQueryEntry), MAX_BUCKET_ENTRIES); + LWLockRelease(AddinShmemInitLock); /* @@ -112,49 +78,23 @@ pgss_startup(void) on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); } -void -pgsm_attach_shmem(void) -{ - MemoryContext oldcontext; - if (pgsmStateLocal.dsa) - return; - - oldcontext = MemoryContextSwitchTo(TopMemoryContext); - - pgsmStateLocal.dsa = dsa_attach_in_place(pgsmStateLocal.shared_pgssState->raw_dsa_area, - NULL); - dsa_pin_mapping(pgsmStateLocal.dsa); - - dsh_params.tranche_id = pgsmStateLocal.shared_pgssState->hash_tranche_id; - pgsmStateLocal.shared_hash = dshash_attach(pgsmStateLocal.dsa, &dsh_params, - pgsmStateLocal.shared_pgssState->hash_handle, 0); - - on_proc_exit(pgsm_proc_exit, 0); - - MemoryContextSwitchTo(oldcontext); -} - -dsa_area* -get_dsa_area_for_query_text(void) -{ - pgsm_attach_shmem(); - return pgsmStateLocal.dsa; -} - -dshash_table* -get_pgssHash(void) -{ - pgsm_attach_shmem(); - return pgsmStateLocal.shared_hash; -} - pgssSharedState * pgsm_get_ss(void) { - pgsm_attach_shmem(); - return pgsmStateLocal.shared_pgssState; + return pgss; } +HTAB * +pgsm_get_hash(void) +{ + return pgss_hash; +} + +HTAB * +pgsm_get_query_hash(void) +{ + return pgss_query_hash; +} /* * shmem_shutdown hook: Dump statistics into file. @@ -166,24 +106,26 @@ void pgss_shmem_shutdown(int code, Datum arg) { /* Don't try to dump during a crash. */ - elog(LOG,"pgss_shmem_shutdown"); if (code) return; - pgsmStateLocal.shared_pgssState = NULL; + pgss = NULL; /* Safety check ... shouldn't get here unless shmem is set up. 
*/ if (!IsHashInitialize()) return; } -static void -pgsm_proc_exit(int code, Datum arg) +Size +hash_memsize(void) { - Assert(pgsmStateLocal.dsa); - dshash_detach(pgsmStateLocal.shared_hash); - pgsmStateLocal.shared_hash = NULL; - dsa_detach(pgsmStateLocal.dsa); - pgsmStateLocal.dsa = NULL; + Size size; + + size = MAXALIGN(sizeof(pgssSharedState)); + size += MAXALIGN(MAX_QUERY_BUF); + size = add_size(size, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry))); + size = add_size(size, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssQueryEntry))); + + return size; } pgssEntry * @@ -192,9 +134,13 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) pgssEntry *entry = NULL; bool found = false; + if (hash_get_num_entries(pgss_hash) >= MAX_BUCKET_ENTRIES) + { + elog(DEBUG1, "pg_stat_monitor: out of memory"); + return NULL; + } /* Find or create an entry with desired hash code */ - entry = (pgssEntry *) dshash_find_or_insert(pgsmStateLocal.shared_hash, key, &found); - // entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); + entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); if (entry == NULL) elog(DEBUG1, "hash_entry_alloc: OUT OF MEMORY"); else if (!found) @@ -209,7 +155,6 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) /* ... and don't forget the query text metadata */ entry->encoding = encoding; } - dshash_release_lock(pgsmStateLocal.shared_hash, entry); return entry; } @@ -229,22 +174,17 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer) { - dshash_seq_status hstat; + HASH_SEQ_STATUS hash_seq; pgssEntry *entry = NULL; + /* Store pending query ids from the previous bucket. */ List *pending_entries = NIL; ListCell *pending_entry; - if (!pgsmStateLocal.shared_hash) - return; - /* Iterate over the hash table. 
*/ - dshash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); - - while ((entry = dshash_seq_next(&hstat)) != NULL) + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) { - dsa_pointer pdsa; - /* * Remove all entries if new_bucket_id == -1. Otherwise remove entry * in new_bucket_id if it has finished already. @@ -253,14 +193,16 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu (entry->key.bucket_id == new_bucket_id && (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) { - pdsa = entry->query_pos; - dsa_pointer parent_qdsa = entry->counters.info.parent_query; - dshash_delete_current(&hstat); - dsa_free(pgsmStateLocal.dsa, pdsa); - - if (DsaPointerIsValid(parent_qdsa)) - dsa_free(pgsmStateLocal.dsa, parent_qdsa); + if (new_bucket_id == -1) + { + /* + * pg_stat_monitor_reset(), remove entry from query hash table + * too. + */ + hash_search(pgss_query_hash, &(entry->key.queryid), HASH_REMOVE, NULL); + } + entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } /* @@ -296,11 +238,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu if (entry->counters.calls.calls > 1) entry->counters.state = PGSS_FINISHED; else - { - pdsa = entry->query_pos; - dshash_delete_current(&hstat); - dsa_free(pgsmStateLocal.dsa, pdsa); - } + entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); continue; } @@ -328,15 +266,11 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu if (entry->counters.calls.calls > 1) entry->counters.state = PGSS_FINISHED; else - { - pdsa = entry->query_pos; - dshash_delete_current(&hstat); - dsa_free(pgsmStateLocal.dsa, pdsa); - } + entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } } } - dshash_seq_term(&hstat); + /* * Iterate over the list of pending queries in order to add them back to * the hash table with the updated bucket id. 
@@ -347,8 +281,7 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu pgssEntry *new_entry; pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry); - - new_entry = (pgssEntry *) dshash_find_or_insert(pgsmStateLocal.shared_hash, &old_entry->key, &found); + new_entry = (pgssEntry *) hash_search(pgss_hash, &old_entry->key, HASH_ENTER_NULL, &found); if (new_entry == NULL) elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); else if (!found) @@ -359,9 +292,8 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu new_entry->encoding = old_entry->encoding; new_entry->query_pos = old_entry->query_pos; } - free(old_entry); - dshash_release_lock(pgsmStateLocal.shared_hash, entry); + free(old_entry); } list_free(pending_entries); @@ -374,22 +306,16 @@ void hash_entry_reset() { pgssSharedState *pgss = pgsm_get_ss(); - dshash_seq_status hstat; + HASH_SEQ_STATUS hash_seq; pgssEntry *entry; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - dshash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); - - while ((entry = dshash_seq_next(&hstat)) != NULL) + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) { - dsa_pointer pdsa = entry->query_pos; - dshash_delete_current(&hstat); - dsa_free(pgsmStateLocal.dsa, pdsa); + hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); } - - dshash_seq_term(&hstat); - pg_atomic_write_u64(&pgss->current_wbucket, 0); LWLockRelease(pgss->lock); } @@ -397,5 +323,6 @@ hash_entry_reset() bool IsHashInitialize(void) { - return (pgsmStateLocal.shared_pgssState != NULL); + return (pgss != NULL && + pgss_hash != NULL); } diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index d4b4ca9..88ccada 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -32,6 +32,7 @@ PGSM_V2_0 } pgsmVersion; + PG_MODULE_MAGIC; #define BUILD_VERSION "2.0.0-dev" @@ -77,7 +78,6 @@ static int plan_nested_level = 0; /* The array to store outer layer query id*/ uint64 
*nested_queryids; -char **nested_query_txts; /* Regex object used to extract query comments. */ static regex_t preg_query_comments; @@ -88,11 +88,13 @@ static struct rusage rusage_start; static struct rusage rusage_end; /* Query buffer, store queries' text. */ +static unsigned char *pgss_qbuf = NULL; static char *pgss_explain(QueryDesc *queryDesc); static void extract_query_comments(const char *query, char *comments, size_t max_len); static int get_histogram_bucket(double q_time); static bool IsSystemInitialized(void); +static bool dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len); static double time_diff(struct timeval end, struct timeval start); static void request_additional_shared_resources(void); @@ -227,6 +229,7 @@ static uint64 djb2_hash(unsigned char *str, size_t len); /* Same as above, but stores the calculated string length into *out_len (small optimization) */ static uint64 djb2_hash_str(unsigned char *str, int *out_len); + /* * Module load callback */ @@ -235,6 +238,7 @@ void _PG_init(void) { int rc; + char file_name[1024]; elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__); @@ -261,6 +265,8 @@ _PG_init(void) EnableQueryId(); #endif + snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); + unlink(file_name); EmitWarningsOnPlaceholders("pg_stat_monitor"); @@ -306,7 +312,6 @@ _PG_init(void) ExecutorCheckPerms_hook = HOOK(pgss_ExecutorCheckPerms); nested_queryids = (uint64 *) malloc(sizeof(uint64) * max_stack_depth); - nested_query_txts = (char **) malloc(sizeof(char*) * max_stack_depth); system_init = true; } @@ -329,7 +334,6 @@ _PG_fini(void) emit_log_hook = prev_emit_log_hook; free(nested_queryids); - free(nested_query_txts); regfree(&preg_query_comments); hash_entry_reset(); @@ -358,7 +362,7 @@ request_additional_shared_resources(void) * the postmaster process.) We'll allocate or attach to the shared * resources in pgss_shmem_startup(). 
*/ - RequestAddinShmemSpace(pgsm_ShmemSize() + HOOK_STATS_SIZE); + RequestAddinShmemSpace(hash_memsize() + HOOK_STATS_SIZE); RequestNamedLWLockTranche("pg_stat_monitor", 1); } /* @@ -549,11 +553,7 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) - { nested_queryids[exec_nested_level] = queryDesc->plannedstmt->queryId; - nested_query_txts[exec_nested_level] = strdup(queryDesc->sourceText); - } - exec_nested_level++; PG_TRY(); { @@ -563,23 +563,13 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, standard_ExecutorRun(queryDesc, direction, count, execute_once); exec_nested_level--; if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) - { nested_queryids[exec_nested_level] = UINT64CONST(0); - if(nested_query_txts[exec_nested_level]) - free(nested_query_txts[exec_nested_level]); - nested_query_txts[exec_nested_level] = NULL; - } } PG_CATCH(); { exec_nested_level--; if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) - { nested_queryids[exec_nested_level] = UINT64CONST(0); - if(nested_query_txts[exec_nested_level]) - free(nested_query_txts[exec_nested_level]); - nested_query_txts[exec_nested_level] = NULL; - } PG_RE_THROW(); } PG_END_TRY(); @@ -1269,29 +1259,11 @@ pgss_update_entry(pgssEntry *entry, if (exec_nested_level > 0) { if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) - { - int parent_query_len = nested_query_txts[exec_nested_level - 1]? 
- strlen(nested_query_txts[exec_nested_level - 1]): 0; e->counters.info.parentid = nested_queryids[exec_nested_level - 1]; - if (parent_query_len > 0) - { - char *qry_buff; - dsa_area *query_dsa_area = get_dsa_area_for_query_text(); - dsa_pointer qry = dsa_allocate(query_dsa_area, parent_query_len+1); - qry_buff = dsa_get_address(query_dsa_area, qry); - memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len); - qry_buff[parent_query_len] = 0; - e->counters.info.parent_query = qry; - } - else - e->counters.info.parent_query = InvalidDsaPointer; - - } } else { e->counters.info.parentid = UINT64CONST(0); - e->counters.info.parent_query = InvalidDsaPointer; } if (error_info) @@ -1408,6 +1380,7 @@ pgss_store(uint64 queryid, JumbleState *jstate, pgssStoreKind kind) { + HTAB *pgss_hash; pgssHashKey key; pgssEntry *entry; pgssSharedState *pgss = pgsm_get_ss(); @@ -1512,15 +1485,19 @@ pgss_store(uint64 queryid, #else key.toplevel = ((exec_nested_level + plan_nested_level) == 0); #endif + pgss_hash = pgsm_get_hash(); LWLockAcquire(pgss->lock, LW_SHARED); - entry = (pgssEntry *) dshash_find(get_pgssHash(), &key, false); + entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); if (!entry) { - dsa_pointer dsa_query_pointer; - char* query_buff; + pgssQueryEntry *query_entry; + bool query_found = false; + uint64 prev_qbuf_len = 0; + HTAB *pgss_query_hash; + pgss_query_hash = pgsm_get_query_hash(); /* * Create a new, normalized query string if caller asked. We don't @@ -1531,41 +1508,74 @@ pgss_store(uint64 queryid, */ if (jstate && PGSM_NORMALIZED_QUERY) { + LWLockRelease(pgss->lock); norm_query = generate_normalized_query(jstate, query, query_location, &query_len, GetDatabaseEncoding()); + LWLockAcquire(pgss->lock, LW_SHARED); } - /* New query, truncate length if necessary. 
*/ - if (query_len > PGSM_QUERY_MAX_LEN) - query_len = PGSM_QUERY_MAX_LEN; + query_entry = hash_search(pgss_query_hash, &queryid, HASH_ENTER_NULL, &query_found); + if (query_entry == NULL) + { + LWLockRelease(pgss->lock); + if (norm_query) + pfree(norm_query); + elog(DEBUG1, "pgss_store: out of memory (pgss_query_hash)."); + return; + } + else if (!query_found) + { + /* New query, truncate length if necessary. */ + if (query_len > PGSM_QUERY_MAX_LEN) + query_len = PGSM_QUERY_MAX_LEN; + } /* Need exclusive lock to make a new hashtable entry - promote */ LWLockRelease(pgss->lock); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - /* Save the query text in raw dsa area */ - dsa_area* query_dsa_area = get_dsa_area_for_query_text(); - dsa_query_pointer = dsa_allocate(query_dsa_area, query_len+1); - query_buff = dsa_get_address(query_dsa_area, dsa_query_pointer); - memcpy(query_buff, norm_query ? norm_query : query, query_len); - query_buff[query_len] = 0; + if (!query_found) + { + if (!SaveQueryText(bucketid, + queryid, + pgss_qbuf, + norm_query ? norm_query : query, + query_len, + &query_entry->query_pos)) + { + LWLockRelease(pgss->lock); + if (norm_query) + pfree(norm_query); + elog(DEBUG1, "pgss_store: insufficient shared space for query."); + return; + } + + /* + * Save current query buffer length; if we fail to add a new + * entry to the hash table then we must restore the original + * length. + */ + memcpy(&prev_qbuf_len, pgss_qbuf, sizeof(prev_qbuf_len)); + } /* OK to create a new hashtable entry */ entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); if (entry == NULL) { + if (!query_found) + { + /* Restore previous query buffer length. 
*/ + memcpy(pgss_qbuf, &prev_qbuf_len, sizeof(prev_qbuf_len)); + } LWLockRelease(pgss->lock); if (norm_query) pfree(norm_query); return; } - entry->query_pos = dsa_query_pointer; + entry->query_pos = query_entry->query_pos; } - else - dshash_release_lock(get_pgssHash(), entry); - if (jstate == NULL) pgss_update_entry(entry, /* entry */ @@ -1608,6 +1618,9 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); hash_entry_dealloc(-1, -1, NULL); + /* Reset query buffer. */ + *(uint64 *) pgss_qbuf = 0; + LWLockRelease(pgss->lock); PG_RETURN_VOID(); } @@ -1664,12 +1677,13 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - dshash_seq_status hstat; + HASH_SEQ_STATUS hash_seq; pgssEntry *entry; char parentid_txt[32]; pgssSharedState *pgss = pgsm_get_ss(); - char *query_txt = NULL; - char *parent_query_txt = NULL; + HTAB *pgss_hash = pgsm_get_hash(); + char *query_txt = (char *) palloc0(PGSM_QUERY_MAX_LEN + 1); + char *parent_query_txt = (char *) palloc0(PGSM_QUERY_MAX_LEN + 1); int expected_columns = (api_version >= PGSM_V2_0)?PG_STAT_MONITOR_COLS_V2_0:PG_STAT_MONITOR_COLS_V1_0; /* Safety check... 
*/ @@ -1707,11 +1721,10 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, MemoryContextSwitchTo(oldcontext); - // LWLockAcquire(pgss->lock, LW_SHARED); + LWLockAcquire(pgss->lock, LW_SHARED); - dshash_seq_init(&hstat, get_pgssHash(), false); - - while ((entry = dshash_seq_next(&hstat)) != NULL) + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) { Datum values[PG_STAT_MONITOR_COLS] = {0}; bool nulls[PG_STAT_MONITOR_COLS] = {0}; @@ -1726,8 +1739,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, uint64 userid = entry->key.userid; int64 ip = entry->key.ip; uint64 planid = entry->key.planid; - dsa_area *query_dsa_area; - char *query_ptr; #if PG_VERSION_NUM < 140000 bool toplevel = 1; bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); @@ -1735,10 +1746,15 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); bool toplevel = entry->key.toplevel; #endif - /* Load the query text from dsa area */ - query_dsa_area = get_dsa_area_for_query_text(); - query_ptr = dsa_get_address(query_dsa_area, entry->query_pos); - query_txt = pstrdup(query_ptr); + + if (read_query(pgss_qbuf, queryid, query_txt, entry->query_pos) == 0) + { + int rc; + + rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos); + if (rc != 1) + snprintf(query_txt, 32, "%s", ""); + } /* copy counters to a local variable to keep locking time short */ { @@ -1766,17 +1782,15 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.state == PGSS_PARSE || tmp.state == PGSS_PLAN) continue; - /* read the parent query text if any */ if (tmp.info.parentid != UINT64CONST(0)) { - if (DsaPointerIsValid(tmp.info.parent_query)) + if (read_query(pgss_qbuf, tmp.info.parentid, parent_query_txt, 0) == 0) { - query_dsa_area = get_dsa_area_for_query_text(); - query_ptr = dsa_get_address(query_dsa_area, tmp.info.parent_query); - parent_query_txt = 
pstrdup(query_ptr); + int rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0); + + if (rc != 1) + snprintf(parent_query_txt, 32, "%s", ""); } - else - parent_query_txt = pstrdup("parent query text not available"); } /* bucketid at column number 0 */ values[i++] = Int64GetDatumFast(bucketid); @@ -2056,12 +2070,10 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, tuplestore_putvalues(tupstore, tupdesc, values, nulls); } /* clean up and return the tuplestore */ - dshash_seq_term(&hstat); + LWLockRelease(pgss->lock); - if(query_txt) - pfree(query_txt); - if(parent_query_txt) - pfree(parent_query_txt); + pfree(query_txt); + pfree(parent_query_txt); tuplestore_donestoring(tupstore); } @@ -2107,6 +2119,7 @@ get_next_wbucket(pgssSharedState *pgss) if (update_bucket) { + char file_name[1024]; new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; @@ -2114,7 +2127,24 @@ get_next_wbucket(pgssSharedState *pgss) prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_entry_dealloc(new_bucket_id, prev_bucket_id, NULL); + hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); + + if (pgss->overflow) + { + pgss->n_bucket_cycles += 1; + if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS) + { + /* + * A full rotation of PGSM_MAX_BUCKETS buckets happened since + * we detected a query buffer overflow. + * Reset overflow state and remove the dump file. 
+ */ + pgss->overflow = false; + pgss->n_bucket_cycles = 0; + snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); + unlink(file_name); + } + } LWLockRelease(pgss->lock); @@ -3113,6 +3143,165 @@ intarray_get_datum(int32 arr[], int len) } +uint64 +read_query(unsigned char *buf, uint64 queryid, char *query, size_t pos) +{ + bool found = false; + uint64 query_id = 0; + uint64 query_len = 0; + uint64 rlen = 0; + uint64 buf_len = 0; + + memcpy(&buf_len, buf, sizeof(uint64)); + if (buf_len <= 0) + goto exit; + + /* If a position hint is given, try to locate the query directly. */ + if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len) + { + memcpy(&query_id, &buf[pos], sizeof(uint64)); + if (query_id != queryid) + return 0; + + pos += sizeof(uint64); + + memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */ + pos += sizeof(uint64); + + if (pos + query_len > buf_len) /* avoid reading past buffer's length. */ + return 0; + + memcpy(query, &buf[pos], query_len); /* Actual query */ + query[query_len] = '\0'; + + return queryid; + } + + rlen = sizeof(uint64); /* Move forward to skip length bytes */ + for (;;) + { + if (rlen >= buf_len) + goto exit; + + memcpy(&query_id, &buf[rlen], sizeof(uint64)); /* query id */ + if (query_id == queryid) + found = true; + + rlen += sizeof(uint64); + if (buf_len <= rlen) + continue; + + memcpy(&query_len, &buf[rlen], sizeof(uint64)); /* query len */ + rlen += sizeof(uint64); + if (buf_len < rlen + query_len) + goto exit; + if (found) + { + if (query != NULL) + { + memcpy(query, &buf[rlen], query_len); /* Actual query */ + query[query_len] = 0; + } + return query_id; + } + rlen += query_len; + } +exit: + if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) + { + sprintf(query, "%s", ""); + return -1; + } + return 0; +} + +bool +SaveQueryText(uint64 bucketid, + uint64 queryid, + unsigned char *buf, + const char *query, + uint64 query_len, + size_t *query_pos) +{ + uint64 buf_len = 0; + + memcpy(&buf_len, buf, sizeof(uint64)); + 
if (buf_len == 0) + buf_len += sizeof(uint64); + + if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) + { + switch (PGSM_OVERFLOW_TARGET) + { + case OVERFLOW_TARGET_NONE: + return false; + case OVERFLOW_TARGET_DISK: + { + bool dump_ok; + pgssSharedState *pgss = pgsm_get_ss(); + + if (pgss->overflow) + { + elog(DEBUG1, "query buffer overflowed twice"); + return false; + } + + /* + * If the query buffer is empty, there is nothing to dump, + * this also means that the current query length exceeds + * MAX_QUERY_BUF. + */ + if (buf_len <= sizeof(uint64)) + return false; + + dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUF); + buf_len = sizeof(uint64); + + if (dump_ok) + { + pgss->overflow = true; + pgss->n_bucket_cycles = 0; + } + + /* + * We must check for overflow again, as the query length + * may exceed the total size allocated to the buffer + * (MAX_QUERY_BUF). + */ + if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) + { + /* + * If we successfully dumped the query buffer to disk, + * then reset the buffer, otherwise we could end up + * dumping the same buffer again. 
+ */ + if (dump_ok) + *(uint64 *) buf = 0; + + return false; + } + + } + break; + default: + Assert(false); + break; + } + } + + *query_pos = buf_len; + + memcpy(&buf[buf_len], &queryid, sizeof(uint64)); /* query id */ + buf_len += sizeof(uint64); + + memcpy(&buf[buf_len], &query_len, sizeof(uint64)); /* query length */ + buf_len += sizeof(uint64); + + memcpy(&buf[buf_len], query, query_len); /* query */ + buf_len += query_len; + memcpy(buf, &buf_len, sizeof(uint64)); + return true; +} Datum pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS) @@ -3120,6 +3309,12 @@ pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS) return (Datum) 0; } +void +set_qbuf(unsigned char *buf) +{ + pgss_qbuf = buf; + *(uint64 *) pgss_qbuf = 0; +} void pgsm_emit_log_hook(ErrorData *edata) @@ -3156,6 +3351,145 @@ IsSystemInitialized(void) return (system_init && IsHashInitialize()); } +static bool +dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) +{ + int fd = 0; + char file_name[1024]; + bool success = true; + int off = 0; + int tries = 0; + + snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); + fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY); + if (fd < 0) + { + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + file_name))); + return false; + } + + /* Loop until buf_len bytes are written to the file. */ + do + { + ssize_t nwrite = write(fd, buf + off, buf_len - off); + + if (nwrite == -1) + { + if (errno == EINTR && tries++ < 3) + continue; + + success = false; + break; + } + off += nwrite; + } while (off < buf_len); + + if (!success) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", file_name))); + + if (fd > 0) + CloseTransientFile(fd); + + return success; +} + +/* + * Try to locate query text in a dumped file for bucket_id. + * + * Returns: + * 1 Query successfully read, query_txt will contain the query text. + * 0 Query not found. + * -1 I/O Error. 
+ */ +int +read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) +{ + int fd = 0; + char file_name[1024]; + unsigned char *buf = NULL; + ssize_t nread = 0; + int off = 0; + int tries = 0; + bool done = false; + bool found = false; + + snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); + fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY); + if (fd < 0) + goto exit; + + buf = (unsigned char *) palloc(MAX_QUERY_BUF); + while (!done) + { + off = 0; + /* read a chunk of MAX_QUERY_BUF size. */ + do + { + nread = read(fd, buf + off, MAX_QUERY_BUF - off); + if (nread == -1) + { + if (errno == EINTR && tries++ < 3) /* read() was interrupted, + * attempt to read again + * (max attempts=3) */ + continue; + + goto exit; + } + else if (nread == 0) /* EOF */ + { + done = true; + break; + } + + off += nread; + } while (off < MAX_QUERY_BUF); + + if (off == MAX_QUERY_BUF) + { + /* we have a chunk, scan it looking for queryid. */ + if (read_query(buf, queryid, query_txt, pos) != 0) + { + + found = true; + /* query was found, don't need to read another chunk. */ + break; + } + } + else + + /* + * Either done=true or file has a size that is not a multiple of + * MAX_QUERY_BUF. It is safe to assume that the file was truncated + * or corrupted. + */ + break; + } + +exit: + if (fd < 0 || nread == -1) + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + file_name))); + + if (fd >= 0) + CloseTransientFile(fd); + + if (buf) + pfree(buf); + + if (found) + return 1; + else if (fd == -1 || nread == -1) + return -1; /* I/O error. */ + else + return 0; /* Not found. 
*/ +} static double time_diff(struct timeval end, struct timeval start) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 78d8183..dd66c9d 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -27,9 +27,6 @@ #include #include -#include "lib/dshash.h" -#include "utils/dsa.h" - #include "access/hash.h" #include "catalog/pg_authid.h" #include "executor/instrument.h" @@ -182,6 +179,20 @@ typedef struct CallTime double sum_var_time; /* sum of variances in execution time in msec */ } CallTime; +/* + * Entry type for queries hash table (query ID). + * + * We use a hash table to keep track of query IDs that have their + * corresponding query text added to the query buffer (pgsm_query_shared_buffer). + * + * This allow us to avoid adding duplicated queries to the buffer, therefore + * leaving more space for other queries and saving some CPU. + */ +typedef struct pgssQueryEntry +{ + uint64 queryid; /* query identifier, also the key. */ + size_t query_pos; /* query location within query buffer */ +} pgssQueryEntry; typedef struct PlanInfo { @@ -205,7 +216,6 @@ typedef struct pgssHashKey typedef struct QueryInfo { uint64 parentid; /* parent queryid of current query */ - dsa_pointer parent_query; int64 type; /* type of query, options are query, info, * warning, error, fatal */ char application_name[APPLICATIONNAME_LEN]; @@ -312,7 +322,7 @@ typedef struct pgssEntry Counters counters; /* the statistics for this query */ int encoding; /* query text encoding */ slock_t mutex; /* protects the counters only */ - dsa_pointer query_pos; /* query location within query buffer */ + size_t query_pos; /* query location within query buffer */ } pgssEntry; /* @@ -343,19 +353,10 @@ typedef struct pgssSharedState * This allows us to avoid having a large file on disk that would also * slowdown queries to the pg_stat_monitor view. 
*/ + bool overflow; size_t n_bucket_cycles; - int hash_tranche_id; - void *raw_dsa_area; - dshash_table_handle hash_handle; } pgssSharedState; -typedef struct pgsmLocalState -{ - pgssSharedState *shared_pgssState; - dsa_area *dsa; - dshash_table *shared_hash; -}pgsmLocalState; - #define ResetSharedState(x) \ do { \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ @@ -417,22 +418,27 @@ void init_guc(void); GucVariable *get_conf(int i); /* hash_create.c */ -dsa_area *get_dsa_area_for_query_text(void); -dshash_table *get_pgssHash(void); -void pgsm_attach_shmem(void); bool IsHashInitialize(void); void pgss_shmem_startup(void); void pgss_shmem_shutdown(int code, Datum arg); int pgsm_get_bucket_size(void); pgssSharedState *pgsm_get_ss(void); +HTAB *pgsm_get_plan_hash(void); +HTAB *pgsm_get_hash(void); +HTAB *pgsm_get_query_hash(void); +HTAB *pgsm_get_plan_hash(void); void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer); pgssEntry *hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); -Size pgsm_ShmemSize(void); +Size hash_memsize(void); + +int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos); +uint64 read_query(unsigned char *buf, uint64 queryid, char *query, size_t pos); void pgss_startup(void); +void set_qbuf(unsigned char *); /* hash_query.c */ void pgss_startup(void);