Issue(30): Code refactoring.

Ibrar Ahmed
2020-09-14 22:26:19 +00:00
parent 7613b1cb1c
commit 935d063f13
6 changed files with 554 additions and 713 deletions


@@ -20,27 +20,23 @@ PG_MODULE_MAGIC;
void _PG_init(void);
void _PG_fini(void);
/*---- Local variables ----*/
/* Current nesting depth of ExecutorRun+ProcessUtility calls */
static int nested_level = 0;
#if PG_VERSION_NUM >= 130000
static int plan_nested_level = 0;
static int exec_nested_level = 0;
#endif
static struct rusage rusage_start;
static struct rusage rusage_end;
static volatile sig_atomic_t sigterm = false;
static void handle_sigterm(SIGNAL_ARGS);
int query_buf_size_bucket;
HTAB *
CreateHash(const char *hash_name, int key_size, int entry_size, int hash_size);
/* Saved hook values in case of unload */
static planner_hook_type planner_hook_next = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL;
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
static ExecutorRun_hook_type prev_ExecutorRun = NULL;
@@ -48,23 +44,6 @@ static ExecutorFinish_hook_type prev_ExecutorFinish = NULL;
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static ProcessUtility_hook_type prev_ProcessUtility = NULL;
/* Links to shared memory state */
static pgssSharedState *pgss = NULL;
static HTAB *pgss_hash = NULL;
static HTAB *pgss_object_hash = NULL;
/* Hash table for aggregates */
static HTAB *pgss_agghash = NULL;
/* Hash table for buckets */
static HTAB *pgss_buckethash = NULL;
/* Hash table for wait events */
static HTAB *pgss_waiteventshash = NULL;
static pgssBucketEntry **pgssBucketEntries = NULL;
static pgssWaitEventEntry **pgssWaitEventEntries = NULL;
PG_FUNCTION_INFO_V1(pg_stat_monitor_reset);
PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2);
@@ -73,14 +52,9 @@ PG_FUNCTION_INFO_V1(pg_stat_monitor);
PG_FUNCTION_INFO_V1(pg_stat_wait_events);
PG_FUNCTION_INFO_V1(pg_stat_monitor_settings);
/* Extended version function prototypes */
PG_FUNCTION_INFO_V1(pg_stat_agg);
static uint pg_get_client_addr(void);
static Datum array_get_datum(int arr[]);
static void update_agg_counters(uint64 bucket_id, uint64 queryid, uint64 id, AGG_KEY type);
static pgssAggEntry *agg_entry_alloc(pgssAggHashKey *key);
void add_object_entry(uint64 queryid, char *objects);
#if PG_VERSION_NUM >= 130000
static PlannedStmt * pgss_planner_hook(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams);
#else
@@ -88,8 +62,6 @@ static void BufferUsageAccumDiff(BufferUsage* bufusage, BufferUsage* pgBufferUsa
static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param);
#endif
static void pgss_shmem_startup(void);
static void pgss_shmem_shutdown(int code, Datum arg);
static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once);
@@ -125,11 +97,7 @@ static void pgss_store(const char *query, uint64 queryId,
static void pg_stat_monitor_internal(FunctionCallInfo fcinfo,
bool showtext);
static Size pgss_memsize(void);
static pgssEntry *entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding, bool sticky);
static void entry_dealloc(int bucket_id);
static void entry_reset(void);
static void AppendJumble(pgssJumbleState *jstate,
const unsigned char *item, Size size);
static void JumbleQuery(pgssJumbleState *jstate, Query *query);
@@ -181,7 +149,7 @@ _PG_init(void)
* the postmaster process.) We'll allocate or attach to the shared
* resources in pgss_shmem_startup().
*/
RequestAddinShmemSpace(pgss_memsize());
RequestAddinShmemSpace(hash_memsize());
RequestNamedLWLockTranche("pg_stat_monitor", 1);
/* Register Wait events */
@@ -223,152 +191,9 @@ _PG_fini(void)
ExecutorFinish_hook = prev_ExecutorFinish;
ExecutorEnd_hook = prev_ExecutorEnd;
ProcessUtility_hook = prev_ProcessUtility;
entry_reset();
hash_entry_reset();
}
HTAB *
CreateHash(const char *hash_name, int key_size, int entry_size, int hash_size)
{
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = key_size;
info.entrysize = entry_size;
return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS);
}
/*
* shmem_startup hook: allocate or attach to shared memory,
* then load any pre-existing statistics from file.
* Also create and load the query-texts file, which is expected to exist
* (even if empty) while the module is enabled.
*/
static void
pgss_shmem_startup(void)
{
bool found = false;
int32 i;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
/* reset in case this is a restart within the postmaster */
pgss = NULL;
pgss_hash = NULL;
pgss_object_hash = NULL;
pgss_agghash = NULL;
pgss_buckethash = NULL;
pgss_waiteventshash = NULL;
/*
* Create or attach to the shared memory state, including hash table
*/
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pgss = ShmemInitStruct("pg_stat_monitor", sizeof(pgssSharedState), &found);
if (!found)
{
/* First time through ... */
pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock;
SpinLockInit(&pgss->mutex);
ResetSharedState(pgss);
}
query_buf_size_bucket = PGSM_QUERY_BUF_SIZE / PGSM_MAX_BUCKETS;
for (i = 0; i < PGSM_MAX_BUCKETS; i++)
pgss_qbuf[i] = (unsigned char *) ShmemAlloc(query_buf_size_bucket);
pgss_hash = CreateHash("pg_stat_monitor: Queries hashtable",
sizeof(pgssHashKey),
sizeof(pgssEntry),
PGSM_MAX);
pgss_buckethash = CreateHash("pg_stat_monitor: Bucket hashtable",
sizeof(pgssBucketHashKey),
sizeof(pgssBucketEntry),
PGSM_MAX_BUCKETS);
pgss_waiteventshash = CreateHash("pg_stat_monitor: Wait Event hashtable",
sizeof(pgssWaitEventKey),
sizeof(pgssWaitEventEntry),
100);
pgss_object_hash = CreateHash("pg_stat_monitor: Object hashtable",
sizeof(pgssObjectHashKey),
sizeof(pgssObjectEntry),
PGSM_OBJECT_CACHE);
pgss_agghash = CreateHash("pg_stat_monitor: Aggregate hashtable",
sizeof(pgssAggHashKey),
sizeof(pgssAggEntry),
PGSM_MAX * 3);
Assert(IsHashInitialize());
pgssWaitEventEntries = malloc(sizeof(pgssWaitEventEntry *) * MAX_BACKEND_PROCESES);
for (i = 0; i < MAX_BACKEND_PROCESES; i++)
{
pgssWaitEventKey key;
pgssWaitEventEntry *entry = NULL;
bool found = false;
key.processid = i;
entry = (pgssWaitEventEntry *) hash_search(pgss_waiteventshash, &key, HASH_ENTER, &found);
if (!found)
{
SpinLockInit(&entry->mutex);
pgssWaitEventEntries[i] = entry;
}
}
pgssBucketEntries = malloc(sizeof(pgssBucketEntry *) * PGSM_MAX_BUCKETS);
for (i = 0; i < PGSM_MAX_BUCKETS; i++)
{
pgssBucketHashKey key;
pgssBucketEntry *entry = NULL;
bool found = false;
key.bucket_id = i;
/* Find or create an entry with desired hash code */
entry = (pgssBucketEntry *) hash_search(pgss_buckethash, &key, HASH_ENTER, &found);
if (!found)
{
memset(&entry->counters, 0, sizeof(pgssBucketCounters));
SpinLockInit(&entry->mutex);
pgssBucketEntries[i] = entry;
}
}
LWLockRelease(AddinShmemInitLock);
/*
* If we're in the postmaster (or a standalone backend...), set up a shmem
* exit hook to dump the statistics to disk.
*/
if (!IsUnderPostmaster)
on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
}
/*
* shmem_shutdown hook: Dump statistics into file.
*
* Note: we don't bother with acquiring lock, because there should be no
* other processes running when this is called.
*/
static void
pgss_shmem_shutdown(int code, Datum arg)
{
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
/* Don't try to dump during a crash. */
if (code)
return;
/* Safety check ... shouldn't get here unless shmem is set up. */
if (!IsHashInitialize())
return;
}
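IsHashInitialize() is called throughout the new code but its body is not part of this diff. Judging by the per-table NULL checks it replaces (the removed "!pgss || !pgss_hash || ..." tests further down), a plausible sketch is the following; the function name is real, the body is an assumption:

/* true once the shared state and every hash table have been attached */
static bool
IsHashInitialize(void)
{
	return (pgss != NULL &&
			pgss_hash != NULL &&
			pgss_object_hash != NULL &&
			pgss_buckethash != NULL &&
			pgss_waiteventshash != NULL);
}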
/*
* Post-parse-analysis hook: mark query with a queryId
*/
@@ -434,9 +259,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
}
}
}
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
add_object_entry(query->queryId, tables_name);
LWLockRelease(pgss->lock);
}
/*
@@ -446,7 +269,6 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
if (query->queryId == UINT64CONST(0))
query->queryId = UINT64CONST(1);
if (PGSM_ENABLED == 1)
if (jstate.clocations_count > 0)
pgss_store(pstate->p_sourcetext,
@@ -839,7 +661,11 @@ static void pgss_store(const char *query, uint64 queryId,
bool reset = false;
int i;
char tables_name[MAX_REL_LEN] = {0};
int len;
pgssSharedState *pgss = pgsm_get_ss();
pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket();
HTAB *pgss_hash = pgsm_get_hash();
Assert(query != NULL);
Assert(PGSM_ENABLED);
@@ -885,29 +711,15 @@ static void pgss_store(const char *query, uint64 queryId,
if (queryId == UINT64CONST(0))
queryId = pgss_hash_string(query, query_len);
{
pgssObjectHashKey key;
pgssObjectEntry *entry;
key.queryid = queryId;
LWLockAcquire(pgss->lock, LW_SHARED);
entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_FIND, NULL);
if (entry != NULL)
{
LWLockRelease(pgss->lock);
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
snprintf(tables_name, MAX_REL_LEN, "%s", entry->tables_name);
hash_search(pgss_object_hash, &entry->key, HASH_REMOVE, NULL);
}
LWLockRelease(pgss->lock);
}
remove_object_entry(queryId, tables_name);
len = strlen(tables_name);
/* Set up key for hashtable search */
key.userid = GetUserId();
key.dbid = MyDatabaseId;
key.queryid = queryId;
key.ip = pg_get_client_addr();
key.bucket_id = get_next_wbucket(pgss);
if (key.bucket_id != pgss->current_wbucket)
@@ -915,7 +727,6 @@ static void pgss_store(const char *query, uint64 queryId,
reset = true;
pgss->current_wbucket = key.bucket_id;
}
/* Lookup the hash table entry with shared lock. */
LWLockAcquire(pgss->lock, LW_SHARED);
entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
@@ -942,7 +753,7 @@ static void pgss_store(const char *query, uint64 queryId,
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
/* OK to create a new hashtable entry */
entry = entry_alloc(pgss, &key, 0, query_len, encoding, jstate != NULL);
entry = hash_entry_alloc(pgss, &key, encoding);
if (entry == NULL)
goto exit;
}
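The inline object-hash lookup removed above now lives behind remove_object_entry(). Assuming the helper simply wraps the deleted block in the new hash code, a sketch (pgsm_get_object_hash() is an assumed accessor name, not confirmed by this commit):

static void
remove_object_entry(uint64 queryid, char *tables_name)
{
	pgssObjectHashKey key;
	pgssObjectEntry *entry;
	pgssSharedState *pgss = pgsm_get_ss();
	HTAB *pgss_object_hash = pgsm_get_object_hash(); /* accessor name assumed */

	key.queryid = queryid;
	LWLockAcquire(pgss->lock, LW_SHARED);
	entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_FIND, NULL);
	if (entry != NULL)
	{
		/* retake the lock exclusively before copying out and removing */
		LWLockRelease(pgss->lock);
		LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
		snprintf(tables_name, MAX_REL_LEN, "%s", entry->tables_name);
		hash_search(pgss_object_hash, &entry->key, HASH_REMOVE, NULL);
	}
	LWLockRelease(pgss->lock);
}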
@@ -968,11 +779,6 @@ static void pgss_store(const char *query, uint64 queryId,
if (reset)
memset(&entry->counters, 0, sizeof(Counters));
/* Calculate the aggregates for database, user and host */
update_agg_counters(entry->key.bucket_id, key.queryid, key.dbid, AGG_KEY_DATABASE);
update_agg_counters(entry->key.bucket_id, key.queryid, key.userid, AGG_KEY_USER);
update_agg_counters(entry->key.bucket_id, key.queryid, pg_get_client_addr(), AGG_KEY_HOST);
/* "Unstick" entry if it was previously sticky */
if (e->counters.calls[kind].calls == 0)
e->counters.calls[kind].usage = USAGE_INIT;
@@ -1033,7 +839,7 @@ static void pgss_store(const char *query, uint64 queryId,
e->counters.info.host = pg_get_client_addr();
e->counters.sysinfo.utime = utime;
e->counters.sysinfo.stime = stime;
for(i = 0; i < MAX_REL_LEN - 1; i++)
for(i = 0; i < len; i++)
e->counters.info.tables_name[i] = tables_name[i];
SpinLockRelease(&e->mutex);
}
@@ -1053,11 +859,12 @@ exit:
Datum
pg_stat_monitor_reset(PG_FUNCTION_ARGS)
{
if (!pgss || !pgss_hash || !pgss_agghash || !pgss_buckethash || !pgss_waiteventshash)
/* Safety check... */
if (!IsHashInitialize())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
entry_dealloc(-1);
hash_entry_dealloc(-1);
PG_RETURN_VOID();
}
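The pattern behind the refactoring: the file-local globals become accessor functions (pgsm_get_ss(), pgsm_get_hash(), pgsm_get_bucket(), ...) exported from the new hash code. A minimal sketch of what those getters presumably look like, assuming the state now lives in hash_query.c:

pgssSharedState *
pgsm_get_ss(void)
{
	return pgss;      /* the shared state, now private to the hash code */
}

HTAB *
pgsm_get_hash(void)
{
	return pgss_hash; /* the main query hash table */
}

pgssBucketEntry **
pgsm_get_bucket(void)
{
	return pgssBucketEntries;
}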
@@ -1066,19 +873,20 @@ pg_stat_monitor_reset(PG_FUNCTION_ARGS)
Datum
pg_stat_wait_events(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
HASH_SEQ_STATUS hash_seq;
pgssWaitEventEntry *entry;
char *query_txt;
char queryid_txt[64];
query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
HASH_SEQ_STATUS hash_seq;
pgssWaitEventEntry *entry;
char *query_txt;
char queryid_txt[64];
pgssSharedState *pgss = pgsm_get_ss();
HTAB *pgss_waiteventshash = pgsm_get_wait_event_hash();
/* hash table must exist already */
if (!pgss || !pgss_hash || !pgss_object_hash)
/* Safety check... */
if (!IsHashInitialize())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
@@ -1094,6 +902,8 @@ pg_stat_wait_events(PG_FUNCTION_ARGS)
errmsg("pg_stat_monitor: materialize mode required, but it is not " \
"allowed in this context")));
query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
/* Switch into long-lived context to construct returned data structures */
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
@@ -1161,7 +971,6 @@ pg_stat_wait_events(PG_FUNCTION_ARGS)
Datum
pg_stat_monitor(PG_FUNCTION_ARGS)
{
/* If it's really API 1.1, we'll figure that out below */
pg_stat_monitor_internal(fcinfo, true);
return (Datum) 0;
}
@@ -1171,24 +980,28 @@ static void
pg_stat_monitor_internal(FunctionCallInfo fcinfo,
bool showtext)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
Oid userid = GetUserId();
bool is_allowed_role = false;
HASH_SEQ_STATUS hash_seq;
pgssEntry *entry;
char *query_txt;
char queryid_txt[64];
query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
Oid userid = GetUserId();
bool is_allowed_role = false;
HASH_SEQ_STATUS hash_seq;
pgssEntry *entry;
char *query_txt;
char queryid_txt[64];
pgssSharedState *pgss = pgsm_get_ss();
HTAB *pgss_hash = pgsm_get_hash();
pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket_entries();
query_txt = (char*) malloc(PGSM_QUERY_MAX_LEN);
/* Superusers or members of pg_read_all_stats are allowed */
is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
/* hash table must exist already */
if (!pgss || !pgss_hash || !pgss_object_hash)
/* Safety check... */
if (!IsHashInitialize())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
@@ -1243,6 +1056,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
values[i++] = ObjectIdGetDatum(entry->key.bucket_id);
values[i++] = ObjectIdGetDatum(entry->key.userid);
values[i++] = ObjectIdGetDatum(entry->key.dbid);
values[i++] = Int64GetDatumFast(entry->key.ip);
/* copy counters to a local variable to keep locking time short */
{
volatile pgssEntry *e = (volatile pgssEntry *) entry;
@@ -1307,7 +1121,6 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
values[i++] = Int64GetDatumFast(tmp.blocks.temp_blks_written);
values[i++] = Float8GetDatumFast(tmp.blocks.blk_read_time);
values[i++] = Float8GetDatumFast(tmp.blocks.blk_write_time);
values[i++] = Int64GetDatumFast(tmp.info.host);
values[i++] = ArrayGetTextDatum(pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls);
values[i++] = Float8GetDatumFast(tmp.sysinfo.utime);
values[i++] = Float8GetDatumFast(tmp.sysinfo.stime);
@@ -1325,78 +1138,13 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
tuplestore_donestoring(tupstore);
}
/*
* Estimate shared memory space needed.
*/
static Size
pgss_memsize(void)
{
Size size;
size = MAXALIGN(sizeof(pgssSharedState));
size = add_size(size, hash_estimate_size(PGSM_MAX, sizeof(pgssEntry)));
return size;
}
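The replacement hash_memsize() called from _PG_init() is not shown in this diff. Presumably it extends the removed estimate to cover every table created in pgss_shmem_startup() plus the per-bucket query buffers; a sketch under that assumption, reusing the sizes passed to CreateHash() above:

static Size
hash_memsize(void)
{
	Size size;

	size = MAXALIGN(sizeof(pgssSharedState));
	size = add_size(size, hash_estimate_size(PGSM_MAX, sizeof(pgssEntry)));
	size = add_size(size, hash_estimate_size(PGSM_MAX_BUCKETS, sizeof(pgssBucketEntry)));
	size = add_size(size, hash_estimate_size(100, sizeof(pgssWaitEventEntry)));
	size = add_size(size, hash_estimate_size(PGSM_OBJECT_CACHE, sizeof(pgssObjectEntry)));
	size = add_size(size, hash_estimate_size(PGSM_MAX * 3, sizeof(pgssAggEntry)));
	/* per-bucket query text buffers handed out by ShmemAlloc() */
	size = add_size(size, PGSM_QUERY_BUF_SIZE);

	return size;
}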
/*
* Allocate a new hashtable entry.
* caller must hold an exclusive lock on pgss->lock
*
* "query" need not be null-terminated; we rely on query_len instead
*
* If "sticky" is true, make the new entry artificially sticky so that it will
* probably still be there when the query finishes execution. We do this by
* giving it a median usage value rather than the normal value. (Strictly
* speaking, query strings are normalized on a best effort basis, though it
* would be difficult to demonstrate this even under artificial conditions.)
*
* Note: despite needing exclusive lock, it's not an error for the target
* entry to already exist. This is because pgss_store releases and
* reacquires lock after failing to find a match; so someone else could
* have made the entry while we waited to get exclusive lock.
*/
static pgssEntry *
entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int query_len, int encoding,
bool sticky)
{
pgssEntry *entry = NULL;
bool found = false;
if (pgss->bucket_entry[pgss->current_wbucket] >= (PGSM_MAX / PGSM_MAX_BUCKETS))
{
pgss->bucket_overflow[pgss->current_wbucket]++;
return NULL;
}
if (hash_get_num_entries(pgss_hash) >= PGSM_MAX)
return NULL;
/* Find or create an entry with desired hash code */
entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found);
if (!found)
{
pgss->bucket_entry[pgss->current_wbucket]++;
/* New entry, initialize it */
/* reset the statistics */
memset(&entry->counters, 0, sizeof(Counters));
/* set the appropriate initial usage count */
entry->counters.calls[0].usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
/* re-initialize the mutex each time ... we assume no one is using it */
SpinLockInit(&entry->mutex);
/* ... and don't forget the query text metadata */
entry->encoding = encoding;
}
return entry;
}
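hash_entry_alloc(), the replacement called from pgss_store(), is not shown here either. Assuming it is entry_alloc() above moved into the new hash code with the unused query_offset/query_len and the sticky flag dropped, a sketch:

pgssEntry *
hash_entry_alloc(pgssSharedState *pgss, pgssHashKey *key, int encoding)
{
	pgssEntry *entry = NULL;
	bool found = false;

	/* refuse to grow a bucket past its share of the table */
	if (pgss->bucket_entry[pgss->current_wbucket] >= (PGSM_MAX / PGSM_MAX_BUCKETS))
	{
		pgss->bucket_overflow[pgss->current_wbucket]++;
		return NULL;
	}
	if (hash_get_num_entries(pgss_hash) >= PGSM_MAX)
		return NULL;
	/* find or create an entry with the desired hash key */
	entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found);
	if (!found)
	{
		pgss->bucket_entry[pgss->current_wbucket]++;
		memset(&entry->counters, 0, sizeof(Counters));
		entry->counters.calls[0].usage = USAGE_INIT; /* sticky handling dropped (assumption) */
		SpinLockInit(&entry->mutex);
		entry->encoding = encoding;
	}
	return entry;
}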
static uint64
get_next_wbucket(pgssSharedState *pgss)
{
struct timeval tv;
uint64 current_usec;
uint64 bucket_id;
pgssBucketEntry **pgssBucketEntries = pgsm_get_bucket();
gettimeofday(&tv,NULL);
current_usec = tv.tv_sec;
@@ -1408,7 +1156,7 @@ get_next_wbucket(pgssSharedState *pgss)
bucket_id = 0;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
entry_dealloc(bucket_id);
hash_entry_dealloc(bucket_id);
/* reset the query buffer */
pgss->query_fifo[bucket_id].head = 0;
pgss->query_fifo[bucket_id].tail = 0;
@@ -1422,94 +1170,6 @@ get_next_wbucket(pgssSharedState *pgss)
return pgss->current_wbucket;
}
/*
* Deallocate least-used entries.
*
* Caller must hold an exclusive lock on pgss->lock.
*/
static void
entry_dealloc(int bucket)
{
HASH_SEQ_STATUS hash_seq;
HASH_SEQ_STATUS hash_dbseq;
pgssEntry *entry;
pgssAggEntry *agg_entry;
pgssEntry **entries;
pgssAggEntry **agg_entries;
int i;
int nvictims = 0;
pgss->bucket_entry[bucket] = 0;
entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *));
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
if (entry->key.bucket_id == bucket || bucket < 0)
entries[nvictims++] = entry;
}
for (i = 0; i < nvictims; i++)
entry = hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL);
nvictims = 0;
agg_entries = palloc(hash_get_num_entries(pgss_agghash) * sizeof(pgssAggEntry *));
hash_seq_init(&hash_dbseq, pgss_agghash);
while ((agg_entry = hash_seq_search(&hash_dbseq)) != NULL)
{
if (agg_entry->key.bucket_id == bucket || bucket < 0)
agg_entries[nvictims++] = agg_entry;
}
for (i = 0; i < nvictims; i++)
hash_search(pgss_agghash, &agg_entries[i]->key, HASH_REMOVE, NULL);
pfree(entries);
pfree(agg_entries);
}
/*
* Release all entries.
*/
static void
entry_reset()
{
HASH_SEQ_STATUS hash_seq;
pgssEntry *entry;
pgssAggEntry *dbentry;
pgssObjectEntry *objentry;
pgssWaitEventEntry *weentry;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
hash_seq_init(&hash_seq, pgss_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
}
hash_seq_init(&hash_seq, pgss_agghash);
while ((dbentry = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(pgss_agghash, &dbentry->key, HASH_REMOVE, NULL);
}
hash_seq_init(&hash_seq, pgss_buckethash);
while ((objentry = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(pgss_buckethash, &objentry->key, HASH_REMOVE, NULL);
}
hash_seq_init(&hash_seq, pgss_waiteventshash);
while ((weentry = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(pgss_waiteventshash, &weentry->key, HASH_REMOVE, NULL);
}
pgss->current_wbucket = 0;
free(pgssWaitEventEntries);
free(pgssBucketEntries);
LWLockRelease(pgss->lock);
}
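hash_entry_reset(), called from _PG_fini() above, presumably mirrors this removed entry_reset(): walk each table under an exclusive lock and remove every entry. A condensed sketch for the main query table only, assuming the globals now live beside it in the hash code:

void
hash_entry_reset(void)
{
	HASH_SEQ_STATUS hash_seq;
	pgssEntry *entry;

	LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
	hash_seq_init(&hash_seq, pgss_hash);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
		hash_search(pgss_hash, &entry->key, HASH_REMOVE, NULL);
	pgss->current_wbucket = 0;
	LWLockRelease(pgss->lock);
}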
/*
* AppendJumble: Append a value that is substantive in a given query to
* the current jumble.
@@ -2441,151 +2101,6 @@ array_get_datum(int arr[])
return CStringGetTextDatum(str);
}
/* Allocate memory for a new entry */
void add_object_entry(uint64 queryid, char *objects)
{
pgssObjectEntry *entry = NULL;
bool found;
pgssObjectHashKey key;
key.queryid = queryid;
entry = (pgssObjectEntry *) hash_search(pgss_object_hash, &key, HASH_ENTER, &found);
if (!found)
{
SpinLockAcquire(&entry->mutex);
snprintf(entry->tables_name, MAX_REL_LEN, "%s", objects);
SpinLockRelease(&entry->mutex);
}
}
/* Allocate memory for a new entry */
static pgssAggEntry *
agg_entry_alloc(pgssAggHashKey *key)
{
pgssAggEntry *entry = NULL;
bool found;
entry = (pgssAggEntry *) hash_search(pgss_agghash, key, HASH_ENTER, &found);
if (!found)
{
SpinLockAcquire(&entry->mutex);
memset(&entry->counters, 0, sizeof(pgssAggCounters));
entry->counters.total_calls = 0;
SpinLockRelease(&entry->mutex);
}
return entry;
}
static void
update_agg_counters(uint64 bucket, uint64 queryid, uint64 id, AGG_KEY type)
{
pgssAggHashKey key;
pgssAggEntry *entry;
key.id = id;
key.type = (int64) type;
key.queryid = queryid;
key.bucket_id = bucket;
entry = agg_entry_alloc(&key);
if (!entry)
return;
SpinLockAcquire(&entry->mutex);
entry->key.queryid = queryid;
entry->key.id = id;
entry->key.type = key.type;
entry->key.bucket_id = bucket;
entry->counters.total_calls++;
SpinLockRelease(&entry->mutex);
}
Datum
pg_stat_agg(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
HASH_SEQ_STATUS hash_seq;
pgssAggEntry *entry;
/* hash table must exist already */
if (!pgss || !pgss_agghash)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_stat_monitor: must be loaded via shared_preload_libraries")));
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("pg_stat_monitor: set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("pg_stat_monitor: materialize mode required, but it is not " \
"allowed in this context")));
/* Switch into long-lived context to construct returned data structures */
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "pg_stat_monitor: return type must be a row type");
if (tupdesc->natts != 4)
elog(ERROR, "pg_stat_monitor: incorrect number of output arguments, required %d", tupdesc->natts);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/*
* Get shared lock, load or reload the query text file if we must, and
* iterate over the hashtable entries.
*
* With a large hash table, we might be holding the lock rather longer
* than one could wish. However, this only blocks creation of new hash
* table entries, and the larger the hash table the less likely that is to
* be needed. So we can hope this is okay. Perhaps someday we'll decide
* we need to partition the hash table to limit the time spent holding any
* one lock.
*/
LWLockAcquire(pgss->lock, LW_SHARED);
hash_seq_init(&hash_seq, pgss_agghash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
Datum values[6];
bool nulls[6];
int i = 0;
char queryid_txt[32];
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
sprintf(queryid_txt, "%08lX", entry->key.queryid);
values[i++] = CStringGetTextDatum(queryid_txt);
values[i++] = Int64GetDatumFast(entry->key.id);
values[i++] = Int64GetDatumFast(entry->key.type);
values[i++] = Int64GetDatumFast(entry->counters.total_calls);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
/* clean up and return the tuplestore */
LWLockRelease(pgss->lock);
tuplestore_donestoring(tupstore);
return 0;
}
#define FIFO_HEAD(b) pgss->query_fifo[b].head
#define FIFO_TAIL(b) pgss->query_fifo[b].tail
@@ -2595,6 +2110,8 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query)
uint64 id = 0;
uint64 len = 0;
uint64 offset = 0;
pgssSharedState *pgss = pgsm_get_ss();
uint64 tail = FIFO_TAIL(bucket_id);
unsigned char *buf = pgss_qbuf[bucket_id];
@@ -2619,7 +2136,7 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query)
if (id == queryid)
return id;
tail = (tail + offset) % query_buf_size_bucket;
tail = (tail + offset) % pgsm_get_bucket_size();
}
return 0;
}
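From store_query() below (the head advances by query_len plus two uint64s) and the reads above, each ring-buffer record appears to be laid out as [uint64 queryid][uint64 length][length bytes of query text]. An illustrative standalone helper that decodes one record at a given offset; read_record() is not in the source, only the layout is inferred from it:

static uint64
read_record(const unsigned char *buf, uint64 offset,
			uint64 *queryid, char *out, size_t outsz)
{
	uint64 len;

	memcpy(queryid, &buf[offset], sizeof(uint64));               /* record id */
	memcpy(&len, &buf[offset + sizeof(uint64)], sizeof(uint64)); /* text length */
	if (out != NULL && len < outsz)
	{
		memcpy(out, &buf[offset + 2 * sizeof(uint64)], len);
		out[len] = '\0';
	}
	/* offset of the next record: header (two uint64s) plus the text */
	return offset + 2 * sizeof(uint64) + len;
}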
@@ -2627,8 +2144,9 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query)
static void
store_query(uint64 queryid, const char *query, uint64 query_len)
{
int next;
int offset = 0;
int next;
int offset = 0;
pgssSharedState *pgss = pgsm_get_ss();
if (query_len > PGSM_QUERY_MAX_LEN)
query_len = PGSM_QUERY_MAX_LEN;
@@ -2640,7 +2158,7 @@ store_query(uint64 queryid, const char *query, uint64 query_len)
return;
next = FIFO_HEAD(pgss->current_wbucket) + query_len + sizeof (uint64) + sizeof (uint64);
if (next >= query_buf_size_bucket)
if (next >= pgsm_get_bucket_size())
next = 0;
/* Buffer is full */
@@ -2669,6 +2187,8 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
#endif
{
PlannedStmt *result;
pgssWaitEventEntry **pgssWaitEventEntries = pgsm_get_wait_event_entry();
if (MyProc)
{
int i = MyProc - ProcGlobal->allProcs;
@@ -2751,8 +2271,9 @@ static PlannedStmt *pgss_planner_hook(Query *parse, int opt, ParamListInfo param
static void
update_wait_event(void)
{
PGPROC *proc = NULL;
int i;
PGPROC *proc = NULL;
int i;
pgssWaitEventEntry **pgssWaitEventEntries = pgsm_get_wait_event_entry();
LWLockAcquire(ProcArrayLock, LW_SHARED);
for (i = 0; i < ProcGlobal->allProcCount; i++)