diff --git a/hash_query.c b/hash_query.c index 3139716..25a7b8a 100644 --- a/hash_query.c +++ b/hash_query.c @@ -16,59 +16,110 @@ */ #include "postgres.h" #include "nodes/pg_list.h" - #include "pg_stat_monitor.h" +static pgsmLocalState pgsmStateLocal; +static PGSM_HASH_TABLE_HANDLE pgsm_create_bucket_hash(pgssSharedState *pgss, dsa_area *dsa); +static Size pgsm_get_shared_area_size(void); -static pgssSharedState *pgss; -static HTAB *pgss_hash; -static HTAB *pgss_query_hash; +#if USE_DYNAMIC_HASH +/* parameter for the shared hash */ +static dshash_parameters dsh_params = { + sizeof(pgssHashKey), + sizeof(pgssEntry), + dshash_memcmp, + dshash_memhash +}; +#endif - -static HTAB * -hash_init(const char *hash_name, int key_size, int entry_size, int hash_size) +static Size +pgsm_query_area_size(void) { - HASHCTL info; + Size sz = MAX_QUERY_BUF; + #if USE_DYNAMIC_HASH + /* Dynamic hash also lives DSA area */ + sz = add_size(sz, MAX_BUCKETS_MEM); + #endif + return MAXALIGN(sz); +} - memset(&info, 0, sizeof(info)); - info.keysize = key_size; - info.entrysize = entry_size; - return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS); +Size +pgsm_ShmemSize(void) +{ + Size sz = MAXALIGN(sizeof(pgssSharedState)); + sz = add_size(sz, MAX_QUERY_BUF); + #if USE_DYNAMIC_HASH + sz = add_size(sz, MAX_BUCKETS_MEM); + #else + sz = add_size(sz, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry))); + #endif + return MAXALIGN(sz); +} + +static Size +pgsm_get_shared_area_size(void) +{ + Size sz; + #if USE_DYNAMIC_HASH + sz = pgsm_ShmemSize(); + #else + sz = MAXALIGN(sizeof(pgssSharedState)); + sz = add_size(sz, pgsm_query_area_size()); + #endif + return sz; } void pgss_startup(void) { bool found = false; - + pgssSharedState *pgss; /* reset in case this is a restart within the postmaster */ - - pgss = NULL; - pgss_hash = NULL; + pgsmStateLocal.dsa = NULL; + pgsmStateLocal.shared_hash = NULL; + pgsmStateLocal.shared_pgssState = NULL; /* * Create or attach to the shared memory state, including hash table */ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - pgss = ShmemInitStruct("pg_stat_monitor", sizeof(pgssSharedState), &found); + pgss = ShmemInitStruct("pg_stat_monitor", pgsm_get_shared_area_size(), &found); if (!found) { /* First time through ... */ + dsa_area *dsa; + char *p = (char *) pgss; + pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock; SpinLockInit(&pgss->mutex); ResetSharedState(pgss); + /* the allocation of pgssSharedState itself */ + p += MAXALIGN(sizeof(pgssSharedState)); + pgss->raw_dsa_area = p; + dsa = dsa_create_in_place(pgss->raw_dsa_area, + pgsm_query_area_size(), + LWLockNewTrancheId(), 0); + dsa_pin(dsa); + dsa_set_size_limit(dsa, pgsm_query_area_size()); + + pgss->hash_handle = pgsm_create_bucket_hash(pgss,dsa); + + if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_DISK) + dsa_set_size_limit(dsa, -1); + + pgsmStateLocal.shared_pgssState = pgss; + /* + * Postmaster will never access these again, thus free the local + * dsa/dshash references. + */ + dsa_detach(dsa); } #ifdef BENCHMARK init_hook_stats(); #endif - set_qbuf((unsigned char *) ShmemAlloc(MAX_QUERY_BUF)); - - pgss_hash = hash_init("pg_stat_monitor: bucket hashtable", sizeof(pgssHashKey), sizeof(pgssEntry), MAX_BUCKET_ENTRIES); - pgss_query_hash = hash_init("pg_stat_monitor: queryID hashtable", sizeof(uint64), sizeof(pgssQueryEntry), MAX_BUCKET_ENTRIES); - LWLockRelease(AddinShmemInitLock); /* @@ -78,23 +129,73 @@ pgss_startup(void) on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); } +static PGSM_HASH_TABLE_HANDLE +pgsm_create_bucket_hash(pgssSharedState *pgss, dsa_area *dsa) +{ + PGSM_HASH_TABLE_HANDLE bucket_hash; + +#if USE_DYNAMIC_HASH + dshash_table *dsh; + pgss->hash_tranche_id = LWLockNewTrancheId(); + dsh_params.tranche_id = pgss->hash_tranche_id; + dsh = dshash_create(dsa, &dsh_params, 0); + bucket_hash = dshash_get_hash_table_handle(dsh); + dshash_detach(dsh); +#else + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(pgssHashKey); + info.entrysize = sizeof(pgssEntry); + bucket_hash = ShmemInitHash("pg_stat_monitor: bucket hashtable", MAX_BUCKET_ENTRIES, MAX_BUCKET_ENTRIES, &info, HASH_ELEM | HASH_BLOBS); +#endif + return bucket_hash; +} + +void +pgsm_attach_shmem(void) +{ + MemoryContext oldcontext; + if (pgsmStateLocal.dsa) + return; + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + pgsmStateLocal.dsa = dsa_attach_in_place(pgsmStateLocal.shared_pgssState->raw_dsa_area, + NULL); + dsa_pin_mapping(pgsmStateLocal.dsa); + +#if USE_DYNAMIC_HASH + dsh_params.tranche_id = pgsmStateLocal.shared_pgssState->hash_tranche_id; + pgsmStateLocal.shared_hash = dshash_attach(pgsmStateLocal.dsa, &dsh_params, + pgsmStateLocal.shared_pgssState->hash_handle, 0); +#else + pgsmStateLocal.shared_hash = pgsmStateLocal.shared_pgssState->hash_handle; +#endif + + MemoryContextSwitchTo(oldcontext); +} + +dsa_area* +get_dsa_area_for_query_text(void) +{ + pgsm_attach_shmem(); + return pgsmStateLocal.dsa; +} + +PGSM_HASH_TABLE* +get_pgssHash(void) +{ + pgsm_attach_shmem(); + return pgsmStateLocal.shared_hash; +} + pgssSharedState * pgsm_get_ss(void) { - return pgss; + pgsm_attach_shmem(); + return pgsmStateLocal.shared_pgssState; } -HTAB * -pgsm_get_hash(void) -{ - return pgss_hash; -} - -HTAB * -pgsm_get_query_hash(void) -{ - return pgss_query_hash; -} /* * shmem_shutdown hook: Dump statistics into file. @@ -106,41 +207,23 @@ void pgss_shmem_shutdown(int code, Datum arg) { /* Don't try to dump during a crash. */ + elog(LOG,"pgss_shmem_shutdown"); if (code) return; - pgss = NULL; + pgsmStateLocal.shared_pgssState = NULL; /* Safety check ... shouldn't get here unless shmem is set up. */ if (!IsHashInitialize()) return; } -Size -hash_memsize(void) -{ - Size size; - - size = MAXALIGN(sizeof(pgssSharedState)); - size += MAXALIGN(MAX_QUERY_BUF); - size = add_size(size, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssEntry))); - size = add_size(size, hash_estimate_size(MAX_BUCKET_ENTRIES, sizeof(pgssQueryEntry))); - - return size; -} - pgssEntry * hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) { pgssEntry *entry = NULL; bool found = false; - - if (hash_get_num_entries(pgss_hash) >= MAX_BUCKET_ENTRIES) - { - elog(DEBUG1, "pg_stat_monitor: out of memory"); - return NULL; - } /* Find or create an entry with desired hash code */ - entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER_NULL, &found); + entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, key, &found); if (entry == NULL) elog(DEBUG1, "hash_entry_alloc: OUT OF MEMORY"); else if (!found) @@ -149,12 +232,19 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) /* New entry, initialize it */ /* reset the statistics */ memset(&entry->counters, 0, sizeof(Counters)); + entry->query_pos = InvalidDsaPointer; + entry->counters.info.parent_query = InvalidDsaPointer; + /* set the appropriate initial usage count */ /* re-initialize the mutex each time ... we assume no one using it */ SpinLockInit(&entry->mutex); /* ... and don't forget the query text metadata */ entry->encoding = encoding; } + #if USE_DYNAMIC_HASH + if(entry) + dshash_release_lock(pgsmStateLocal.shared_hash, entry); + #endif return entry; } @@ -174,17 +264,22 @@ hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding) void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer) { - HASH_SEQ_STATUS hash_seq; + PGSM_HASH_SEQ_STATUS hstat; pgssEntry *entry = NULL; - /* Store pending query ids from the previous bucket. */ List *pending_entries = NIL; ListCell *pending_entry; + if (!pgsmStateLocal.shared_hash) + return; + /* Iterate over the hash table. */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); + + while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) { + dsa_pointer pdsa; + /* * Remove all entries if new_bucket_id == -1. Otherwise remove entry * in new_bucket_id if it has finished already. @@ -193,16 +288,17 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu (entry->key.bucket_id == new_bucket_id && (entry->counters.state == PGSS_FINISHED || entry->counters.state == PGSS_ERROR))) { - if (new_bucket_id == -1) - { - /* - * pg_stat_monitor_reset(), remove entry from query hash table - * too. - */ - hash_search(pgss_query_hash, &(entry->key.queryid), HASH_REMOVE, NULL); - } + dsa_pointer parent_qdsa = entry->counters.info.parent_query; + pdsa = entry->query_pos; - entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); + + if (DsaPointerIsValid(pdsa)) + dsa_free(pgsmStateLocal.dsa, pdsa); + + if (DsaPointerIsValid(parent_qdsa)) + dsa_free(pgsmStateLocal.dsa, parent_qdsa); + continue; } /* @@ -238,7 +334,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu if (entry->counters.calls.calls > 1) entry->counters.state = PGSS_FINISHED; else - entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + { + pdsa = entry->query_pos; + pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); + if (DsaPointerIsValid(pdsa)) + dsa_free(pgsmStateLocal.dsa, pdsa); + } continue; } @@ -266,11 +367,17 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu if (entry->counters.calls.calls > 1) entry->counters.state = PGSS_FINISHED; else - entry = hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + { + pdsa = entry->query_pos; + pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); + /* We should not delete the Query in DSA here + * as the same will get reused when the entry gets inserted into new bucket + */ + } } } } - + pgsm_hash_seq_term(&hstat); /* * Iterate over the list of pending queries in order to add them back to * the hash table with the updated bucket id. @@ -281,7 +388,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu pgssEntry *new_entry; pgssEntry *old_entry = (pgssEntry *) lfirst(pending_entry); - new_entry = (pgssEntry *) hash_search(pgss_hash, &old_entry->key, HASH_ENTER_NULL, &found); + + PGSM_DISABLE_ERROR_CAPUTRE(); + { + new_entry = (pgssEntry*) pgsm_hash_find_or_insert(pgsmStateLocal.shared_hash, &old_entry->key, &found); + }PGSM_END_DISABLE_ERROR_CAPTURE(); + if (new_entry == NULL) elog(DEBUG1, "%s", "pg_stat_monitor: out of memory"); else if (!found) @@ -292,10 +404,12 @@ hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_bu new_entry->encoding = old_entry->encoding; new_entry->query_pos = old_entry->query_pos; } - + #if USE_DYNAMIC_HASH + if(new_entry) + dshash_release_lock(pgsmStateLocal.shared_hash, new_entry); + #endif free(old_entry); } - list_free(pending_entries); } @@ -306,16 +420,23 @@ void hash_entry_reset() { pgssSharedState *pgss = pgsm_get_ss(); - HASH_SEQ_STATUS hash_seq; + PGSM_HASH_SEQ_STATUS hstat; pgssEntry *entry; LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + pgsm_hash_seq_init(&hstat, pgsmStateLocal.shared_hash, true); + + while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) { - hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL); + dsa_pointer pdsa = entry->query_pos; + pgsm_hash_delete_current(&hstat, pgsmStateLocal.shared_hash, &entry->key); + if (DsaPointerIsValid(pdsa)) + dsa_free(pgsmStateLocal.dsa, pdsa); } + + pgsm_hash_seq_term(&hstat); + pg_atomic_write_u64(&pgss->current_wbucket, 0); LWLockRelease(pgss->lock); } @@ -323,6 +444,67 @@ hash_entry_reset() bool IsHashInitialize(void) { - return (pgss != NULL && - pgss_hash != NULL); + return (pgsmStateLocal.shared_pgssState != NULL); } + +/* hash function port based on USE_DYNAMIC_HASH */ + +void * +pgsm_hash_find_or_insert(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* found) +{ + #if USE_DYNAMIC_HASH + void *entry; + entry = dshash_find_or_insert(shared_hash, key, found); + return entry; + #else + return hash_search(shared_hash, key, HASH_ENTER_NULL, found); + #endif +} + +void * +pgsm_hash_find(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* found) +{ + #if USE_DYNAMIC_HASH + return dshash_find(shared_hash, key, false); + #else + return hash_search(shared_hash, key, HASH_FIND, found); + #endif +} + +void +pgsm_hash_seq_init(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_hash, bool lock) +{ +#if USE_DYNAMIC_HASH + dshash_seq_init(hstat, shared_hash, lock); +#else + hash_seq_init(hstat, shared_hash); +#endif +} + +void* +pgsm_hash_seq_next(PGSM_HASH_SEQ_STATUS *hstat) +{ +#if USE_DYNAMIC_HASH + return dshash_seq_next(hstat); +#else + return hash_seq_search(hstat); +#endif +} + +void +pgsm_hash_seq_term(PGSM_HASH_SEQ_STATUS *hstat) +{ +#if USE_DYNAMIC_HASH + dshash_seq_term(hstat); +#endif +} + +void +pgsm_hash_delete_current(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_hash, void *key) +{ + #if USE_DYNAMIC_HASH + dshash_delete_current(hstat); + #else + hash_search(shared_hash, key, HASH_REMOVE, NULL); + #endif +} \ No newline at end of file diff --git a/pg_stat_monitor.c b/pg_stat_monitor.c index 6be3568..956c48d 100644 --- a/pg_stat_monitor.c +++ b/pg_stat_monitor.c @@ -32,7 +32,6 @@ PGSM_V2_0 } pgsmVersion; - PG_MODULE_MAGIC; #define BUILD_VERSION "2.0.0-dev" @@ -72,12 +71,15 @@ void _PG_fini(void); /* Current nesting depth of ExecutorRun+ProcessUtility calls */ static int exec_nested_level = 0; +volatile bool __pgsm_do_not_capture_error = false; + #if PG_VERSION_NUM >= 130000 static int plan_nested_level = 0; #endif /* The array to store outer layer query id*/ uint64 *nested_queryids; +char **nested_query_txts; /* Regex object used to extract query comments. */ static regex_t preg_query_comments; @@ -88,13 +90,11 @@ static struct rusage rusage_start; static struct rusage rusage_end; /* Query buffer, store queries' text. */ -static unsigned char *pgss_qbuf = NULL; static char *pgss_explain(QueryDesc *queryDesc); static void extract_query_comments(const char *query, char *comments, size_t max_len); static int get_histogram_bucket(double q_time); static bool IsSystemInitialized(void); -static bool dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len); static double time_diff(struct timeval end, struct timeval start); static void request_additional_shared_resources(void); @@ -229,7 +229,6 @@ static uint64 djb2_hash(unsigned char *str, size_t len); /* Same as above, but stores the calculated string length into *out_len (small optimization) */ static uint64 djb2_hash_str(unsigned char *str, int *out_len); - /* * Module load callback */ @@ -238,7 +237,6 @@ void _PG_init(void) { int rc; - char file_name[1024]; elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__); @@ -265,8 +263,6 @@ _PG_init(void) EnableQueryId(); #endif - snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); - unlink(file_name); EmitWarningsOnPlaceholders("pg_stat_monitor"); @@ -312,6 +308,7 @@ _PG_init(void) ExecutorCheckPerms_hook = HOOK(pgss_ExecutorCheckPerms); nested_queryids = (uint64 *) malloc(sizeof(uint64) * max_stack_depth); + nested_query_txts = (char **) malloc(sizeof(char*) * max_stack_depth); system_init = true; } @@ -334,6 +331,7 @@ _PG_fini(void) emit_log_hook = prev_emit_log_hook; free(nested_queryids); + free(nested_query_txts); regfree(&preg_query_comments); hash_entry_reset(); @@ -362,7 +360,7 @@ request_additional_shared_resources(void) * the postmaster process.) We'll allocate or attach to the shared * resources in pgss_shmem_startup(). */ - RequestAddinShmemSpace(hash_memsize() + HOOK_STATS_SIZE); + RequestAddinShmemSpace(pgsm_ShmemSize() + HOOK_STATS_SIZE); RequestNamedLWLockTranche("pg_stat_monitor", 1); } /* @@ -553,7 +551,11 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) + { nested_queryids[exec_nested_level] = queryDesc->plannedstmt->queryId; + nested_query_txts[exec_nested_level] = strdup(queryDesc->sourceText); + } + exec_nested_level++; PG_TRY(); { @@ -563,13 +565,23 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, standard_ExecutorRun(queryDesc, direction, count, execute_once); exec_nested_level--; if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) + { nested_queryids[exec_nested_level] = UINT64CONST(0); + if(nested_query_txts[exec_nested_level]) + free(nested_query_txts[exec_nested_level]); + nested_query_txts[exec_nested_level] = NULL; + } } PG_CATCH(); { exec_nested_level--; if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) + { nested_queryids[exec_nested_level] = UINT64CONST(0); + if(nested_query_txts[exec_nested_level]) + free(nested_query_txts[exec_nested_level]); + nested_query_txts[exec_nested_level] = NULL; + } PG_RE_THROW(); } PG_END_TRY(); @@ -1257,11 +1269,30 @@ pgss_update_entry(pgssEntry *entry, if (exec_nested_level > 0) { if (exec_nested_level >= 0 && exec_nested_level < max_stack_depth) + { + int parent_query_len = nested_query_txts[exec_nested_level - 1]? + strlen(nested_query_txts[exec_nested_level - 1]): 0; e->counters.info.parentid = nested_queryids[exec_nested_level - 1]; + e->counters.info.parent_query = InvalidDsaPointer; + if (parent_query_len > 0) + { + char *qry_buff; + dsa_area *query_dsa_area = get_dsa_area_for_query_text(); + dsa_pointer qry = dsa_allocate_extended(query_dsa_area, parent_query_len+1, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO); + if (DsaPointerIsValid(qry)) + { + qry_buff = dsa_get_address(query_dsa_area, qry); + memcpy(qry_buff, nested_query_txts[exec_nested_level - 1], parent_query_len); + qry_buff[parent_query_len] = 0; + e->counters.info.parent_query = qry; + } + } + } } else { e->counters.info.parentid = UINT64CONST(0); + e->counters.info.parent_query = InvalidDsaPointer; } if (error_info) @@ -1378,7 +1409,6 @@ pgss_store(uint64 queryid, JumbleState *jstate, pgssStoreKind kind) { - HTAB *pgss_hash; pgssHashKey key; pgssEntry *entry; pgssSharedState *pgss = pgsm_get_ss(); @@ -1398,6 +1428,7 @@ pgss_store(uint64 queryid, bool found_app_name = false; bool found_client_addr = false; uint client_addr = 0; + bool found; /* Safety check... */ if (!IsSystemInitialized()) @@ -1485,19 +1516,15 @@ pgss_store(uint64 queryid, #else key.toplevel = ((exec_nested_level + plan_nested_level) == 0); #endif - pgss_hash = pgsm_get_hash(); LWLockAcquire(pgss->lock, LW_SHARED); - entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL); + entry = (pgssEntry *) pgsm_hash_find(get_pgssHash(), &key, &found); if (!entry) { - pgssQueryEntry *query_entry; - bool query_found = false; - uint64 prev_qbuf_len = 0; - HTAB *pgss_query_hash; - - pgss_query_hash = pgsm_get_query_hash(); + dsa_pointer dsa_query_pointer; + dsa_area *query_dsa_area; + char *query_buff; /* * Create a new, normalized query string if caller asked. We don't @@ -1537,66 +1564,61 @@ pgss_store(uint64 queryid, pgsm_query_id = pgss_hash_string(query, query_len); } - query_entry = hash_search(pgss_query_hash, &queryid, HASH_ENTER_NULL, &query_found); - if (query_entry == NULL) + /* New query, truncate length if necessary. */ + if (query_len > PGSM_QUERY_MAX_LEN) + query_len = PGSM_QUERY_MAX_LEN; + + /* Save the query text in raw dsa area */ + query_dsa_area = get_dsa_area_for_query_text(); + dsa_query_pointer = dsa_allocate_extended(query_dsa_area, query_len+1, DSA_ALLOC_NO_OOM | DSA_ALLOC_ZERO); + if (!DsaPointerIsValid(dsa_query_pointer)) { LWLockRelease(pgss->lock); if (norm_query) pfree(norm_query); - elog(DEBUG1, "pgss_store: out of memory (pgss_query_hash)."); return; } - else if (!query_found) - { - /* New query, truncate length if necessary. */ - if (query_len > PGSM_QUERY_MAX_LEN) - query_len = PGSM_QUERY_MAX_LEN; - } + query_buff = dsa_get_address(query_dsa_area, dsa_query_pointer); + memcpy(query_buff, norm_query ? norm_query : query, query_len); + /* OK to create a new hashtable entry */ - /* Need exclusive lock to make a new hashtable entry - promote */ - LWLockRelease(pgss->lock); - LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - - if (!query_found) + PGSM_DISABLE_ERROR_CAPUTRE(); { - if (!SaveQueryText(bucketid, - queryid, - pgss_qbuf, - norm_query ? norm_query : query, - query_len, - &query_entry->query_pos)) + PG_TRY(); + { + entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); + } + PG_CATCH(); { LWLockRelease(pgss->lock); if (norm_query) pfree(norm_query); - elog(DEBUG1, "pgss_store: insufficient shared space for query."); - return; + if (DsaPointerIsValid(dsa_query_pointer)) + dsa_free(query_dsa_area, dsa_query_pointer); + PG_RE_THROW(); } + PG_END_TRY(); + }PGSM_END_DISABLE_ERROR_CAPTURE(); - /* - * Save current query buffer length, if we fail to add a new new - * entry to the hash table then we must restore the original - * length. - */ - memcpy(&prev_qbuf_len, pgss_qbuf, sizeof(prev_qbuf_len)); - } - - /* OK to create a new hashtable entry */ - entry = hash_entry_alloc(pgss, &key, GetDatabaseEncoding()); if (entry == NULL) { - if (!query_found) - { - /* Restore previous query buffer length. */ - memcpy(pgss_qbuf, &prev_qbuf_len, sizeof(prev_qbuf_len)); - } LWLockRelease(pgss->lock); if (norm_query) pfree(norm_query); + if (DsaPointerIsValid(dsa_query_pointer)) + dsa_free(query_dsa_area, dsa_query_pointer); return; } - entry->query_pos = query_entry->query_pos; + entry->query_pos = dsa_query_pointer; entry->pgsm_query_id = pgsm_query_id; + + } + else + { + #if USE_DYNAMIC_HASH + if(entry) + dshash_release_lock(get_pgssHash(), entry); + #endif } if (jstate == NULL) @@ -1640,9 +1662,6 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); hash_entry_dealloc(-1, -1, NULL); - /* Reset query buffer. */ - *(uint64 *) pgss_qbuf = 0; - LWLockRelease(pgss->lock); PG_RETURN_VOID(); } @@ -1699,13 +1718,12 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - HASH_SEQ_STATUS hash_seq; + PGSM_HASH_SEQ_STATUS hstat; pgssEntry *entry; char parentid_txt[32]; pgssSharedState *pgss = pgsm_get_ss(); - HTAB *pgss_hash = pgsm_get_hash(); - char *query_txt = (char *) palloc0(PGSM_QUERY_MAX_LEN + 1); - char *parent_query_txt = (char *) palloc0(PGSM_QUERY_MAX_LEN + 1); + char *query_txt = NULL; + char *parent_query_txt = NULL; int expected_columns = (api_version >= PGSM_V2_0)?PG_STAT_MONITOR_COLS_V2_0:PG_STAT_MONITOR_COLS_V1_0; /* Safety check... */ @@ -1745,8 +1763,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, LWLockAcquire(pgss->lock, LW_SHARED); - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) + pgsm_hash_seq_init(&hstat, get_pgssHash(), false); + + while ((entry = pgsm_hash_seq_next(&hstat)) != NULL) { Datum values[PG_STAT_MONITOR_COLS] = {0}; bool nulls[PG_STAT_MONITOR_COLS] = {0}; @@ -1761,7 +1780,9 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, uint64 userid = entry->key.userid; int64 ip = entry->key.ip; uint64 planid = entry->key.planid; - uint64 pgsm_query_id = entry->pgsm_query_id; + uint64 pgsm_query_id = entry->pgsm_query_id; + dsa_area *query_dsa_area; + char *query_ptr; #if PG_VERSION_NUM < 140000 bool toplevel = 1; bool is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS); @@ -1769,15 +1790,15 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, bool is_allowed_role = is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS); bool toplevel = entry->key.toplevel; #endif - - if (read_query(pgss_qbuf, queryid, query_txt, entry->query_pos) == 0) + /* Load the query text from dsa area */ + if (DsaPointerIsValid(entry->query_pos)) { - int rc; - - rc = read_query_buffer(bucketid, queryid, query_txt, entry->query_pos); - if (rc != 1) - snprintf(query_txt, 32, "%s", ""); + query_dsa_area = get_dsa_area_for_query_text(); + query_ptr = dsa_get_address(query_dsa_area, entry->query_pos); + query_txt = pstrdup(query_ptr); } + else + query_txt = pstrdup("Query string not available");/* Should never happen. Just a safty check*/ /* copy counters to a local variable to keep locking time short */ { @@ -1805,15 +1826,17 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, if (tmp.state == PGSS_PARSE || tmp.state == PGSS_PLAN) continue; + /* read the parent query text if any */ if (tmp.info.parentid != UINT64CONST(0)) { - if (read_query(pgss_qbuf, tmp.info.parentid, parent_query_txt, 0) == 0) + if (DsaPointerIsValid(tmp.info.parent_query)) { - int rc = read_query_buffer(bucketid, tmp.info.parentid, parent_query_txt, 0); - - if (rc != 1) - snprintf(parent_query_txt, 32, "%s", ""); + query_dsa_area = get_dsa_area_for_query_text(); + query_ptr = dsa_get_address(query_dsa_area, tmp.info.parent_query); + parent_query_txt = pstrdup(query_ptr); } + else + parent_query_txt = pstrdup("parent query text not available"); } /* bucketid at column number 0 */ values[i++] = Int64GetDatumFast(bucketid); @@ -2095,10 +2118,13 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo, tuplestore_putvalues(tupstore, tupdesc, values, nulls); } /* clean up and return the tuplestore */ + pgsm_hash_seq_term(&hstat); LWLockRelease(pgss->lock); - pfree(query_txt); - pfree(parent_query_txt); + if(query_txt) + pfree(query_txt); + if(parent_query_txt) + pfree(parent_query_txt); tuplestore_donestoring(tupstore); } @@ -2144,7 +2170,6 @@ get_next_wbucket(pgssSharedState *pgss) if (update_bucket) { - char file_name[1024]; new_bucket_id = (tv.tv_sec / PGSM_BUCKET_TIME) % PGSM_MAX_BUCKETS; @@ -2152,24 +2177,7 @@ get_next_wbucket(pgssSharedState *pgss) prev_bucket_id = pg_atomic_exchange_u64(&pgss->current_wbucket, new_bucket_id); LWLockAcquire(pgss->lock, LW_EXCLUSIVE); - hash_entry_dealloc(new_bucket_id, prev_bucket_id, pgss_qbuf); - - if (pgss->overflow) - { - pgss->n_bucket_cycles += 1; - if (pgss->n_bucket_cycles >= PGSM_MAX_BUCKETS) - { - /* - * A full rotation of PGSM_MAX_BUCKETS buckets happened since - * we detected a query buffer overflow. - * Reset overflow state and remove the dump file. - */ - pgss->overflow = false; - pgss->n_bucket_cycles = 0; - snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); - unlink(file_name); - } - } + hash_entry_dealloc(new_bucket_id, prev_bucket_id, NULL); LWLockRelease(pgss->lock); @@ -3167,165 +3175,6 @@ intarray_get_datum(int32 arr[], int len) } -uint64 -read_query(unsigned char *buf, uint64 queryid, char *query, size_t pos) -{ - bool found = false; - uint64 query_id = 0; - uint64 query_len = 0; - uint64 rlen = 0; - uint64 buf_len = 0; - - memcpy(&buf_len, buf, sizeof(uint64)); - if (buf_len <= 0) - goto exit; - - /* If a position hint is given, try to locate the query directly. */ - if (pos != 0 && (pos + sizeof(uint64) + sizeof(uint64)) < buf_len) - { - memcpy(&query_id, &buf[pos], sizeof(uint64)); - if (query_id != queryid) - return 0; - - pos += sizeof(uint64); - - memcpy(&query_len, &buf[pos], sizeof(uint64)); /* query len */ - pos += sizeof(uint64); - - if (pos + query_len > buf_len) /* avoid reading past buffer's length. */ - return 0; - - memcpy(query, &buf[pos], query_len); /* Actual query */ - query[query_len] = '\0'; - - return queryid; - } - - rlen = sizeof(uint64); /* Move forwad to skip length bytes */ - for (;;) - { - if (rlen >= buf_len) - goto exit; - - memcpy(&query_id, &buf[rlen], sizeof(uint64)); /* query id */ - if (query_id == queryid) - found = true; - - rlen += sizeof(uint64); - if (buf_len <= rlen) - continue; - - memcpy(&query_len, &buf[rlen], sizeof(uint64)); /* query len */ - rlen += sizeof(uint64); - if (buf_len < rlen + query_len) - goto exit; - if (found) - { - if (query != NULL) - { - memcpy(query, &buf[rlen], query_len); /* Actual query */ - query[query_len] = 0; - } - return query_id; - } - rlen += query_len; - } -exit: - if (PGSM_OVERFLOW_TARGET == OVERFLOW_TARGET_NONE) - { - sprintf(query, "%s", ""); - return -1; - } - return 0; -} - -bool -SaveQueryText(uint64 bucketid, - uint64 queryid, - unsigned char *buf, - const char *query, - uint64 query_len, - size_t *query_pos) -{ - uint64 buf_len = 0; - - memcpy(&buf_len, buf, sizeof(uint64)); - if (buf_len == 0) - buf_len += sizeof(uint64); - - if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) - { - switch (PGSM_OVERFLOW_TARGET) - { - case OVERFLOW_TARGET_NONE: - return false; - case OVERFLOW_TARGET_DISK: - { - bool dump_ok; - pgssSharedState *pgss = pgsm_get_ss(); - - if (pgss->overflow) - { - elog(DEBUG1, "query buffer overflowed twice"); - return false; - } - - /* - * If the query buffer is empty, there is nothing to dump, - * this also means that the current query length exceeds - * MAX_QUERY_BUF. - */ - if (buf_len <= sizeof(uint64)) - return false; - - dump_ok = dump_queries_buffer(bucketid, buf, MAX_QUERY_BUF); - buf_len = sizeof(uint64); - - if (dump_ok) - { - pgss->overflow = true; - pgss->n_bucket_cycles = 0; - } - - /* - * We must check for overflow again, as the query length - * may exceed the total size allocated to the buffer - * (MAX_QUERY_BUF). - */ - if (QUERY_BUFFER_OVERFLOW(buf_len, query_len)) - { - /* - * If we successfully dumped the query buffer to disk, - * then reset the buffer, otherwise we could end up - * dumping the same buffer again. - */ - if (dump_ok) - *(uint64 *) buf = 0; - - return false; - } - - } - break; - default: - Assert(false); - break; - } - } - - *query_pos = buf_len; - - memcpy(&buf[buf_len], &queryid, sizeof(uint64)); /* query id */ - buf_len += sizeof(uint64); - - memcpy(&buf[buf_len], &query_len, sizeof(uint64)); /* query length */ - buf_len += sizeof(uint64); - - memcpy(&buf[buf_len], query, query_len); /* query */ - buf_len += query_len; - memcpy(buf, &buf_len, sizeof(uint64)); - return true; -} Datum pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS) @@ -3333,12 +3182,6 @@ pg_stat_monitor_hook_stats(PG_FUNCTION_ARGS) return (Datum) 0; } -void -set_qbuf(unsigned char *buf) -{ - pgss_qbuf = buf; - *(uint64 *) pgss_qbuf = 0; -} void pgsm_emit_log_hook(ErrorData *edata) @@ -3353,7 +3196,8 @@ pgsm_emit_log_hook(ErrorData *edata) if (MyProc == NULL) goto exit; - if ((edata->elevel == ERROR || edata->elevel == WARNING || edata->elevel == INFO || edata->elevel == DEBUG1)) + if (PGSM_ERROR_CAPTURE_ENABLED && + (edata->elevel == ERROR || edata->elevel == WARNING || edata->elevel == INFO || edata->elevel == DEBUG1)) { uint64 queryid = 0; @@ -3375,145 +3219,6 @@ IsSystemInitialized(void) return (system_init && IsHashInitialize()); } -static bool -dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len) -{ - int fd = 0; - char file_name[1024]; - bool success = true; - int off = 0; - int tries = 0; - - snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); - fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY); - if (fd < 0) - { - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - file_name))); - return false; - } - - /* Loop until write buf_len bytes to the file. */ - do - { - ssize_t nwrite = write(fd, buf + off, buf_len - off); - - if (nwrite == -1) - { - if (errno == EINTR && tries++ < 3) - continue; - - success = false; - break; - } - off += nwrite; - } while (off < buf_len); - - if (!success) - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", file_name))); - - if (fd > 0) - CloseTransientFile(fd); - - return success; -} - -/* - * Try to locate query text in a dumped file for bucket_id. - * - * Returns: - * 1 Query sucessfully read, query_text will contain the query text. - * 0 Query not found. - * -1 I/O Error. - */ -int -read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos) -{ - int fd = 0; - char file_name[1024]; - unsigned char *buf = NULL; - ssize_t nread = 0; - int off = 0; - int tries = 0; - bool done = false; - bool found = false; - - snprintf(file_name, 1024, "%s", PGSM_TEXT_FILE); - fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY); - if (fd < 0) - goto exit; - - buf = (unsigned char *) palloc(MAX_QUERY_BUF); - while (!done) - { - off = 0; - /* read a chunck of MAX_QUERY_BUF size. */ - do - { - nread = read(fd, buf + off, MAX_QUERY_BUF - off); - if (nread == -1) - { - if (errno == EINTR && tries++ < 3) /* read() was interrupted, - * attempt to read again - * (max attempts=3) */ - continue; - - goto exit; - } - else if (nread == 0) /* EOF */ - { - done = true; - break; - } - - off += nread; - } while (off < MAX_QUERY_BUF); - - if (off == MAX_QUERY_BUF) - { - /* we have a chunck, scan it looking for queryid. */ - if (read_query(buf, queryid, query_txt, pos) != 0) - { - - found = true; - /* query was found, don't need to read another chunck. */ - break; - } - } - else - - /* - * Either done=true or file has a size not multiple of - * MAX_QUERY_BUF. It is safe to assume that the file was truncated - * or corrupted. - */ - break; - } - -exit: - if (fd < 0 || nread == -1) - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - file_name))); - - if (fd >= 0) - CloseTransientFile(fd); - - if (buf) - pfree(buf); - - if (found) - return 1; - else if (fd == -1 || nread == -1) - return -1; /* I/O error. */ - else - return 0; /* Not found. */ -} static double time_diff(struct timeval end, struct timeval start) diff --git a/pg_stat_monitor.h b/pg_stat_monitor.h index 2c708c4..5f90892 100644 --- a/pg_stat_monitor.h +++ b/pg_stat_monitor.h @@ -27,6 +27,9 @@ #include #include +#include "lib/dshash.h" +#include "utils/dsa.h" + #include "access/hash.h" #include "catalog/pg_authid.h" #include "executor/instrument.h" @@ -103,6 +106,29 @@ /* Update this if need a enum GUC with more options. */ #define MAX_ENUM_OPTIONS 6 + +extern volatile bool __pgsm_do_not_capture_error; +#define PGSM_DISABLE_ERROR_CAPUTRE() \ + do { \ + __pgsm_do_not_capture_error = true + +#define PGSM_END_DISABLE_ERROR_CAPTURE() \ + __pgsm_do_not_capture_error = false; \ + } while (0) + +#define PGSM_ERROR_CAPTURE_ENABLED \ + __pgsm_do_not_capture_error == false + +#ifdef USE_DYNAMIC_HASH + #define PGSM_HASH_TABLE dshash_table + #define PGSM_HASH_TABLE_HANDLE dshash_table_handle + #define PGSM_HASH_SEQ_STATUS dshash_seq_status +#else + #define PGSM_HASH_TABLE HTAB + #define PGSM_HASH_TABLE_HANDLE HTAB* + #define PGSM_HASH_SEQ_STATUS HASH_SEQ_STATUS +#endif + typedef struct GucVariables { enum config_type type; /* PGC_BOOL, PGC_INT, PGC_REAL, PGC_STRING, @@ -179,20 +205,6 @@ typedef struct CallTime double sum_var_time; /* sum of variances in execution time in msec */ } CallTime; -/* - * Entry type for queries hash table (query ID). - * - * We use a hash table to keep track of query IDs that have their - * corresponding query text added to the query buffer (pgsm_query_shared_buffer). - * - * This allow us to avoid adding duplicated queries to the buffer, therefore - * leaving more space for other queries and saving some CPU. - */ -typedef struct pgssQueryEntry -{ - uint64 queryid; /* query identifier, also the key. */ - size_t query_pos; /* query location within query buffer */ -} pgssQueryEntry; typedef struct PlanInfo { @@ -216,6 +228,7 @@ typedef struct pgssHashKey typedef struct QueryInfo { uint64 parentid; /* parent queryid of current query */ + dsa_pointer parent_query; int64 type; /* type of query, options are query, info, * warning, error, fatal */ char application_name[APPLICATIONNAME_LEN]; @@ -323,7 +336,7 @@ typedef struct pgssEntry Counters counters; /* the statistics for this query */ int encoding; /* query text encoding */ slock_t mutex; /* protects the counters only */ - size_t query_pos; /* query location within query buffer */ + dsa_pointer query_pos; /* query location within query buffer */ } pgssEntry; /* @@ -354,10 +367,19 @@ typedef struct pgssSharedState * This allows us to avoid having a large file on disk that would also * slowdown queries to the pg_stat_monitor view. */ - bool overflow; size_t n_bucket_cycles; + int hash_tranche_id; + void *raw_dsa_area; + PGSM_HASH_TABLE_HANDLE hash_handle; } pgssSharedState; +typedef struct pgsmLocalState +{ + pgssSharedState *shared_pgssState; + dsa_area *dsa; + PGSM_HASH_TABLE *shared_hash; +}pgsmLocalState; + #define ResetSharedState(x) \ do { \ x->cur_median_usage = ASSUMED_MEDIAN_INIT; \ @@ -419,27 +441,23 @@ void init_guc(void); GucVariable *get_conf(int i); /* hash_create.c */ +dsa_area *get_dsa_area_for_query_text(void); +PGSM_HASH_TABLE *get_pgssHash(void); + +void pgsm_attach_shmem(void); bool IsHashInitialize(void); void pgss_shmem_startup(void); void pgss_shmem_shutdown(int code, Datum arg); int pgsm_get_bucket_size(void); pgssSharedState *pgsm_get_ss(void); -HTAB *pgsm_get_plan_hash(void); -HTAB *pgsm_get_hash(void); -HTAB *pgsm_get_query_hash(void); -HTAB *pgsm_get_plan_hash(void); void hash_entry_reset(void); void hash_query_entryies_reset(void); void hash_query_entries(); void hash_query_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer[]); void hash_entry_dealloc(int new_bucket_id, int old_bucket_id, unsigned char *query_buffer); pgssEntry *hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding); -Size hash_memsize(void); - -int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt, size_t pos); -uint64 read_query(unsigned char *buf, uint64 queryid, char *query, size_t pos); +Size pgsm_ShmemSize(void); void pgss_startup(void); -void set_qbuf(unsigned char *); /* hash_query.c */ void pgss_startup(void); @@ -481,3 +499,9 @@ static const struct config_enum_entry track_options[] = #define HOOK_STATS_SIZE 0 #endif +void *pgsm_hash_find_or_insert(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* found); +void *pgsm_hash_find(PGSM_HASH_TABLE *shared_hash, pgssHashKey *key, bool* found); +void pgsm_hash_seq_init(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_hash, bool lock); +void *pgsm_hash_seq_next(PGSM_HASH_SEQ_STATUS *hstat); +void pgsm_hash_seq_term(PGSM_HASH_SEQ_STATUS *hstat); +void pgsm_hash_delete_current(PGSM_HASH_SEQ_STATUS *hstat, PGSM_HASH_TABLE *shared_hash, void *key);