Issue (#30) - Code refactoring.

This commit is contained in:
Ibrar Ahmed
2020-04-15 09:49:02 +00:00
parent c9de79b6ba
commit beca034067
7 changed files with 777 additions and 309 deletions

View File

@@ -16,18 +16,24 @@
PG_MODULE_MAGIC;
/*---- Initicalization Function Declarations ----*/
void _PG_init(void);
void _PG_fini(void);
/*---- Local variables ----*/
/* Current nesting depth of ExecutorRun+ProcessUtility calls */
static int nested_level = 0;
static double respose_time_lower_bound = .01;
static double respose_time_step = .1;
static struct rusage rusage_start;
static struct rusage rusage_end;
static volatile sig_atomic_t sigterm = false;
static void handle_sigterm(SIGNAL_ARGS);
int query_buf_size_bucket;
HTAB *
CreateHash(const char *hash_name, int key_size, int entry_size, int hash_size);
/* Saved hook values in case of unload */
static planner_hook_type planner_hook_next = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
@@ -55,25 +61,6 @@ static HTAB *pgss_waiteventshash = NULL;
static pgssBucketEntry **pgssBucketEntries = NULL;
static pgssWaitEventEntry **pgssWaitEventEntries = NULL;
/*---- GUC variables ----*/
static int pgss_max; /* max # statements to track */
static int pgss_track; /* tracking level */
static bool pgss_track_utility; /* whether to track utility commands */
static bool pgss_save; /* whether to save stats across shutdown */
static int max_bucket_time;
static int max_buckets;
static int max_bucket_size; /* max # statements to track in a bucket */
static int max_object_cache;
static bool pgss_normalized_query;
static int pgss_query_max_len;
static int pgss_query_buf_size;
static int pgss_query_buf_size_bucket;
/*---- Function declarations ----*/
void _PG_init(void);
void _PG_fini(void);
PG_FUNCTION_INFO_V1(pg_stat_monitor_reset);
PG_FUNCTION_INFO_V1(pg_stat_monitor_1_2);
@@ -129,8 +116,8 @@ static int comp_location(const void *a, const void *b);
static uint64 get_next_wbucket(pgssSharedState *pgss);
static void store_query(unsigned long queryid, const char *query, unsigned long query_len);
static unsigned long locate_query(unsigned long bucket_id, unsigned long queryid, char * query);
static void store_query(uint64 queryid, const char *query, uint64 query_len);
static uint64 locate_query(uint64 bucket_id, uint64 queryid, char * query);
/* Wait Event Local Functions */
static void register_wait_event(void);
@@ -144,6 +131,7 @@ static uint64 get_query_id(pgssJumbleState *jstate, Query *query);
void
_PG_init(void)
{
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
/*
* In order to create our shared memory area, we have to be loaded via
* shared_preload_libraries. If not, fall out without hooking into any of
@@ -155,163 +143,11 @@ _PG_init(void)
if (!process_shared_preload_libraries_in_progress)
return;
/*
* Define (or redefine) custom GUC variables.
*/
DefineCustomIntVariable("pg_stat_monitor.max",
"Sets the maximum number of statements tracked by pg_stat_monitor.",
NULL,
&pgss_max,
5000,
5000,
INT_MAX,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("pg_stat_monitor.query_max_len",
"Sets the maximum length of query",
NULL,
&pgss_query_max_len,
1024,
1024,
INT_MAX,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomEnumVariable("pg_stat_monitor.track",
"Selects which statements are tracked by pg_stat_monitor.",
NULL,
&pgss_track,
PGSS_TRACK_TOP,
track_options,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomBoolVariable("pg_stat_monitor.track_utility",
"Selects whether utility commands are tracked by pg_stat_monitor.",
NULL,
&pgss_track_utility,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomBoolVariable("pg_stat_monitor.normalized_query",
"Selects whether save query in normalized format.",
NULL,
&pgss_normalized_query,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomBoolVariable("pg_stat_monitor.save",
"Save pg_stat_monitor statistics across server shutdowns.",
NULL,
&pgss_save,
true,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("pg_stat_monitor.max_buckets ",
"Sets the maximum number of buckets.",
NULL,
&max_buckets,
MAX_BUCKETS,
1,
MAX_BUCKETS,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("pg_stat_monitor.bucket_time",
"Sets the time in seconds per bucket.",
NULL,
&max_bucket_time,
60,
1,
INT_MAX,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("pg_stat_monitor.max_object_cache ",
"Sets the maximum number of object cache",
NULL,
&max_object_cache,
MAX_OBJECT_CACHE,
10,
MAX_OBJECT_CACHE,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomRealVariable("pg_stat_monitor.respose_time_lower_bound",
"Sets the time in millisecond.",
NULL,
&respose_time_lower_bound,
.1,
.1,
INT_MAX,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomRealVariable("pg_stat_monitor.respose_time_step",
"Sets the respose time steps in millisecond.",
NULL,
&respose_time_step,
.1,
.1,
INT_MAX,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("pg_stat_monitor.shared_buffer",
"Sets the shared_buffer size",
NULL,
&pgss_query_buf_size,
500000,
500000,
INT_MAX,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
/* Inilize the GUC variables */
init_guc();
EmitWarningsOnPlaceholders("pg_stat_monitor");
max_bucket_size = pgss_max / max_buckets;
/*
* Request additional shared resources. (These are no-ops if we're not in
* the postmaster process.) We'll allocate or attach to the shared
@@ -319,28 +155,29 @@ _PG_init(void)
*/
RequestAddinShmemSpace(pgss_memsize());
RequestNamedLWLockTranche("pg_stat_monitor", 1);
/* Register Wait events */
register_wait_event();
/*
* Install hooks.
*/
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pgss_shmem_startup;
prev_post_parse_analyze_hook = post_parse_analyze_hook;
post_parse_analyze_hook = pgss_post_parse_analyze;
prev_ExecutorStart = ExecutorStart_hook;
ExecutorStart_hook = pgss_ExecutorStart;
prev_ExecutorRun = ExecutorRun_hook;
ExecutorRun_hook = pgss_ExecutorRun;
prev_ExecutorFinish = ExecutorFinish_hook;
ExecutorFinish_hook = pgss_ExecutorFinish;
prev_ExecutorEnd = ExecutorEnd_hook;
ExecutorEnd_hook = pgss_ExecutorEnd;
prev_ProcessUtility = ProcessUtility_hook;
ProcessUtility_hook = pgss_ProcessUtility;
planner_hook_next = planner_hook;
planner_hook = pgss_planner_hook;
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pgss_shmem_startup;
prev_post_parse_analyze_hook = post_parse_analyze_hook;
post_parse_analyze_hook = pgss_post_parse_analyze;
prev_ExecutorStart = ExecutorStart_hook;
ExecutorStart_hook = pgss_ExecutorStart;
prev_ExecutorRun = ExecutorRun_hook;
ExecutorRun_hook = pgss_ExecutorRun;
prev_ExecutorFinish = ExecutorFinish_hook;
ExecutorFinish_hook = pgss_ExecutorFinish;
prev_ExecutorEnd = ExecutorEnd_hook;
ExecutorEnd_hook = pgss_ExecutorEnd;
prev_ProcessUtility = ProcessUtility_hook;
ProcessUtility_hook = pgss_ProcessUtility;
planner_hook_next = planner_hook;
planner_hook = pgss_planner_hook;
}
/*
@@ -349,17 +186,29 @@ _PG_init(void)
void
_PG_fini(void)
{
/* Uninstall hooks. */
shmem_startup_hook = prev_shmem_startup_hook;
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
shmem_startup_hook = prev_shmem_startup_hook;
post_parse_analyze_hook = prev_post_parse_analyze_hook;
ExecutorStart_hook = prev_ExecutorStart;
ExecutorRun_hook = prev_ExecutorRun;
ExecutorFinish_hook = prev_ExecutorFinish;
ExecutorEnd_hook = prev_ExecutorEnd;
ProcessUtility_hook = prev_ProcessUtility;
ExecutorStart_hook = prev_ExecutorStart;
ExecutorRun_hook = prev_ExecutorRun;
ExecutorFinish_hook = prev_ExecutorFinish;
ExecutorEnd_hook = prev_ExecutorEnd;
ProcessUtility_hook = prev_ProcessUtility;
entry_reset();
}
HTAB *
CreateHash(const char *hash_name, int key_size, int entry_size, int hash_size)
{
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = key_size;
info.entrysize = entry_size;
return ShmemInitHash(hash_name, hash_size, hash_size, &info, HASH_ELEM | HASH_BLOBS);
}
/*
* shmem_startup hook: allocate or attach to shared memory,
* then load any pre-existing statistics from file.
@@ -370,9 +219,12 @@ static void
pgss_shmem_startup(void)
{
bool found = false;
HASHCTL info;
int32 i;
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
Assert(IsHashInitialize());
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
@@ -394,43 +246,40 @@ pgss_shmem_startup(void)
{
/* First time through ... */
pgss->lock = &(GetNamedLWLockTranche("pg_stat_monitor"))->lock;
pgss->cur_median_usage = ASSUMED_MEDIAN_INIT;
SpinLockInit(&pgss->mutex);
pgss->extent = 0;
pgss->n_writers = 0;
ResetSharedState(pgss);
}
pgss_query_buf_size_bucket = pgss_query_buf_size / max_buckets;
for (i = 0; i < max_buckets; i++)
pgss_qbuf[i] = (unsigned char *) ShmemAlloc(pgss_query_buf_size_bucket);
query_buf_size_bucket = pgsm_query_buf_size / pgsm_max_buckets;
for (i = 0; i < pgsm_max_buckets; i++)
pgss_qbuf[i] = (unsigned char *) ShmemAlloc(query_buf_size_bucket);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgssHashKey);
info.entrysize = sizeof(pgssEntry);
pgss_hash = ShmemInitHash("pg_stat_monitor: Queries hashtable",
pgss_max, pgss_max,
&info,
HASH_ELEM | HASH_BLOBS);
pgss_hash = CreateHash("pg_stat_monitor: Queries hashtable",
sizeof(pgssHashKey),
sizeof(pgssEntry),
pgsm_max);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgssBucketHashKey);
info.entrysize = sizeof(pgssBucketEntry);
pgss_buckethash = CreateHash("pg_stat_monitor: Bucket hashtable",
sizeof(pgssBucketHashKey),
sizeof(pgssBucketEntry),
pgsm_max_buckets);
pgss_buckethash = ShmemInitHash("pg_stat_monitor: Buckets hashtable",
max_buckets, max_buckets,
&info,
HASH_ELEM | HASH_BLOBS);
pgss_waiteventshash = CreateHash("pg_stat_monitor: Wait Event hashtable",
sizeof(pgssWaitEventKey),
sizeof(pgssWaitEventEntry),
100);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgssWaitEventKey);
info.entrysize = sizeof(pgssWaitEventEntry);
pgss_object_hash = CreateHash("pg_stat_monitor: Object hashtable",
sizeof(pgssObjectHashKey),
sizeof(pgssObjectEntry),
pgsm_object_cache);
pgss_waiteventshash = ShmemInitHash("pg_stat_monitor: Wait Event hashtable",
100, 100,
&info,
HASH_ELEM | HASH_BLOBS);
pgss_agghash = CreateHash("pg_stat_monitor: Aggregate hashtable",
sizeof(pgssAggHashKey),
sizeof(pgssAggEntry),
pgsm_max * 3);
#define MAX_BACKEND_PROCESES (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
Assert(!IsHashInitialize());
pgssWaitEventEntries = malloc(sizeof (pgssWaitEventEntry) * MAX_BACKEND_PROCESES);
for (i = 0; i < MAX_BACKEND_PROCESES; i++)
@@ -448,17 +297,8 @@ pgss_shmem_startup(void)
}
}
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgssObjectHashKey);
info.entrysize = sizeof(pgssObjectEntry);
pgss_object_hash = ShmemInitHash("pg_stat_monitor: Objects hashtable",
max_object_cache, max_object_cache,
&info,
HASH_ELEM | HASH_BLOBS);
pgssBucketEntries = malloc(sizeof (pgssBucketEntry) * max_buckets);
for (i = 0; i < max_buckets; i++)
pgssBucketEntries = malloc(sizeof (pgssBucketEntry) * pgsm_max_buckets);
for (i = 0; i < pgsm_max_buckets; i++)
{
pgssBucketHashKey key;
pgssBucketEntry *entry = NULL;
@@ -475,18 +315,6 @@ pgss_shmem_startup(void)
}
}
/*
* Create a aggregate hash 3 times than the the normal hash because we have
* three different type of aggregate stored in the aggregate hash. Aggregate
* by database, aggraget by user and aggragete by host.
*/
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgssAggHashKey);
info.entrysize = sizeof(pgssAggEntry);
pgss_agghash = ShmemInitHash("pg_stat_monitor: Aggregated Information hashtable",
pgss_max * 3, pgss_max * 3,
&info,
HASH_ELEM | HASH_BLOBS);
LWLockRelease(AddinShmemInitLock);
/*
@@ -506,12 +334,13 @@ pgss_shmem_startup(void)
static void
pgss_shmem_shutdown(int code, Datum arg)
{
elog(DEBUG2, "pg_stat_monitor: %s()", __FUNCTION__);
/* Don't try to dump during a crash. */
if (code)
return;
/* Safety check ... shouldn't get here unless shmem is set up. */
if (!pgss || !pgss_hash)
if (IsHashInitialize())
return;
}
@@ -532,7 +361,7 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query)
Assert(query->queryId == UINT64CONST(0));
/* Safety check... */
if (!pgss || !pgss_hash)
if (IsHashInitialize())
return;
/*
@@ -753,7 +582,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
*
* Likewise, we don't track execution of DEALLOCATE.
*/
if (pgss_track_utility && pgss_enabled() &&
if (pgsm_track_utility && pgss_enabled() &&
!IsA(parsetree, ExecuteStmt) &&
!IsA(parsetree, PrepareStmt) &&
!IsA(parsetree, DeallocateStmt))
@@ -923,7 +752,7 @@ pgss_store(const char *query, uint64 queryId,
Assert(query != NULL);
/* Safety check... */
if (!pgss || !pgss_hash || !pgss_qbuf[pgss->current_wbucket])
if (IsHashInitialize() || !pgss_qbuf[pgss->current_wbucket])
return;
/*
@@ -1026,7 +855,7 @@ pgss_store(const char *query, uint64 queryId,
goto exit;
}
if (pgss_normalized_query)
if (pgsm_normalized_query)
store_query(queryId, norm_query ? norm_query : query, query_len);
else
store_query(queryId, query, query_len);
@@ -1086,13 +915,13 @@ pgss_store(const char *query, uint64 queryId,
for (i = 0; i < MAX_RESPONSE_BUCKET - 1; i++)
{
if (total_time < respose_time_lower_bound + (respose_time_step * i))
if (total_time < pgsm_respose_time_lower_bound + (pgsm_respose_time_step * i))
{
pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls[i]++;
break;
}
}
if (total_time > respose_time_lower_bound + (respose_time_step * MAX_RESPONSE_BUCKET))
if (total_time > pgsm_respose_time_lower_bound + (pgsm_respose_time_step * MAX_RESPONSE_BUCKET))
pgssBucketEntries[entry->key.bucket_id]->counters.resp_calls[MAX_RESPONSE_BUCKET - 1]++;
e->counters.calls.rows += rows;
@@ -1162,7 +991,7 @@ pg_stat_wait_events(PG_FUNCTION_ARGS)
pgssWaitEventEntry *entry;
char *query_txt;
char queryid_txt[64];
query_txt = (char*) malloc(pgss_query_max_len);
query_txt = (char*) malloc(pgsm_query_max_len);
/* hash table must exist already */
if (!pgss || !pgss_hash || !pgss_object_hash)
@@ -1260,7 +1089,7 @@ pg_stat_monitor_internal(FunctionCallInfo fcinfo,
pgssEntry *entry;
char *query_txt;
char queryid_txt[64];
query_txt = (char*) malloc(pgss_query_max_len);
query_txt = (char*) malloc(pgsm_query_max_len);
/* Superusers or members of pg_read_all_stats members are allowed */
is_allowed_role = is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS);
@@ -1419,7 +1248,7 @@ pgss_memsize(void)
Size size;
size = MAXALIGN(sizeof(pgssSharedState));
size = add_size(size, hash_estimate_size(pgss_max, sizeof(pgssEntry)));
size = add_size(size, hash_estimate_size(pgsm_max, sizeof(pgssEntry)));
return size;
}
@@ -1448,13 +1277,13 @@ entry_alloc(pgssSharedState *pgss, pgssHashKey *key, Size query_offset, int quer
pgssEntry *entry = NULL;
bool found = false;
if (pgss->bucket_entry[pgss->current_wbucket] >= max_bucket_size)
if (pgss->bucket_entry[pgss->current_wbucket] >= (pgsm_max / pgsm_max_buckets))
{
pgss->bucket_overflow[pgss->current_wbucket]++;
return NULL;
}
if (hash_get_num_entries(pgss_hash) >= pgss_max)
if (hash_get_num_entries(pgss_hash) >= pgsm_max)
return NULL;
/* Find or create an entry with desired hash code */
@@ -1480,16 +1309,16 @@ static uint64
get_next_wbucket(pgssSharedState *pgss)
{
struct timeval tv;
unsigned long current_usec;
uint64 current_usec;
uint64 bucket_id;
gettimeofday(&tv,NULL);
current_usec = tv.tv_sec;
if ((current_usec - pgss->prev_bucket_usec) > max_bucket_time)
if ((current_usec - pgss->prev_bucket_usec) > pgsm_bucket_time)
{
bucket_id = pgss->current_wbucket + 1;
if (bucket_id == max_buckets)
if (bucket_id == pgsm_max_buckets)
bucket_id = 0;
LWLockAcquire(pgss->lock, LW_EXCLUSIVE);
@@ -1589,6 +1418,7 @@ entry_reset()
{
hash_search(pgss_waiteventshash, &dbentry->key, HASH_REMOVE, NULL);
}
pgss->current_wbucket = 0;
free(pgssWaitEventEntries);
free(pgssBucketEntries);
LWLockRelease(pgss->lock);
@@ -2223,7 +2053,7 @@ JumbleExpr(pgssJumbleState *jstate, Node *node)
break;
default:
/* Only a warning, since we can stumble along anyway */
elog(WARNING, "unrecognized node type: %d",
elog(INFO, "unrecognized node type: %d",
(int) nodeTag(node));
break;
}
@@ -2670,28 +2500,30 @@ pg_stat_agg(PG_FUNCTION_ARGS)
return 0;
}
static unsigned long
locate_query(unsigned long bucket_id, unsigned long queryid, char * query)
#define FIFO_HEAD(b) pgss->query_fifo[b].head
#define FIFO_TAIL(b) pgss->query_fifo[b].tail
static uint64
locate_query(uint64 bucket_id, uint64 queryid, char * query)
{
unsigned long id = 0;
unsigned long len = 0;
unsigned long offset = 0;
unsigned long head = pgss->query_fifo[bucket_id].head;
unsigned long tail = pgss->query_fifo[bucket_id].tail;
uint64 id = 0;
uint64 len = 0;
uint64 offset = 0;
uint64 tail = FIFO_TAIL(bucket_id);
unsigned char *buf = pgss_qbuf[bucket_id];
while (head != tail)
while (FIFO_HEAD(bucket_id) != tail)
{
offset = 0;
memcpy(&id, &buf[tail + offset], sizeof (unsigned long)); /* query id */
memcpy(&id, &buf[tail + offset], sizeof (uint64)); /* query id */
offset += sizeof (unsigned long);
memcpy(&len, &buf[tail + offset], sizeof (unsigned long)); /* query len */
offset += sizeof (uint64);
memcpy(&len, &buf[tail + offset], sizeof (uint64)); /* query len */
if (len == 0)
return 0;
offset += sizeof (unsigned long);
offset += sizeof (uint64);
if (query != NULL)
{
memcpy(query, &buf[tail + offset], len); /* Actual query */
@@ -2701,19 +2533,19 @@ locate_query(unsigned long bucket_id, unsigned long queryid, char * query)
if (id == queryid)
return id;
tail = (tail + offset) % pgss_query_buf_size_bucket;
tail = (tail + offset) % query_buf_size_bucket;
}
return 0;
}
static void
store_query(unsigned long queryid, const char *query, unsigned long query_len)
store_query(uint64 queryid, const char *query, uint64 query_len)
{
int next;
int offset = 0;
if (query_len > pgss_query_max_len)
query_len = pgss_query_max_len;
if (query_len > pgsm_query_max_len)
query_len = pgsm_query_max_len;
/* Already have query in the shared buffer, there
* is no need to add that again.
@@ -2721,25 +2553,25 @@ store_query(unsigned long queryid, const char *query, unsigned long query_len)
if (locate_query(pgss->current_wbucket, queryid, NULL) == queryid)
return;
next = pgss->query_fifo[pgss->current_wbucket].head + query_len + sizeof (unsigned long) + sizeof (unsigned long);
if (next >= pgss_query_buf_size_bucket)
next = FIFO_HEAD(pgss->current_wbucket) + query_len + sizeof (uint64) + sizeof (uint64);
if (next >= query_buf_size_bucket)
next = 0;
/* Buffer is full */
if (next == pgss->query_fifo[pgss->current_wbucket].tail)
if (next == FIFO_HEAD(pgss->current_wbucket))
{
elog(DEBUG2, "pg_stat_monitor: no space left in shared_buffer");
elog(INFO, "pg_stat_monitor: no space left in shared_buffer");
return;
}
offset = 0;
memcpy(&pgss_qbuf[pgss->current_wbucket][pgss->query_fifo[pgss->current_wbucket].head], &queryid, sizeof (unsigned long)); /* query id */
offset += sizeof (unsigned long);
memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket)], &queryid, sizeof (uint64)); /* query id */
offset += sizeof (uint64);
memcpy(&pgss_qbuf[pgss->current_wbucket][pgss->query_fifo[pgss->current_wbucket].head + offset], &query_len, sizeof (unsigned long)); /* query len */
offset += sizeof (unsigned long);
memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket) + offset], &query_len, sizeof (uint64)); /* query len */
offset += sizeof (uint64);
memcpy(&pgss_qbuf[pgss->current_wbucket][pgss->query_fifo[pgss->current_wbucket].head + offset], query, query_len); /* actual query */
memcpy(&pgss_qbuf[pgss->current_wbucket][FIFO_HEAD(pgss->current_wbucket) + offset], query, query_len); /* actual query */
pgss->query_fifo[pgss->current_wbucket].head = next;
}