mirror of
https://github.com/percona/pg_stat_monitor.git
synced 2026-02-04 14:06:20 +00:00
PG-154: Add backup option in case of no space left in the bucket.
This commit is contained in:
@@ -16,7 +16,9 @@
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
#define BUILD_VERSION "0.6.0"
|
||||
#define BUILD_VERSION "0.6.0"
|
||||
#define PG_STAT_STATEMENTS_COLS 41 /* maximum of above */
|
||||
#define PGSM_TEXT_FILE "/tmp/pg_stat_monitor_query"
|
||||
|
||||
/*---- Initicalization Function Declarations ----*/
|
||||
void _PG_init(void);
|
||||
@@ -32,13 +34,14 @@ static int plan_nested_level = 0;
|
||||
static int exec_nested_level = 0;
|
||||
#endif
|
||||
|
||||
FILE *qfile;
|
||||
static bool system_init = false;
|
||||
static struct rusage rusage_start;
|
||||
static struct rusage rusage_end;
|
||||
static unsigned char *pgss_qbuf[MAX_BUCKETS];
|
||||
|
||||
|
||||
static bool IsSystemInitialized(void);
|
||||
static void dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len);
|
||||
|
||||
/* Saved hook values in case of unload */
|
||||
static planner_hook_type planner_hook_next = NULL;
|
||||
@@ -53,7 +56,6 @@ void pgsm_emit_log_hook(ErrorData *edata);
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
static ExecutorCheckPerms_hook_type prev_ExecutorCheckPerms_hook = NULL;
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor_version);
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor_reset);
|
||||
PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2);
|
||||
@@ -130,8 +132,9 @@ static int comp_location(const void *a, const void *b);
|
||||
|
||||
static uint64 get_next_wbucket(pgssSharedState *pgss);
|
||||
|
||||
static void store_query(uint64 queryid, const char *query, uint64 query_len);
|
||||
static uint64 locate_query(uint64 bucket_id, uint64 queryid, char * query);
|
||||
static void store_query(int bucket_id, uint64 queryid, const char *query, uint64 query_len);
|
||||
static uint64 read_query(unsigned char *buf, uint64 queryid, char * query);
|
||||
int read_query_buffer(int bucket_id, uint64 queryid, char *query_txt);
|
||||
|
||||
static uint64 get_query_id(pgssJumbleState *jstate, Query *query);
|
||||
|
||||
@@ -141,6 +144,8 @@ static uint64 get_query_id(pgssJumbleState *jstate, Query *query);
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
int i;
|
||||
|
||||
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
|
||||
/*
|
||||
* In order to create our shared memory area, we have to be loaded via
|
||||
@@ -156,6 +161,13 @@ _PG_init(void)
|
||||
/* Inilize the GUC variables */
|
||||
init_guc();
|
||||
|
||||
for (i = 0; i < PGSM_MAX_BUCKETS; i++)
|
||||
{
|
||||
char file_name[1024];
|
||||
sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, i);
|
||||
unlink(file_name);
|
||||
}
|
||||
|
||||
EmitWarningsOnPlaceholders("pg_stat_monitor");
|
||||
|
||||
/*
|
||||
@@ -233,7 +245,6 @@ pg_stat_monitor_version(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_TEXT_P(cstring_to_text(BUILD_VERSION));
|
||||
}
|
||||
|
||||
#define PG_STAT_STATEMENTS_COLS 41 /* maximum of above */
|
||||
|
||||
/*
|
||||
* Post-parse-analysis hook: mark query with a queryId
|
||||
@@ -660,7 +671,7 @@ pgss_hash_string(const char *str, int len)
|
||||
static PgBackendStatus*
|
||||
pg_get_backend_status(void)
|
||||
{
|
||||
LocalPgBackendStatus *local_beentry;
|
||||
LocalPgBackendStatus *local_beentry;
|
||||
int num_backends = pgstat_fetch_stat_numbackends();
|
||||
int i;
|
||||
|
||||
@@ -861,9 +872,9 @@ static void pgss_store(uint64 queryId,
|
||||
goto exit;
|
||||
}
|
||||
if (PGSM_NORMALIZED_QUERY)
|
||||
store_query(queryId, norm_query ? norm_query : query, query_len);
|
||||
store_query(key.bucket_id, queryId, norm_query ? norm_query : query, query_len);
|
||||
else
|
||||
store_query(queryId, query, query_len);
|
||||
store_query(key.bucket_id, queryId, query, query_len);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1090,9 +1101,20 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
if(locate_query(entry->key.bucket_id, queryid, query_txt) == 0)
|
||||
query_txt = NULL;
|
||||
|
||||
if (!hash_find_query_entry(entry->key.bucket_id, queryid))
|
||||
{
|
||||
sprintf(query_txt, "%s", "pg_stat_monitor: queryid not found in hash and in temporay file");
|
||||
}
|
||||
else
|
||||
{
|
||||
unsigned char *buf = pgss_qbuf[entry->key.bucket_id];
|
||||
if(read_query(buf, queryid, query_txt) == 0)
|
||||
{
|
||||
len = read_query_buffer(entry->key.bucket_id, queryid, query_txt);
|
||||
if (len != MAX_QUERY_BUFFER_BUCKET)
|
||||
sprintf(query_txt, "%s", "pg_stat_monitor: query not found either in hash nor in temporay file");
|
||||
}
|
||||
}
|
||||
if (query_txt)
|
||||
sprintf(queryid_txt, "%08lX", queryid);
|
||||
else
|
||||
@@ -1213,8 +1235,9 @@ static uint64
|
||||
get_next_wbucket(pgssSharedState *pgss)
|
||||
{
|
||||
struct timeval tv;
|
||||
uint64 current_usec;
|
||||
uint64 bucket_id;
|
||||
uint64 current_usec;
|
||||
uint64 bucket_id;
|
||||
char file_name[1024];
|
||||
|
||||
gettimeofday(&tv,NULL);
|
||||
current_usec = tv.tv_sec;
|
||||
@@ -1229,6 +1252,11 @@ get_next_wbucket(pgssSharedState *pgss)
|
||||
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
|
||||
buf = pgss_qbuf[bucket_id];
|
||||
hash_entry_dealloc(bucket_id);
|
||||
hash_query_entry_dealloc(bucket_id);
|
||||
sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, (int)bucket_id);
|
||||
unlink(file_name);
|
||||
printf("\nRemove file %s\n", file_name);
|
||||
|
||||
/* reset the query buffer */
|
||||
memset(buf, 0, sizeof (uint64));
|
||||
LWLockRelease(pgss->lock);
|
||||
@@ -2199,17 +2227,16 @@ intarray_get_datum(int32 arr[], int len)
|
||||
}
|
||||
|
||||
static uint64
|
||||
locate_query(uint64 bucket_id, uint64 queryid, char * query)
|
||||
read_query(unsigned char *buf, uint64 queryid, char * query)
|
||||
{
|
||||
bool found = false;
|
||||
uint64 query_id = 0;
|
||||
uint64 query_len = 0;
|
||||
uint64 rlen = 0;
|
||||
uint64 buf_len = 0;
|
||||
unsigned char *buf = pgss_qbuf[bucket_id];
|
||||
|
||||
memcpy(&buf_len, buf, sizeof (uint64));
|
||||
if (buf_len <= sizeof (uint64))
|
||||
if (buf_len <= 0)
|
||||
return 0;
|
||||
|
||||
rlen = sizeof (uint64); /* Move forwad to skip length bytes */
|
||||
@@ -2246,7 +2273,7 @@ locate_query(uint64 bucket_id, uint64 queryid, char * query)
|
||||
}
|
||||
|
||||
static void
|
||||
store_query(uint64 queryid, const char *query, uint64 query_len)
|
||||
store_query(int bucket_id, uint64 queryid, const char *query, uint64 query_len)
|
||||
{
|
||||
uint64 buf_len = 0;
|
||||
pgssSharedState *pgss = pgsm_get_ss();
|
||||
@@ -2258,18 +2285,20 @@ store_query(uint64 queryid, const char *query, uint64 query_len)
|
||||
/* Already have query in the shared buffer, there
|
||||
* is no need to add that again.
|
||||
*/
|
||||
if (locate_query(pgss->current_wbucket, queryid, NULL) == queryid)
|
||||
if (hash_find_query_entry(bucket_id, queryid))
|
||||
return;
|
||||
|
||||
if (!hash_create_query_entry(bucket_id, queryid))
|
||||
return;
|
||||
|
||||
memcpy(&buf_len, buf, sizeof (uint64));
|
||||
if (buf_len == 0)
|
||||
buf_len += sizeof (uint64);
|
||||
|
||||
if ((buf_len + query_len + sizeof(uint64) + sizeof(uint64)) > pgsm_get_bucket_size())
|
||||
if (QUERY_BUFFER_OVERFLOW(buf_len, query_len))
|
||||
{
|
||||
/* Buffer is full */
|
||||
elog(INFO, "pg_stat_monitor: no space left in shared_buffer");
|
||||
return;
|
||||
dump_queries_buffer(bucket_id, buf, MAX_QUERY_BUFFER_BUCKET);
|
||||
buf_len = sizeof (uint64);
|
||||
}
|
||||
|
||||
memcpy(&buf[buf_len], &queryid, sizeof (uint64)); /* query id */
|
||||
@@ -2417,7 +2446,11 @@ pg_stat_monitor_settings(PG_FUNCTION_ARGS)
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
#if PG_VERSION_NUM >= 130000
|
||||
for(i = 0; i < 11; i++)
|
||||
#else
|
||||
for(i = 0; i < 10; i++)
|
||||
#endif
|
||||
{
|
||||
Datum values[7];
|
||||
bool nulls[7];
|
||||
@@ -2490,3 +2523,78 @@ IsSystemInitialized(void)
|
||||
return (system_init && IsHashInitialize());
|
||||
}
|
||||
|
||||
static void
|
||||
dump_queries_buffer(int bucket_id, unsigned char *buf, int buf_len)
|
||||
{
|
||||
int fd = 0;
|
||||
char file_name[1024];
|
||||
|
||||
sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, bucket_id);
|
||||
printf("\nWriting to %s\n", file_name);
|
||||
fd = OpenTransientFile(file_name, O_RDWR | O_CREAT | O_APPEND | PG_BINARY);
|
||||
if (fd < 0)
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m",
|
||||
file_name)));
|
||||
|
||||
if (write(fd, buf, buf_len) != buf_len)
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write file \"%s\": %m",
|
||||
file_name)));
|
||||
if (fd > 0)
|
||||
CloseTransientFile(fd);
|
||||
}
|
||||
|
||||
int
|
||||
read_query_buffer(int bucket_id, uint64 queryid, char *query_txt)
|
||||
{
|
||||
int fd = 0;
|
||||
int buf_len;
|
||||
char file_name[1024];
|
||||
unsigned char *buf = NULL;
|
||||
int off = 0;
|
||||
|
||||
sprintf(file_name, "%s.%d", PGSM_TEXT_FILE, bucket_id);
|
||||
fd = OpenTransientFile(file_name, O_RDONLY | PG_BINARY);
|
||||
if (fd < 0)
|
||||
goto exit;
|
||||
|
||||
buf = (unsigned char*) palloc(MAX_QUERY_BUFFER_BUCKET);
|
||||
for(;;)
|
||||
{
|
||||
if (lseek(fd, off, SEEK_SET) != off)
|
||||
goto exit;
|
||||
|
||||
buf_len = read(fd, buf, MAX_QUERY_BUFFER_BUCKET);
|
||||
if (buf_len != MAX_QUERY_BUFFER_BUCKET)
|
||||
{
|
||||
if (errno != ENOENT)
|
||||
goto exit;
|
||||
|
||||
if (buf_len == 0)
|
||||
break;
|
||||
}
|
||||
off += buf_len;
|
||||
if (read_query(buf, queryid, query_txt))
|
||||
break;
|
||||
}
|
||||
if (fd > 0)
|
||||
CloseTransientFile(fd);
|
||||
if (buf)
|
||||
pfree(buf);
|
||||
return buf_len;
|
||||
|
||||
exit:
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read file \"%s\": %m",
|
||||
file_name)));
|
||||
if (fd > 0)
|
||||
CloseTransientFile(fd);
|
||||
if (buf)
|
||||
pfree(buf);
|
||||
return buf_len;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user