Experimental backend for pg_dist_statements view.

pull/1314/head
Lukas Fittl 2017-04-07 19:00:43 -07:00
parent 8c5d0f686b
commit 2619b060ab
11 changed files with 779 additions and 3 deletions

View File

@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.2-1 5.2-2 5.2-3 5.2-4 \ 5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-1 6.2-2 6.2-3
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -18,7 +18,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql) DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
# directories with source files # directories with source files
SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker SUBDIRS = . commands connection executor master metadata planner relay stats test transaction utils worker
# That patsubst rule searches all directories listed in SUBDIRS for .c # That patsubst rule searches all directories listed in SUBDIRS for .c
# files, and adds the corresponding .o files to OBJS # files, and adds the corresponding .o files to OBJS
@ -134,6 +134,8 @@ $(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sq
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql $(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.2-3.sql: $(EXTENSION)--6.2-2.sql $(EXTENSION)--6.2-2--6.2-3.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,22 @@
/* citus--6.2-1--6.2-2.sql */
SET search_path = 'pg_catalog';
CREATE OR REPLACE FUNCTION citus_statement_executors(OUT queryid bigint, OUT userid oid, OUT dbid oid, OUT executor bigint, OUT calls bigint)
RETURNS SETOF record
LANGUAGE C STRICT COST 1000
AS 'MODULE_PATHNAME', $$citus_statement_executors$$;
CREATE OR REPLACE FUNCTION citus_statement_executors_reset()
RETURNS void
LANGUAGE C STRICT COST 1000
AS 'MODULE_PATHNAME', $$citus_statement_executors_reset$$;
/*CREATE VIEW pg_dist_statements AS
SELECT CASE d.executor WHEN 1 THEN 'real-time' WHEN 2 THEN 'task-tracker' WHEN 3 THEN 'router' END AS executor,
s.userid, s.dbid, s.queryid, s.query, s.calls, s.total_time, s.min_time,
s.max_time, s.mean_time, s.stddev_time, s.rows
FROM citus_statement_executors() d
JOIN pg_stat_statements s USING (queryid, userid, dbid);*/
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '6.2-2' default_version = '6.2-3'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -22,6 +22,7 @@
#include "distributed/multi_resowner.h" #include "distributed/multi_resowner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h" #include "distributed/multi_utility.h"
#include "distributed/stats_statement_executors.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "executor/execdebug.h" #include "executor/execdebug.h"
#include "commands/copy.h" #include "commands/copy.h"
@ -421,6 +422,9 @@ CitusEndScan(CustomScanState *node)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
StoreStatsStatementExecutorsEntry(scanState->multiPlan->queryId,
scanState->executorType);
if (scanState->tuplestorestate) if (scanState->tuplestorestate)
{ {
tuplestore_end(scanState->tuplestorestate); tuplestore_end(scanState->tuplestorestate);

View File

@ -420,6 +420,8 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
} }
} }
multiPlan->queryId = localPlan->queryId;
multiPlanData = SerializeMultiPlan(multiPlan); multiPlanData = SerializeMultiPlan(multiPlan);
customScan->custom_private = list_make1(multiPlanData); customScan->custom_private = list_make1(multiPlanData);

View File

@ -26,6 +26,7 @@
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_explain.h" #include "distributed/multi_explain.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_planner.h" #include "distributed/multi_planner.h"
@ -36,6 +37,7 @@
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/stats_statement_executors.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -163,6 +165,8 @@ _PG_init(void)
SetConfigOption("allow_system_table_mods", "true", PGC_POSTMASTER, SetConfigOption("allow_system_table_mods", "true", PGC_POSTMASTER,
PGC_S_OVERRIDE); PGC_S_OVERRIDE);
} }
InitializeStatsStatementExecutors();
} }

View File

@ -0,0 +1,720 @@
/*-------------------------------------------------------------------------
*
* stats_statements.c
* Statement-level statistics for distributed queries.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "distributed/stats_statement_executors.h"
#include "funcapi.h"
#include "storage/ipc.h"
#include "storage/fd.h"
#include "storage/spin.h"
#include "tcop/utility.h"
#include <unistd.h>
#define PGDS_DUMP_FILE "pg_stat/pg_dist_statements.stat"
#define PG_DIST_STATEMENTS_COLS 5
#define USAGE_DECREASE_FACTOR (0.99) /* decreased every StatStatementsEntryDealloc */
#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */
#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */
#define USAGE_INIT (1.0) /* including initial planning */
static const uint32 PGDS_FILE_HEADER = 0x0d756e0f;
static int pgds_max = 0; /* max #queries to store. pg_stat_statements.max is used */
/*
* Hashtable key that defines the identity of a hashtable entry. We use the
* same hash as pg_stat_statements
*/
typedef struct pgdsHashKey
{
Oid userid; /* user OID */
Oid dbid; /* database OID */
uint32 queryid; /* query identifier */
MultiExecutorType executorType; /* executor type */
} pgdsHashKey;
/*
* Statistics per query and executor type
*/
typedef struct pgdsEntry
{
pgdsHashKey key; /* hash key of entry - MUST BE FIRST */
int64 calls; /* # of times executed */
double usage; /* hashtable usage factor */
slock_t mutex; /* protects the counters only */
} pgdsEntry;
/*
* Global shared state
*/
typedef struct pgdsSharedState
{
LWLockId lock; /* protects hashtable search/modification */
double cur_median_usage; /* current median usage in hashtable */
} pgdsSharedState;
/* saved hook address in case of unload */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* Links to shared memory state */
static pgdsSharedState *pgds = NULL;
static HTAB *pgds_hash = NULL;
/*--- Functions --- */
Datum citus_statement_executors_reset(PG_FUNCTION_ARGS);
Datum citus_statement_executors(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(citus_statement_executors_reset);
PG_FUNCTION_INFO_V1(citus_statement_executors);
static void StatStatementsSetMax(void);
static Size StatStatementsMemSize(void);
static void StatStatementsShmemStartup(void);
static void StatStatementsShmemShudown(int code, Datum arg);
static pgdsEntry * StatStatementsEntryAlloc(pgdsHashKey *key, bool sticky);
static void StatStatementsEntryDealloc(void);
static void StatStatementsEntryReset(void);
static uint32 StatStatementsHashFn(const void *key, Size keysize);
static int StatStatementsMatchFn(const void *key1, const void *key2, Size keysize);
void
InitializeStatsStatementExecutors(void)
{
/* set pgds_max if needed */
StatStatementsSetMax();
RequestAddinShmemSpace(StatStatementsMemSize());
#if PG_VERSION_NUM >= 90600
RequestNamedLWLockTranche("pg_dist_statements", 1);
#else
RequestAddinLWLocks(1);
#endif
/* Install hook */
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = StatStatementsShmemStartup;
}
static void
StatStatementsShmemStartup(void)
{
bool found;
HASHCTL info;
FILE *file;
int i;
uint32 header;
int32 num;
pgdsEntry *buffer = NULL;
if (prev_shmem_startup_hook)
{
prev_shmem_startup_hook();
}
/* reset in case this is a restart within the postmaster */
pgds = NULL;
/* Create or attach to the shared memory state */
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
/* global access lock */
pgds = ShmemInitStruct("pg_dist_statements",
sizeof(pgdsSharedState),
&found);
if (!found)
{
/* First time through ... */
#if PG_VERSION_NUM >= 90600
pgds->lock = &(GetNamedLWLockTranche("pg_dist_statements"))->lock;
#else
pgds->lock = LWLockAssign();
#endif
}
/* set pgds_max if needed */
StatStatementsSetMax();
memset(&info, 0, sizeof(info));
info.keysize = sizeof(pgdsHashKey);
info.entrysize = sizeof(pgdsEntry);
info.hash = StatStatementsHashFn;
info.match = StatStatementsMatchFn;
/* allocate stats shared memory hash */
pgds_hash = ShmemInitHash("pg_dist_statements hash",
pgds_max, pgds_max,
&info,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
LWLockRelease(AddinShmemInitLock);
if (!IsUnderPostmaster)
{
on_shmem_exit(StatStatementsShmemShudown, (Datum) 0);
}
/*
* Done if some other process already completed our initialization.
*/
if (found)
{
return;
}
/* Load stat file, don't care about locking */
file = AllocateFile(PGDS_DUMP_FILE, PG_BINARY_R);
if (file == NULL)
{
if (errno == ENOENT)
{
return; /* ignore not-found error */
}
goto error;
}
/* check is header is valid */
if (fread(&header, sizeof(uint32), 1, file) != 1 ||
header != PGDS_FILE_HEADER)
{
goto error;
}
/* get number of entries */
if (fread(&num, sizeof(int32), 1, file) != 1)
{
goto error;
}
for (i = 0; i < num; i++)
{
pgdsEntry temp;
pgdsEntry *entry;
if (fread(&temp, sizeof(pgdsEntry), 1, file) != 1)
{
goto error;
}
/* Skip loading "sticky" entries */
if (temp.calls == 0)
{
continue;
}
entry = StatStatementsEntryAlloc(&temp.key, false);
/* copy in the actual stats */
entry->calls = temp.calls;
entry->usage = temp.usage;
/* don't initialize spinlock, already done */
}
FreeFile(file);
/*
* Remove the file so it's not included in backups/replication slaves,
* etc. A new file will be written on next shutdown.
*/
unlink(PGDS_DUMP_FILE);
return;
error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read pg_dist_statements file \"%s\": %m",
PGDS_DUMP_FILE)));
if (buffer)
{
pfree(buffer);
}
if (file)
{
FreeFile(file);
}
/* delete bogus file, don't care of errors in this case */
unlink(PGDS_DUMP_FILE);
}
/*
* shmem_shutdown hook: dump statistics into file.
*
*/
static void
StatStatementsShmemShudown(int code, Datum arg)
{
FILE *file;
HASH_SEQ_STATUS hash_seq;
int32 num_entries;
pgdsEntry *entry;
/* Don't try to dump during a crash. */
if (code)
{
return;
}
if (!pgds)
{
return;
}
file = AllocateFile(PGDS_DUMP_FILE ".tmp", PG_BINARY_W);
if (file == NULL)
{
goto error;
}
if (fwrite(&PGDS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
{
goto error;
}
num_entries = hash_get_num_entries(pgds_hash);
if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
{
goto error;
}
hash_seq_init(&hash_seq, pgds_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
if (fwrite(entry, sizeof(pgdsEntry), 1, file) != 1)
{
/* note: we assume hash_seq_term won't change errno */
hash_seq_term(&hash_seq);
goto error;
}
}
if (FreeFile(file))
{
file = NULL;
goto error;
}
/*
* Rename file inplace
*/
if (rename(PGDS_DUMP_FILE ".tmp", PGDS_DUMP_FILE) != 0)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not rename pg_dist_statements file \"%s\": %m",
PGDS_DUMP_FILE ".tmp")));
}
return;
error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read pg_dist_statements file \"%s\": %m",
PGDS_DUMP_FILE)));
if (file)
{
FreeFile(file);
}
unlink(PGDS_DUMP_FILE);
}
/*
* Retrieve pg_stat_statement.max GUC value and store it into pgds_max, since
* we want to store the same number of entries as pg_stat_statements. Don't do
* anything if pgds_max is already set.
*/
static void
StatStatementsSetMax(void)
{
const char *pgss_max;
const char *name = "pg_stat_statements.max";
if (pgds_max != 0)
{
return;
}
pgss_max = GetConfigOption(name, true, false);
/*
* Retrieving pg_stat_statements.max can fail if pgss is loaded after citus
* in shared_preload_libraries, or not at all. Don't do any tracking in such
* a case.
*/
if (!pgss_max)
{
pgds_max = 1000;
}
else
{
pgds_max = atoi(pgss_max);
}
}
static Size
StatStatementsMemSize(void)
{
Size size;
Assert(pgds_max != 0);
size = MAXALIGN(sizeof(pgdsSharedState));
size = add_size(size, hash_estimate_size(pgds_max, sizeof(pgdsEntry)));
return size;
}
void
StoreStatsStatementExecutorsEntry(uint32 queryId, MultiExecutorType executorType)
{
volatile pgdsEntry *e;
pgdsHashKey key;
pgdsEntry *entry;
/* Safety check... */
if (!pgds || !pgds_hash)
{
return;
}
/* Set up key for hashtable search */
key.userid = GetUserId();
key.dbid = MyDatabaseId;
key.queryid = queryId;
key.executorType = executorType;
/* Lookup the hash table entry with shared lock. */
LWLockAcquire(pgds->lock, LW_SHARED);
entry = (pgdsEntry *) hash_search(pgds_hash, &key, HASH_FIND, NULL);
/* Create new entry, if not present */
if (!entry)
{
/* Need exclusive lock to make a new hashtable entry - promote */
LWLockRelease(pgds->lock);
LWLockAcquire(pgds->lock, LW_EXCLUSIVE);
/* OK to create a new hashtable entry */
entry = StatStatementsEntryAlloc(&key, false);
}
/*
* Grab the spinlock while updating the counters (see comment about
* locking rules at the head of the pg_stat_statements file)
*/
e = (volatile pgdsEntry *) entry;
SpinLockAcquire(&e->mutex);
/* "Unstick" entry if it was previously sticky */
if (e->calls == 0)
{
e->usage = USAGE_INIT;
}
e->calls += 1;
SpinLockRelease(&e->mutex);
LWLockRelease(pgds->lock);
}
/*
* Allocate a new hashtable entry.
* caller must hold an exclusive lock on pgds->lock
*/
static pgdsEntry *
StatStatementsEntryAlloc(pgdsHashKey *key, bool sticky)
{
pgdsEntry *entry;
bool found;
/* Make space if needed */
while (hash_get_num_entries(pgds_hash) >= pgds_max)
{
StatStatementsEntryDealloc();
}
/* Find or create an entry with desired hash code */
entry = (pgdsEntry *) hash_search(pgds_hash, key, HASH_ENTER, &found);
if (!found)
{
/* New entry, initialize it */
/* set the appropriate initial usage count */
entry->usage = sticky ? pgds->cur_median_usage : USAGE_INIT;
/* re-initialize the mutex each time ... we assume no one using it */
SpinLockInit(&entry->mutex);
}
entry->calls = 0;
entry->usage = (0.0);
return entry;
}
/*
* qsort comparator for sorting into increasing usage order
*/
static int
entry_cmp(const void *lhs, const void *rhs)
{
double l_usage = (*(pgdsEntry *const *) lhs)->usage;
double r_usage = (*(pgdsEntry *const *) rhs)->usage;
if (l_usage < r_usage)
{
return -1;
}
else if (l_usage > r_usage)
{
return +1;
}
else
{
return 0;
}
}
/*
* Deallocate least used entries.
* Caller must hold an exclusive lock on pgds->lock.
*/
static void
StatStatementsEntryDealloc(void)
{
HASH_SEQ_STATUS hash_seq;
pgdsEntry **entries;
pgdsEntry *entry;
int nvictims;
int i;
/*
* Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
* While we're scanning the table, apply the decay factor to the usage
* values.
*/
entries = palloc(hash_get_num_entries(pgds_hash) * sizeof(pgdsEntry *));
i = 0;
hash_seq_init(&hash_seq, pgds_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
entries[i++] = entry;
/* "Sticky" entries get a different usage decay rate. */
if (entry->calls == 0)
{
entry->usage *= STICKY_DECREASE_FACTOR;
}
else
{
entry->usage *= USAGE_DECREASE_FACTOR;
}
}
qsort(entries, i, sizeof(pgdsEntry *), entry_cmp);
if (i > 0)
{
/* Record the (approximate) median usage */
pgds->cur_median_usage = entries[i / 2]->usage;
}
nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
nvictims = Min(nvictims, i);
for (i = 0; i < nvictims; i++)
{
hash_search(pgds_hash, &entries[i]->key, HASH_REMOVE, NULL);
}
pfree(entries);
}
static void
StatStatementsEntryReset(void)
{
HASH_SEQ_STATUS hash_seq;
pgdsEntry *entry;
LWLockAcquire(pgds->lock, LW_EXCLUSIVE);
hash_seq_init(&hash_seq, pgds_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(pgds_hash, &entry->key, HASH_REMOVE, NULL);
}
LWLockRelease(pgds->lock);
}
/*
* Calculate hash value for a key
*/
static uint32
StatStatementsHashFn(const void *key, Size keysize)
{
const pgdsHashKey *k = (const pgdsHashKey *) key;
return hash_uint32((uint32) k->userid) ^
hash_uint32((uint32) k->dbid) ^
hash_uint32((uint32) k->queryid) ^
hash_uint32((uint32) k->executorType);
}
/*
* Compare two keys - zero means match
*/
static int
StatStatementsMatchFn(const void *key1, const void *key2, Size keysize)
{
const pgdsHashKey *k1 = (const pgdsHashKey *) key1;
const pgdsHashKey *k2 = (const pgdsHashKey *) key2;
if (k1->userid == k2->userid &&
k1->dbid == k2->dbid &&
k1->queryid == k2->queryid &&
k1->executorType == k2->executorType)
{
return 0;
}
else
{
return 1;
}
}
/*
* Reset statistics.
*/
Datum
citus_statement_executors_reset(PG_FUNCTION_ARGS)
{
StatStatementsEntryReset();
PG_RETURN_VOID();
}
Datum
citus_statement_executors(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
HASH_SEQ_STATUS hash_seq;
pgdsEntry *entry;
if (!pgds)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_dist_statements: shared memory not initialized")));
}
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(rsinfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* Switch into long-lived context to construct returned data structures */
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
LWLockAcquire(pgds->lock, LW_SHARED);
hash_seq_init(&hash_seq, pgds_hash);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
Datum values[PG_DIST_STATEMENTS_COLS];
bool nulls[PG_DIST_STATEMENTS_COLS];
int i = 0;
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
SpinLockAcquire(&entry->mutex);
/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
if (entry->calls == 0)
{
SpinLockRelease(&entry->mutex);
continue;
}
values[i++] = Int64GetDatum(entry->key.queryid);
values[i++] = ObjectIdGetDatum(entry->key.userid);
values[i++] = ObjectIdGetDatum(entry->key.dbid);
values[i++] = ObjectIdGetDatum(entry->key.executorType);
values[i++] = Int64GetDatumFast(entry->calls);
SpinLockRelease(&entry->mutex);
Assert(i == PG_DIST_STATEMENTS_COLS);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
LWLockRelease(pgds->lock);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
return (Datum) 0;
}

View File

@ -282,6 +282,7 @@ OutMultiPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery); WRITE_NODE_FIELD(masterQuery);
WRITE_BOOL_FIELD(routerExecutable); WRITE_BOOL_FIELD(routerExecutable);
WRITE_UINT_FIELD(queryId);
WRITE_NODE_FIELD(planningError); WRITE_NODE_FIELD(planningError);
} }

View File

@ -189,6 +189,7 @@ ReadMultiPlan(READFUNC_ARGS)
READ_NODE_FIELD(workerJob); READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery); READ_NODE_FIELD(masterQuery);
READ_BOOL_FIELD(routerExecutable); READ_BOOL_FIELD(routerExecutable);
READ_UINT_FIELD(queryId);
READ_NODE_FIELD(planningError); READ_NODE_FIELD(planningError);
READ_DONE(); READ_DONE();

View File

@ -220,6 +220,8 @@ typedef struct MultiPlan
Query *masterQuery; Query *masterQuery;
bool routerExecutable; bool routerExecutable;
uint32 queryId; /* query identifier (copied from the top-level PlannedStmt) */
/* /*
* NULL if this a valid plan, an error description otherwise. This will * NULL if this a valid plan, an error description otherwise. This will
* e.g. be set if SQL features are present that a planner doesn't support, * e.g. be set if SQL features are present that a planner doesn't support,

View File

@ -0,0 +1,18 @@
/*-------------------------------------------------------------------------
*
* stats_statements.h
* Statement-level statistics for distributed queries.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef STATS_STATEMENT_EXECUTORS_H
#define STATS_STATEMENT_EXECUTORS_H
#include "distributed/multi_server_executor.h"
void InitializeStatsStatementExecutors(void);
void StoreStatsStatementExecutorsEntry(uint32 queryId, MultiExecutorType executorType);
#endif /* STATS_STATEMENT_EXECUTORS_H */