From 2619b060ab194bb474167f0410047076051099bd Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Fri, 7 Apr 2017 19:00:43 -0700 Subject: [PATCH] Experimental backend for pg_dist_statements view. --- src/backend/distributed/Makefile | 6 +- .../distributed/citus--6.2-2--6.2-3.sql | 22 + src/backend/distributed/citus.control | 2 +- .../distributed/executor/multi_executor.c | 4 + .../distributed/planner/multi_planner.c | 2 + src/backend/distributed/shared_library_init.c | 4 + .../distributed/stats/statement_executors.c | 720 ++++++++++++++++++ .../distributed/utils/citus_outfuncs.c | 1 + .../distributed/utils/citus_readfuncs.c | 1 + .../distributed/multi_physical_planner.h | 2 + .../distributed/stats_statement_executors.h | 18 + 11 files changed, 779 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/citus--6.2-2--6.2-3.sql create mode 100644 src/backend/distributed/stats/statement_executors.c create mode 100644 src/include/distributed/stats_statement_executors.h diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 75c099e3b..bc749a3a7 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ - 6.2-1 6.2-2 + 6.2-1 6.2-2 6.2-3 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -18,7 +18,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir) DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql) # directories with source files -SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker +SUBDIRS = . 
commands connection executor master metadata planner relay stats test transaction utils worker # That patsubst rule searches all directories listed in SUBDIRS for .c # files, and adds the corresponding .o files to OBJS @@ -134,6 +134,8 @@ $(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sq cat $^ > $@ $(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql cat $^ > $@ +$(EXTENSION)--6.2-3.sql: $(EXTENSION)--6.2-2.sql $(EXTENSION)--6.2-2--6.2-3.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.2-2--6.2-3.sql b/src/backend/distributed/citus--6.2-2--6.2-3.sql new file mode 100644 index 000000000..d4baf8b61 --- /dev/null +++ b/src/backend/distributed/citus--6.2-2--6.2-3.sql @@ -0,0 +1,22 @@ +/* citus--6.2-2--6.2-3.sql */ + +SET search_path = 'pg_catalog'; + +CREATE OR REPLACE FUNCTION citus_statement_executors(OUT queryid bigint, OUT userid oid, OUT dbid oid, OUT executor bigint, OUT calls bigint) + RETURNS SETOF record + LANGUAGE C STRICT COST 1000 + AS 'MODULE_PATHNAME', $$citus_statement_executors$$; + +CREATE OR REPLACE FUNCTION citus_statement_executors_reset() + RETURNS void + LANGUAGE C STRICT COST 1000 + AS 'MODULE_PATHNAME', $$citus_statement_executors_reset$$; + +/*CREATE VIEW pg_dist_statements AS +SELECT CASE d.executor WHEN 1 THEN 'real-time' WHEN 2 THEN 'task-tracker' WHEN 3 THEN 'router' END AS executor, + s.userid, s.dbid, s.queryid, s.query, s.calls, s.total_time, s.min_time, + s.max_time, s.mean_time, s.stddev_time, s.rows + FROM citus_statement_executors() d + JOIN pg_stat_statements s USING (queryid, userid, dbid);*/ + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 42fc2f856..b34908f53 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.2-2' +default_version = '6.2-3' 
module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index dd2c26733..77d075e64 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -22,6 +22,7 @@ #include "distributed/multi_resowner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/stats_statement_executors.h" #include "distributed/worker_protocol.h" #include "executor/execdebug.h" #include "commands/copy.h" @@ -421,6 +422,9 @@ CitusEndScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; + StoreStatsStatementExecutorsEntry(scanState->multiPlan->queryId, + scanState->executorType); + if (scanState->tuplestorestate) { tuplestore_end(scanState->tuplestorestate); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index d81776541..b97b7cd98 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -420,6 +420,8 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } } + multiPlan->queryId = localPlan->queryId; + multiPlanData = SerializeMultiPlan(multiPlan); customScan->custom_private = list_make1(multiPlanData); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c3dcf4e88..11b1a6646 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -26,6 +26,7 @@ #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_explain.h" +#include "distributed/multi_executor.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_planner.h" @@ -36,6 +37,7 @@ #include 
"distributed/pg_dist_partition.h" #include "distributed/placement_connection.h" #include "distributed/remote_commands.h" +#include "distributed/stats_statement_executors.h" #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" @@ -163,6 +165,8 @@ _PG_init(void) SetConfigOption("allow_system_table_mods", "true", PGC_POSTMASTER, PGC_S_OVERRIDE); } + + InitializeStatsStatementExecutors(); } diff --git a/src/backend/distributed/stats/statement_executors.c b/src/backend/distributed/stats/statement_executors.c new file mode 100644 index 000000000..7302bdb33 --- /dev/null +++ b/src/backend/distributed/stats/statement_executors.c @@ -0,0 +1,720 @@ +/*------------------------------------------------------------------------- + * + * stats_statements.c + * Statement-level statistics for distributed queries. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" + +#include "access/hash.h" +#include "distributed/stats_statement_executors.h" +#include "funcapi.h" +#include "storage/ipc.h" +#include "storage/fd.h" +#include "storage/spin.h" +#include "tcop/utility.h" + +#include + +#define PGDS_DUMP_FILE "pg_stat/pg_dist_statements.stat" +#define PG_DIST_STATEMENTS_COLS 5 +#define USAGE_DECREASE_FACTOR (0.99) /* decreased every StatStatementsEntryDealloc */ +#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */ +#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */ +#define USAGE_INIT (1.0) /* including initial planning */ + +static const uint32 PGDS_FILE_HEADER = 0x0d756e0f; +static int pgds_max = 0; /* max #queries to store. pg_stat_statements.max is used */ + +/* + * Hashtable key that defines the identity of a hashtable entry. 
We use the
+ * same hash as pg_stat_statements
+ */
+typedef struct pgdsHashKey
+{
+	Oid userid; /* user OID */
+	Oid dbid; /* database OID */
+	uint32 queryid; /* query identifier */
+	MultiExecutorType executorType; /* executor type */
+} pgdsHashKey;
+
+/*
+ * Statistics per query and executor type
+ */
+typedef struct pgdsEntry
+{
+	pgdsHashKey key; /* hash key of entry - MUST BE FIRST */
+	int64 calls; /* # of times executed */
+	double usage; /* hashtable usage factor */
+	slock_t mutex; /* protects the counters only */
+} pgdsEntry;
+
+/*
+ * Global shared state
+ */
+typedef struct pgdsSharedState
+{
+	LWLockId lock; /* protects hashtable search/modification */
+	double cur_median_usage; /* current median usage in hashtable */
+} pgdsSharedState;
+
+/* saved hook address in case of unload */
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
+/* Links to shared memory state */
+static pgdsSharedState *pgds = NULL;
+static HTAB *pgds_hash = NULL;
+
+/*--- Functions --- */
+
+Datum citus_statement_executors_reset(PG_FUNCTION_ARGS);
+Datum citus_statement_executors(PG_FUNCTION_ARGS);
+
+PG_FUNCTION_INFO_V1(citus_statement_executors_reset);
+PG_FUNCTION_INFO_V1(citus_statement_executors);
+
+static void StatStatementsSetMax(void);
+static Size StatStatementsMemSize(void);
+
+static void StatStatementsShmemStartup(void);
+static void StatStatementsShmemShudown(int code, Datum arg);
+static pgdsEntry * StatStatementsEntryAlloc(pgdsHashKey *key, bool sticky);
+static void StatStatementsEntryDealloc(void);
+static void StatStatementsEntryReset(void);
+static uint32 StatStatementsHashFn(const void *key, Size keysize);
+static int StatStatementsMatchFn(const void *key1, const void *key2, Size keysize);
+
+/*
+ * InitializeStatsStatementExecutors requests the shared memory and the single
+ * LWLock used by the statement executor statistics, and chains
+ * StatStatementsShmemStartup into shmem_startup_hook.
+ */
+void
+InitializeStatsStatementExecutors(void)
+{
+	/* set pgds_max if needed */
+	StatStatementsSetMax();
+	RequestAddinShmemSpace(StatStatementsMemSize());
+#if PG_VERSION_NUM >= 90600
+	RequestNamedLWLockTranche("pg_dist_statements", 1);
+#else
+	RequestAddinLWLocks(1);
+#endif
+
+	/* Install hook */
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = StatStatementsShmemStartup;
+}
+
+
+/*
+ * StatStatementsShmemStartup is the shmem_startup_hook. It first runs any
+ * previously installed hook, then creates or attaches to the shared state and
+ * the shared hash table. The process that created the shared state also loads
+ * the entries saved in PGDS_DUMP_FILE and removes that file afterwards.
+ */
+static void
+StatStatementsShmemStartup(void)
+{
+	bool found;
+	HASHCTL info;
+	FILE *file;
+	int i;
+	uint32 header;
+	int32 num;
+
+	if (prev_shmem_startup_hook)
+	{
+		prev_shmem_startup_hook();
+	}
+
+	/* reset in case this is a restart within the postmaster */
+	pgds = NULL;
+	pgds_hash = NULL;
+
+	/* Create or attach to the shared memory state */
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+	/* global access lock */
+	pgds = ShmemInitStruct("pg_dist_statements",
+						   sizeof(pgdsSharedState),
+						   &found);
+
+	if (!found)
+	{
+		/* First time through ... */
+#if PG_VERSION_NUM >= 90600
+		pgds->lock = &(GetNamedLWLockTranche("pg_dist_statements"))->lock;
+#else
+		pgds->lock = LWLockAssign();
+#endif
+	}
+
+	/* set pgds_max if needed */
+	StatStatementsSetMax();
+
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(pgdsHashKey);
+	info.entrysize = sizeof(pgdsEntry);
+	info.hash = StatStatementsHashFn;
+	info.match = StatStatementsMatchFn;
+
+	/* allocate stats shared memory hash */
+	pgds_hash = ShmemInitHash("pg_dist_statements hash",
+							  pgds_max, pgds_max,
+							  &info,
+							  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
+
+	LWLockRelease(AddinShmemInitLock);
+
+	if (!IsUnderPostmaster)
+	{
+		on_shmem_exit(StatStatementsShmemShudown, (Datum) 0);
+	}
+
+	/*
+	 * Done if some other process already completed our initialization.
+	 */
+	if (found)
+	{
+		return;
+	}
+
+	/* Load stat file, don't care about locking */
+	file = AllocateFile(PGDS_DUMP_FILE, PG_BINARY_R);
+	if (file == NULL)
+	{
+		if (errno == ENOENT)
+		{
+			return; /* ignore not-found error */
+		}
+		goto error;
+	}
+
+	/* check if header is valid */
+	if (fread(&header, sizeof(uint32), 1, file) != 1 ||
+		header != PGDS_FILE_HEADER)
+	{
+		goto error;
+	}
+
+	/* get number of entries */
+	if (fread(&num, sizeof(int32), 1, file) != 1)
+	{
+		goto error;
+	}
+
+	for (i = 0; i < num; i++)
+	{
+		pgdsEntry temp;
+		pgdsEntry *entry;
+
+		if (fread(&temp, sizeof(pgdsEntry), 1, file) != 1)
+		{
+			goto error;
+		}
+
+		/* Skip loading "sticky" entries */
+		if (temp.calls == 0)
+		{
+			continue;
+		}
+
+		entry = StatStatementsEntryAlloc(&temp.key, false);
+
+		/* copy in the actual stats */
+		entry->calls = temp.calls;
+		entry->usage = temp.usage;
+
+		/* don't initialize spinlock, already done */
+	}
+
+	FreeFile(file);
+
+	/*
+	 * Remove the file so it's not included in backups/replication slaves,
+	 * etc. A new file will be written on next shutdown.
+	 */
+	unlink(PGDS_DUMP_FILE);
+
+	return;
+
+error:
+	ereport(LOG,
+			(errcode_for_file_access(),
+			 errmsg("could not read pg_dist_statements file \"%s\": %m",
+					PGDS_DUMP_FILE)));
+	if (file)
+	{
+		FreeFile(file);
+	}
+
+	/* delete bogus file, don't care of errors in this case */
+	unlink(PGDS_DUMP_FILE);
+}
+
+
+/*
+ * shmem_shutdown hook: dump statistics into file.
+ *
+ * The entries are written to PGDS_DUMP_FILE ".tmp" and the file is renamed to
+ * PGDS_DUMP_FILE once it has been written completely. Nothing is written when
+ * the hook is called with a non-zero exit code or when the shared state was
+ * never set up. On a write failure the partial ".tmp" file is removed.
+ */
+static void
+StatStatementsShmemShudown(int code, Datum arg)
+{
+	FILE *file;
+	HASH_SEQ_STATUS hash_seq;
+	int32 num_entries;
+	pgdsEntry *entry;
+
+	/* Don't try to dump during a crash. */
+	if (code)
+	{
+		return;
+	}
+
+	/* Safety check: nothing to dump if shared memory was never initialized */
+	if (!pgds || !pgds_hash)
+	{
+		return;
+	}
+
+	file = AllocateFile(PGDS_DUMP_FILE ".tmp", PG_BINARY_W);
+	if (file == NULL)
+	{
+		goto error;
+	}
+
+	if (fwrite(&PGDS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
+	{
+		goto error;
+	}
+
+	num_entries = hash_get_num_entries(pgds_hash);
+
+	if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
+	{
+		goto error;
+	}
+
+	hash_seq_init(&hash_seq, pgds_hash);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		if (fwrite(entry, sizeof(pgdsEntry), 1, file) != 1)
+		{
+			/* note: we assume hash_seq_term won't change errno */
+			hash_seq_term(&hash_seq);
+			goto error;
+		}
+	}
+
+	if (FreeFile(file))
+	{
+		/* already closed; make sure the error path does not close it again */
+		file = NULL;
+		goto error;
+	}
+
+	/*
+	 * Rename file inplace
+	 */
+	if (rename(PGDS_DUMP_FILE ".tmp", PGDS_DUMP_FILE) != 0)
+	{
+		ereport(LOG,
+				(errcode_for_file_access(),
+				 errmsg("could not rename pg_dist_statements file \"%s\": %m",
+						PGDS_DUMP_FILE ".tmp")));
+	}
+
+	return;
+
+error:
+
+	/* everything that can fail above operates on the temporary file */
+	ereport(LOG,
+			(errcode_for_file_access(),
+			 errmsg("could not write pg_dist_statements file \"%s\": %m",
+					PGDS_DUMP_FILE ".tmp")));
+
+	if (file)
+	{
+		FreeFile(file);
+	}
+	unlink(PGDS_DUMP_FILE ".tmp");
+}
+
+
+/*
+ * Retrieve pg_stat_statement.max GUC value and store it into pgds_max, since
+ * we want to store the same number of entries as pg_stat_statements. Don't do
+ * anything if pgds_max is already set.
+ */
+static void
+StatStatementsSetMax(void)
+{
+	const int defaultMax = 1000;
+	const char *pgss_max;
+	const char *name = "pg_stat_statements.max";
+
+	if (pgds_max != 0)
+	{
+		return;
+	}
+
+	pgss_max = GetConfigOption(name, true, false);
+
+	/*
+	 * Retrieving pg_stat_statements.max can fail if pgss is loaded after citus
+	 * in shared_preload_libraries, or not at all. Fall back to a fixed default
+	 * number of entries in such a case.
+	 */
+	if (pgss_max != NULL)
+	{
+		pgds_max = atoi(pgss_max);
+	}
+
+	/*
+	 * Also use the default when the setting could not be parsed into a
+	 * positive number: pgds_max sizes the hash table and must not be zero.
+	 */
+	if (pgds_max <= 0)
+	{
+		pgds_max = defaultMax;
+	}
+}
+
+
+/*
+ * StatStatementsMemSize returns the amount of shared memory needed for the
+ * shared state plus a hash table of pgds_max entries. pgds_max must have been
+ * set before this is called.
+ */
+static Size
+StatStatementsMemSize(void)
+{
+	Size size;
+
+	Assert(pgds_max != 0);
+
+	size = MAXALIGN(sizeof(pgdsSharedState));
+	size = add_size(size, hash_estimate_size(pgds_max, sizeof(pgdsEntry)));
+
+	return size;
+}
+
+
+/*
+ * StoreStatsStatementExecutorsEntry counts one execution of the given query
+ * with the given executor type for the current user and database. The entry
+ * is created if it does not exist yet. Does nothing when the shared state is
+ * not available.
+ */
+void
+StoreStatsStatementExecutorsEntry(uint32 queryId, MultiExecutorType executorType)
+{
+	volatile pgdsEntry *e;
+
+	pgdsHashKey key;
+	pgdsEntry *entry;
+
+	/* Safety check... */
+	if (!pgds || !pgds_hash)
+	{
+		return;
+	}
+
+	/* Set up key for hashtable search */
+	key.userid = GetUserId();
+	key.dbid = MyDatabaseId;
+	key.queryid = queryId;
+	key.executorType = executorType;
+
+	/* Lookup the hash table entry with shared lock. */
+	LWLockAcquire(pgds->lock, LW_SHARED);
+
+	entry = (pgdsEntry *) hash_search(pgds_hash, &key, HASH_FIND, NULL);
+
+	/* Create new entry, if not present */
+	if (!entry)
+	{
+		/* Need exclusive lock to make a new hashtable entry - promote */
+		LWLockRelease(pgds->lock);
+		LWLockAcquire(pgds->lock, LW_EXCLUSIVE);
+
+		/* OK to create a new hashtable entry */
+		entry = StatStatementsEntryAlloc(&key, false);
+	}
+
+	/*
+	 * Grab the spinlock while updating the counters (see comment about
+	 * locking rules at the head of the pg_stat_statements file)
+	 */
+	e = (volatile pgdsEntry *) entry;
+
+	SpinLockAcquire(&e->mutex);
+
+	/*
+	 * "Unstick" entry if it was previously sticky.
+	 *
+	 * NOTE(review): usage is only set here on the first call and is not
+	 * increased on later executions - confirm that this is intended.
+	 */
+	if (e->calls == 0)
+	{
+		e->usage = USAGE_INIT;
+	}
+
+	e->calls += 1;
+
+	SpinLockRelease(&e->mutex);
+
+	LWLockRelease(pgds->lock);
+}
+
+
+/*
+ * Allocate a new hashtable entry.
+ * caller must hold an exclusive lock on pgds->lock
+ *
+ * Evicts the least used entries first when the table already holds pgds_max
+ * entries. If an entry for the key already exists it is returned unchanged,
+ * so its counters are preserved. A new entry starts with zero calls; its
+ * usage is pgds->cur_median_usage when sticky is set and USAGE_INIT otherwise.
+ */
+static pgdsEntry *
+StatStatementsEntryAlloc(pgdsHashKey *key, bool sticky)
+{
+	pgdsEntry *entry;
+	bool found;
+
+	/* Make space if needed */
+	while (hash_get_num_entries(pgds_hash) >= pgds_max)
+	{
+		StatStatementsEntryDealloc();
+	}
+
+	/* Find or create an entry with desired hash code */
+	entry = (pgdsEntry *) hash_search(pgds_hash, key, HASH_ENTER, &found);
+
+	if (!found)
+	{
+		/* New entry, initialize it */
+
+		/* reset the statistics */
+		entry->calls = 0;
+
+		/* set the appropriate initial usage count */
+		entry->usage = sticky ? pgds->cur_median_usage : USAGE_INIT;
+
+		/* re-initialize the mutex each time ... we assume no one using it */
+		SpinLockInit(&entry->mutex);
+	}
+
+	return entry;
+}
+
+
+/*
+ * qsort comparator for sorting into increasing usage order
+ */
+static int
+entry_cmp(const void *lhs, const void *rhs)
+{
+	double l_usage = (*(pgdsEntry *const *) lhs)->usage;
+	double r_usage = (*(pgdsEntry *const *) rhs)->usage;
+
+	if (l_usage < r_usage)
+	{
+		return -1;
+	}
+	else if (l_usage > r_usage)
+	{
+		return +1;
+	}
+	else
+	{
+		return 0;
+	}
+}
+
+
+/*
+ * Deallocate least used entries.
+ * Caller must hold an exclusive lock on pgds->lock.
+ *
+ * Every call decays the usage of all entries, records the usage of the middle
+ * entry of the sorted list in pgds->cur_median_usage, and removes the entries
+ * with the lowest usage.
+ */
+static void
+StatStatementsEntryDealloc(void)
+{
+	HASH_SEQ_STATUS hash_seq;
+	pgdsEntry **entries;
+	pgdsEntry *entry;
+	int nvictims;
+	int i;
+
+	/*
+	 * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
+	 * While we're scanning the table, apply the decay factor to the usage
+	 * values.
+	 */
+	entries = palloc(hash_get_num_entries(pgds_hash) * sizeof(pgdsEntry *));
+
+	i = 0;
+	hash_seq_init(&hash_seq, pgds_hash);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		entries[i++] = entry;
+
+		/* "Sticky" entries get a different usage decay rate. */
+		if (entry->calls == 0)
+		{
+			entry->usage *= STICKY_DECREASE_FACTOR;
+		}
+		else
+		{
+			entry->usage *= USAGE_DECREASE_FACTOR;
+		}
+	}
+
+	qsort(entries, i, sizeof(pgdsEntry *), entry_cmp);
+
+	if (i > 0)
+	{
+		/* Record the (approximate) median usage */
+		pgds->cur_median_usage = entries[i / 2]->usage;
+	}
+
+	/* remove at least 10 entries, but never more than the table holds */
+	nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
+	nvictims = Min(nvictims, i);
+
+	for (i = 0; i < nvictims; i++)
+	{
+		hash_search(pgds_hash, &entries[i]->key, HASH_REMOVE, NULL);
+	}
+
+	pfree(entries);
+}
+
+
+/*
+ * StatStatementsEntryReset removes every entry from the hash table while
+ * holding pgds->lock exclusively.
+ */
+static void
+StatStatementsEntryReset(void)
+{
+	HASH_SEQ_STATUS hash_seq;
+	pgdsEntry *entry;
+
+	LWLockAcquire(pgds->lock, LW_EXCLUSIVE);
+
+	hash_seq_init(&hash_seq, pgds_hash);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		hash_search(pgds_hash, &entry->key, HASH_REMOVE, NULL);
+	}
+
+	LWLockRelease(pgds->lock);
+}
+
+
+/*
+ * Calculate hash value for a key
+ */
+static uint32
+StatStatementsHashFn(const void *key, Size keysize)
+{
+	const pgdsHashKey *k = (const pgdsHashKey *) key;
+
+	return hash_uint32((uint32) k->userid) ^
+		   hash_uint32((uint32) k->dbid) ^
+		   hash_uint32((uint32) k->queryid) ^
+		   hash_uint32((uint32) k->executorType);
+}
+
+
+/*
+ * Compare two keys - zero means match
+ */
+static int
+StatStatementsMatchFn(const void *key1, const void *key2, Size keysize)
+{
+	const pgdsHashKey *k1 = (const pgdsHashKey *) key1;
+	const pgdsHashKey *k2 = (const pgdsHashKey *) key2;
+
+	if (k1->userid == k2->userid &&
+		k1->dbid == k2->dbid &&
+		k1->queryid == k2->queryid &&
+		k1->executorType == k2->executorType)
+	{
+		return 0;
+	}
+	else
+	{
+		return 1;
+	}
+}
+
+
+/*
+ * Reset statistics.
+ *
+ * Raises an error when the shared state has not been set up, instead of
+ * dereferencing the missing lock.
+ */
+Datum
+citus_statement_executors_reset(PG_FUNCTION_ARGS)
+{
+	if (!pgds || !pgds_hash)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_dist_statements: shared memory not initialized")));
+	}
+
+	StatStatementsEntryReset();
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * citus_statement_executors returns one row (queryid, userid, dbid, executor,
+ * calls) per hash table entry that has been executed at least once. The rows
+ * are returned in a tuplestore (materialize mode).
+ */
+Datum
+citus_statement_executors(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	TupleDesc tupdesc;
+	Tuplestorestate *tupstore;
+	HASH_SEQ_STATUS hash_seq;
+	pgdsEntry *entry;
+
+	if (!pgds || !pgds_hash)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_dist_statements: shared memory not initialized")));
+	}
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg(
+					 "set-valued function called in context that cannot accept a set")));
+	}
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+	}
+
+	/* Switch into long-lived context to construct returned data structures */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+	{
+		elog(ERROR, "return type must be a row type");
+	}
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	LWLockAcquire(pgds->lock, LW_SHARED);
+
+	hash_seq_init(&hash_seq, pgds_hash);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		Datum values[PG_DIST_STATEMENTS_COLS];
+		bool nulls[PG_DIST_STATEMENTS_COLS];
+		int i = 0;
+		int64 calls = 0;
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		/*
+		 * Only copy the counter while holding the spinlock. The datums are
+		 * built afterwards from the local copy, since Int64GetDatum may
+		 * allocate memory on platforms where int8 is not passed by value.
+		 */
+		SpinLockAcquire(&entry->mutex);
+		calls = entry->calls;
+		SpinLockRelease(&entry->mutex);
+
+		/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
+		if (calls == 0)
+		{
+			continue;
+		}
+
+		/* queryid, executor and calls are declared as bigint in the SQL definition */
+		values[i++] = Int64GetDatum((int64) entry->key.queryid);
+		values[i++] = ObjectIdGetDatum(entry->key.userid);
+		values[i++] = ObjectIdGetDatum(entry->key.dbid);
+		values[i++] = Int64GetDatum((int64) entry->key.executorType);
+		values[i++] = Int64GetDatum(calls);
+
+		Assert(i == PG_DIST_STATEMENTS_COLS);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	LWLockRelease(pgds->lock);
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index 8aa485857..7c6c3e363 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -282,6 +282,7 @@ OutMultiPlan(OUTFUNC_ARGS)
 	WRITE_NODE_FIELD(workerJob);
 	WRITE_NODE_FIELD(masterQuery);
 	WRITE_BOOL_FIELD(routerExecutable);
+	WRITE_UINT_FIELD(queryId);
 	WRITE_NODE_FIELD(planningError);
 }
 
diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c
index 37c9245fa..6c3d02f4d 100644
--- a/src/backend/distributed/utils/citus_readfuncs.c
+++ b/src/backend/distributed/utils/citus_readfuncs.c
@@ -189,6 +189,7 @@ ReadMultiPlan(READFUNC_ARGS)
 	READ_NODE_FIELD(workerJob);
 	READ_NODE_FIELD(masterQuery);
 	READ_BOOL_FIELD(routerExecutable);
+	READ_UINT_FIELD(queryId);
 	READ_NODE_FIELD(planningError);
 
 	READ_DONE();
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 0cf340899..3aa918978 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -220,6 +220,8 @@ typedef struct MultiPlan
 	Query *masterQuery;
 	bool routerExecutable;
 
+	uint32 queryId; /* query identifier (copied from the top-level PlannedStmt) */
+
/* * NULL if this a valid plan, an error description otherwise. This will * e.g. be set if SQL features are present that a planner doesn't support, diff --git a/src/include/distributed/stats_statement_executors.h b/src/include/distributed/stats_statement_executors.h new file mode 100644 index 000000000..62479e96a --- /dev/null +++ b/src/include/distributed/stats_statement_executors.h @@ -0,0 +1,18 @@ +/*------------------------------------------------------------------------- + * + * stats_statement_executors.h + * Statement-level statistics for distributed queries. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef STATS_STATEMENT_EXECUTORS_H +#define STATS_STATEMENT_EXECUTORS_H + +#include "distributed/multi_server_executor.h" + +void InitializeStatsStatementExecutors(void); +void StoreStatsStatementExecutorsEntry(uint32 queryId, MultiExecutorType executorType); + +#endif /* STATS_STATEMENT_EXECUTORS_H */