mirror of https://github.com/citusdata/citus.git
Merge 2619b060ab
into 7097336972
commit
b30da65f49
|
@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
|||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
||||
6.2-1 6.2-2
|
||||
6.2-1 6.2-2 6.2-3
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -18,7 +18,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
|
|||
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
|
||||
|
||||
# directories with source files
|
||||
SUBDIRS = . commands connection executor master metadata planner relay test transaction utils worker
|
||||
SUBDIRS = . commands connection executor master metadata planner relay stats test transaction utils worker
|
||||
|
||||
# That patsubst rule searches all directories listed in SUBDIRS for .c
|
||||
# files, and adds the corresponding .o files to OBJS
|
||||
|
@ -134,6 +134,8 @@ $(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sq
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--6.2-3.sql: $(EXTENSION)--6.2-2.sql $(EXTENSION)--6.2-2--6.2-3.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/* citus--6.2-1--6.2-2.sql */
|
||||
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
CREATE OR REPLACE FUNCTION citus_statement_executors(OUT queryid bigint, OUT userid oid, OUT dbid oid, OUT executor bigint, OUT calls bigint)
|
||||
RETURNS SETOF record
|
||||
LANGUAGE C STRICT COST 1000
|
||||
AS 'MODULE_PATHNAME', $$citus_statement_executors$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION citus_statement_executors_reset()
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT COST 1000
|
||||
AS 'MODULE_PATHNAME', $$citus_statement_executors_reset$$;
|
||||
|
||||
/*CREATE VIEW pg_dist_statements AS
|
||||
SELECT CASE d.executor WHEN 1 THEN 'real-time' WHEN 2 THEN 'task-tracker' WHEN 3 THEN 'router' END AS executor,
|
||||
s.userid, s.dbid, s.queryid, s.query, s.calls, s.total_time, s.min_time,
|
||||
s.max_time, s.mean_time, s.stddev_time, s.rows
|
||||
FROM citus_statement_executors() d
|
||||
JOIN pg_stat_statements s USING (queryid, userid, dbid);*/
|
||||
|
||||
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '6.2-2'
|
||||
default_version = '6.2-3'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "distributed/multi_resowner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/stats_statement_executors.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "executor/execdebug.h"
|
||||
#include "commands/copy.h"
|
||||
|
@ -421,6 +422,9 @@ CitusEndScan(CustomScanState *node)
|
|||
{
|
||||
CitusScanState *scanState = (CitusScanState *) node;
|
||||
|
||||
StoreStatsStatementExecutorsEntry(scanState->multiPlan->queryId,
|
||||
scanState->executorType);
|
||||
|
||||
if (scanState->tuplestorestate)
|
||||
{
|
||||
tuplestore_end(scanState->tuplestorestate);
|
||||
|
|
|
@ -499,6 +499,8 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
|
|||
}
|
||||
}
|
||||
|
||||
multiPlan->queryId = localPlan->queryId;
|
||||
|
||||
multiPlanData = SerializeMultiPlan(multiPlan);
|
||||
|
||||
customScan->custom_private = list_make1(multiPlanData);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_explain.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
|
@ -36,6 +37,7 @@
|
|||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/stats_statement_executors.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
|
@ -164,6 +166,8 @@ _PG_init(void)
|
|||
SetConfigOption("allow_system_table_mods", "true", PGC_POSTMASTER,
|
||||
PGC_S_OVERRIDE);
|
||||
}
|
||||
|
||||
InitializeStatsStatementExecutors();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,720 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* stats_statements.c
|
||||
* Statement-level statistics for distributed queries.
|
||||
*
|
||||
* Copyright (c) 2017, Citus Data, Inc.
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/hash.h"
|
||||
#include "distributed/stats_statement_executors.h"
|
||||
#include "funcapi.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/spin.h"
|
||||
#include "tcop/utility.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#define PGDS_DUMP_FILE "pg_stat/pg_dist_statements.stat"
|
||||
#define PG_DIST_STATEMENTS_COLS 5
|
||||
#define USAGE_DECREASE_FACTOR (0.99) /* decreased every StatStatementsEntryDealloc */
|
||||
#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */
|
||||
#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */
|
||||
#define USAGE_INIT (1.0) /* including initial planning */
|
||||
|
||||
static const uint32 PGDS_FILE_HEADER = 0x0d756e0f;
|
||||
static int pgds_max = 0; /* max #queries to store. pg_stat_statements.max is used */
|
||||
|
||||
/*
|
||||
* Hashtable key that defines the identity of a hashtable entry. We use the
|
||||
* same hash as pg_stat_statements
|
||||
*/
|
||||
typedef struct pgdsHashKey
|
||||
{
|
||||
Oid userid; /* user OID */
|
||||
Oid dbid; /* database OID */
|
||||
uint32 queryid; /* query identifier */
|
||||
MultiExecutorType executorType; /* executor type */
|
||||
} pgdsHashKey;
|
||||
|
||||
/*
|
||||
* Statistics per query and executor type
|
||||
*/
|
||||
typedef struct pgdsEntry
|
||||
{
|
||||
pgdsHashKey key; /* hash key of entry - MUST BE FIRST */
|
||||
int64 calls; /* # of times executed */
|
||||
double usage; /* hashtable usage factor */
|
||||
slock_t mutex; /* protects the counters only */
|
||||
} pgdsEntry;
|
||||
|
||||
/*
|
||||
* Global shared state
|
||||
*/
|
||||
typedef struct pgdsSharedState
|
||||
{
|
||||
LWLockId lock; /* protects hashtable search/modification */
|
||||
double cur_median_usage; /* current median usage in hashtable */
|
||||
} pgdsSharedState;
|
||||
|
||||
/* saved hook address in case of unload */
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
|
||||
/* Links to shared memory state */
|
||||
static pgdsSharedState *pgds = NULL;
|
||||
static HTAB *pgds_hash = NULL;
|
||||
|
||||
/*--- Functions --- */
|
||||
|
||||
Datum citus_statement_executors_reset(PG_FUNCTION_ARGS);
|
||||
Datum citus_statement_executors(PG_FUNCTION_ARGS);
|
||||
|
||||
PG_FUNCTION_INFO_V1(citus_statement_executors_reset);
|
||||
PG_FUNCTION_INFO_V1(citus_statement_executors);
|
||||
|
||||
static void StatStatementsSetMax(void);
|
||||
static Size StatStatementsMemSize(void);
|
||||
|
||||
static void StatStatementsShmemStartup(void);
|
||||
static void StatStatementsShmemShudown(int code, Datum arg);
|
||||
static pgdsEntry * StatStatementsEntryAlloc(pgdsHashKey *key, bool sticky);
|
||||
static void StatStatementsEntryDealloc(void);
|
||||
static void StatStatementsEntryReset(void);
|
||||
static uint32 StatStatementsHashFn(const void *key, Size keysize);
|
||||
static int StatStatementsMatchFn(const void *key1, const void *key2, Size keysize);
|
||||
|
||||
void
|
||||
InitializeStatsStatementExecutors(void)
|
||||
{
|
||||
/* set pgds_max if needed */
|
||||
StatStatementsSetMax();
|
||||
RequestAddinShmemSpace(StatStatementsMemSize());
|
||||
#if PG_VERSION_NUM >= 90600
|
||||
RequestNamedLWLockTranche("pg_dist_statements", 1);
|
||||
#else
|
||||
RequestAddinLWLocks(1);
|
||||
#endif
|
||||
|
||||
/* Install hook */
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = StatStatementsShmemStartup;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
StatStatementsShmemStartup(void)
|
||||
{
|
||||
bool found;
|
||||
HASHCTL info;
|
||||
FILE *file;
|
||||
int i;
|
||||
uint32 header;
|
||||
int32 num;
|
||||
pgdsEntry *buffer = NULL;
|
||||
|
||||
if (prev_shmem_startup_hook)
|
||||
{
|
||||
prev_shmem_startup_hook();
|
||||
}
|
||||
|
||||
/* reset in case this is a restart within the postmaster */
|
||||
pgds = NULL;
|
||||
|
||||
/* Create or attach to the shared memory state */
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
|
||||
/* global access lock */
|
||||
pgds = ShmemInitStruct("pg_dist_statements",
|
||||
sizeof(pgdsSharedState),
|
||||
&found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
/* First time through ... */
|
||||
#if PG_VERSION_NUM >= 90600
|
||||
pgds->lock = &(GetNamedLWLockTranche("pg_dist_statements"))->lock;
|
||||
#else
|
||||
pgds->lock = LWLockAssign();
|
||||
#endif
|
||||
}
|
||||
|
||||
/* set pgds_max if needed */
|
||||
StatStatementsSetMax();
|
||||
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(pgdsHashKey);
|
||||
info.entrysize = sizeof(pgdsEntry);
|
||||
info.hash = StatStatementsHashFn;
|
||||
info.match = StatStatementsMatchFn;
|
||||
|
||||
/* allocate stats shared memory hash */
|
||||
pgds_hash = ShmemInitHash("pg_dist_statements hash",
|
||||
pgds_max, pgds_max,
|
||||
&info,
|
||||
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
|
||||
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
if (!IsUnderPostmaster)
|
||||
{
|
||||
on_shmem_exit(StatStatementsShmemShudown, (Datum) 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Done if some other process already completed our initialization.
|
||||
*/
|
||||
if (found)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/* Load stat file, don't care about locking */
|
||||
file = AllocateFile(PGDS_DUMP_FILE, PG_BINARY_R);
|
||||
if (file == NULL)
|
||||
{
|
||||
if (errno == ENOENT)
|
||||
{
|
||||
return; /* ignore not-found error */
|
||||
}
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* check is header is valid */
|
||||
if (fread(&header, sizeof(uint32), 1, file) != 1 ||
|
||||
header != PGDS_FILE_HEADER)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* get number of entries */
|
||||
if (fread(&num, sizeof(int32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
for (i = 0; i < num; i++)
|
||||
{
|
||||
pgdsEntry temp;
|
||||
pgdsEntry *entry;
|
||||
|
||||
if (fread(&temp, sizeof(pgdsEntry), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Skip loading "sticky" entries */
|
||||
if (temp.calls == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
entry = StatStatementsEntryAlloc(&temp.key, false);
|
||||
|
||||
/* copy in the actual stats */
|
||||
entry->calls = temp.calls;
|
||||
entry->usage = temp.usage;
|
||||
|
||||
/* don't initialize spinlock, already done */
|
||||
}
|
||||
|
||||
FreeFile(file);
|
||||
|
||||
/*
|
||||
* Remove the file so it's not included in backups/replication slaves,
|
||||
* etc. A new file will be written on next shutdown.
|
||||
*/
|
||||
unlink(PGDS_DUMP_FILE);
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read pg_dist_statements file \"%s\": %m",
|
||||
PGDS_DUMP_FILE)));
|
||||
if (buffer)
|
||||
{
|
||||
pfree(buffer);
|
||||
}
|
||||
if (file)
|
||||
{
|
||||
FreeFile(file);
|
||||
}
|
||||
|
||||
/* delete bogus file, don't care of errors in this case */
|
||||
unlink(PGDS_DUMP_FILE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* shmem_shutdown hook: dump statistics into file.
|
||||
*
|
||||
*/
|
||||
static void
|
||||
StatStatementsShmemShudown(int code, Datum arg)
|
||||
{
|
||||
FILE *file;
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
int32 num_entries;
|
||||
pgdsEntry *entry;
|
||||
|
||||
/* Don't try to dump during a crash. */
|
||||
if (code)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pgds)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
file = AllocateFile(PGDS_DUMP_FILE ".tmp", PG_BINARY_W);
|
||||
if (file == NULL)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (fwrite(&PGDS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
num_entries = hash_get_num_entries(pgds_hash);
|
||||
|
||||
if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
hash_seq_init(&hash_seq, pgds_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
if (fwrite(entry, sizeof(pgdsEntry), 1, file) != 1)
|
||||
{
|
||||
/* note: we assume hash_seq_term won't change errno */
|
||||
hash_seq_term(&hash_seq);
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
|
||||
if (FreeFile(file))
|
||||
{
|
||||
file = NULL;
|
||||
goto error;
|
||||
}
|
||||
|
||||
/*
|
||||
* Rename file inplace
|
||||
*/
|
||||
if (rename(PGDS_DUMP_FILE ".tmp", PGDS_DUMP_FILE) != 0)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not rename pg_dist_statements file \"%s\": %m",
|
||||
PGDS_DUMP_FILE ".tmp")));
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
ereport(LOG,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read pg_dist_statements file \"%s\": %m",
|
||||
PGDS_DUMP_FILE)));
|
||||
|
||||
if (file)
|
||||
{
|
||||
FreeFile(file);
|
||||
}
|
||||
unlink(PGDS_DUMP_FILE);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Retrieve pg_stat_statement.max GUC value and store it into pgds_max, since
|
||||
* we want to store the same number of entries as pg_stat_statements. Don't do
|
||||
* anything if pgds_max is already set.
|
||||
*/
|
||||
static void
|
||||
StatStatementsSetMax(void)
|
||||
{
|
||||
const char *pgss_max;
|
||||
const char *name = "pg_stat_statements.max";
|
||||
|
||||
if (pgds_max != 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
pgss_max = GetConfigOption(name, true, false);
|
||||
|
||||
/*
|
||||
* Retrieving pg_stat_statements.max can fail if pgss is loaded after citus
|
||||
* in shared_preload_libraries, or not at all. Don't do any tracking in such
|
||||
* a case.
|
||||
*/
|
||||
if (!pgss_max)
|
||||
{
|
||||
pgds_max = 1000;
|
||||
}
|
||||
else
|
||||
{
|
||||
pgds_max = atoi(pgss_max);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static Size
|
||||
StatStatementsMemSize(void)
|
||||
{
|
||||
Size size;
|
||||
|
||||
Assert(pgds_max != 0);
|
||||
|
||||
size = MAXALIGN(sizeof(pgdsSharedState));
|
||||
size = add_size(size, hash_estimate_size(pgds_max, sizeof(pgdsEntry)));
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
StoreStatsStatementExecutorsEntry(uint32 queryId, MultiExecutorType executorType)
|
||||
{
|
||||
volatile pgdsEntry *e;
|
||||
|
||||
pgdsHashKey key;
|
||||
pgdsEntry *entry;
|
||||
|
||||
/* Safety check... */
|
||||
if (!pgds || !pgds_hash)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/* Set up key for hashtable search */
|
||||
key.userid = GetUserId();
|
||||
key.dbid = MyDatabaseId;
|
||||
key.queryid = queryId;
|
||||
key.executorType = executorType;
|
||||
|
||||
/* Lookup the hash table entry with shared lock. */
|
||||
LWLockAcquire(pgds->lock, LW_SHARED);
|
||||
|
||||
entry = (pgdsEntry *) hash_search(pgds_hash, &key, HASH_FIND, NULL);
|
||||
|
||||
/* Create new entry, if not present */
|
||||
if (!entry)
|
||||
{
|
||||
/* Need exclusive lock to make a new hashtable entry - promote */
|
||||
LWLockRelease(pgds->lock);
|
||||
LWLockAcquire(pgds->lock, LW_EXCLUSIVE);
|
||||
|
||||
/* OK to create a new hashtable entry */
|
||||
entry = StatStatementsEntryAlloc(&key, false);
|
||||
}
|
||||
|
||||
/*
|
||||
* Grab the spinlock while updating the counters (see comment about
|
||||
* locking rules at the head of the pg_stat_statements file)
|
||||
*/
|
||||
e = (volatile pgdsEntry *) entry;
|
||||
|
||||
SpinLockAcquire(&e->mutex);
|
||||
|
||||
/* "Unstick" entry if it was previously sticky */
|
||||
if (e->calls == 0)
|
||||
{
|
||||
e->usage = USAGE_INIT;
|
||||
}
|
||||
|
||||
e->calls += 1;
|
||||
|
||||
SpinLockRelease(&e->mutex);
|
||||
|
||||
LWLockRelease(pgds->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Allocate a new hashtable entry.
|
||||
* caller must hold an exclusive lock on pgds->lock
|
||||
*/
|
||||
static pgdsEntry *
|
||||
StatStatementsEntryAlloc(pgdsHashKey *key, bool sticky)
|
||||
{
|
||||
pgdsEntry *entry;
|
||||
bool found;
|
||||
|
||||
/* Make space if needed */
|
||||
while (hash_get_num_entries(pgds_hash) >= pgds_max)
|
||||
{
|
||||
StatStatementsEntryDealloc();
|
||||
}
|
||||
|
||||
/* Find or create an entry with desired hash code */
|
||||
entry = (pgdsEntry *) hash_search(pgds_hash, key, HASH_ENTER, &found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
/* New entry, initialize it */
|
||||
|
||||
/* set the appropriate initial usage count */
|
||||
entry->usage = sticky ? pgds->cur_median_usage : USAGE_INIT;
|
||||
|
||||
/* re-initialize the mutex each time ... we assume no one using it */
|
||||
SpinLockInit(&entry->mutex);
|
||||
}
|
||||
|
||||
entry->calls = 0;
|
||||
entry->usage = (0.0);
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* qsort comparator for sorting into increasing usage order
|
||||
*/
|
||||
static int
|
||||
entry_cmp(const void *lhs, const void *rhs)
|
||||
{
|
||||
double l_usage = (*(pgdsEntry *const *) lhs)->usage;
|
||||
double r_usage = (*(pgdsEntry *const *) rhs)->usage;
|
||||
|
||||
if (l_usage < r_usage)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
else if (l_usage > r_usage)
|
||||
{
|
||||
return +1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Deallocate least used entries.
|
||||
* Caller must hold an exclusive lock on pgds->lock.
|
||||
*/
|
||||
static void
|
||||
StatStatementsEntryDealloc(void)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgdsEntry **entries;
|
||||
pgdsEntry *entry;
|
||||
int nvictims;
|
||||
int i;
|
||||
|
||||
/*
|
||||
* Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
|
||||
* While we're scanning the table, apply the decay factor to the usage
|
||||
* values.
|
||||
*/
|
||||
entries = palloc(hash_get_num_entries(pgds_hash) * sizeof(pgdsEntry *));
|
||||
|
||||
i = 0;
|
||||
hash_seq_init(&hash_seq, pgds_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
entries[i++] = entry;
|
||||
|
||||
/* "Sticky" entries get a different usage decay rate. */
|
||||
if (entry->calls == 0)
|
||||
{
|
||||
entry->usage *= STICKY_DECREASE_FACTOR;
|
||||
}
|
||||
else
|
||||
{
|
||||
entry->usage *= USAGE_DECREASE_FACTOR;
|
||||
}
|
||||
}
|
||||
|
||||
qsort(entries, i, sizeof(pgdsEntry *), entry_cmp);
|
||||
|
||||
if (i > 0)
|
||||
{
|
||||
/* Record the (approximate) median usage */
|
||||
pgds->cur_median_usage = entries[i / 2]->usage;
|
||||
}
|
||||
|
||||
nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
|
||||
nvictims = Min(nvictims, i);
|
||||
|
||||
for (i = 0; i < nvictims; i++)
|
||||
{
|
||||
hash_search(pgds_hash, &entries[i]->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
pfree(entries);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
StatStatementsEntryReset(void)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgdsEntry *entry;
|
||||
|
||||
LWLockAcquire(pgds->lock, LW_EXCLUSIVE);
|
||||
|
||||
hash_seq_init(&hash_seq, pgds_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
hash_search(pgds_hash, &entry->key, HASH_REMOVE, NULL);
|
||||
}
|
||||
|
||||
LWLockRelease(pgds->lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Calculate hash value for a key
|
||||
*/
|
||||
static uint32
|
||||
StatStatementsHashFn(const void *key, Size keysize)
|
||||
{
|
||||
const pgdsHashKey *k = (const pgdsHashKey *) key;
|
||||
|
||||
return hash_uint32((uint32) k->userid) ^
|
||||
hash_uint32((uint32) k->dbid) ^
|
||||
hash_uint32((uint32) k->queryid) ^
|
||||
hash_uint32((uint32) k->executorType);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Compare two keys - zero means match
|
||||
*/
|
||||
static int
|
||||
StatStatementsMatchFn(const void *key1, const void *key2, Size keysize)
|
||||
{
|
||||
const pgdsHashKey *k1 = (const pgdsHashKey *) key1;
|
||||
const pgdsHashKey *k2 = (const pgdsHashKey *) key2;
|
||||
|
||||
if (k1->userid == k2->userid &&
|
||||
k1->dbid == k2->dbid &&
|
||||
k1->queryid == k2->queryid &&
|
||||
k1->executorType == k2->executorType)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Reset statistics.
|
||||
*/
|
||||
Datum
|
||||
citus_statement_executors_reset(PG_FUNCTION_ARGS)
|
||||
{
|
||||
StatStatementsEntryReset();
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
citus_statement_executors(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
MemoryContext per_query_ctx;
|
||||
MemoryContext oldcontext;
|
||||
TupleDesc tupdesc;
|
||||
Tuplestorestate *tupstore;
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
pgdsEntry *entry;
|
||||
|
||||
if (!pgds)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("pg_dist_statements: shared memory not initialized")));
|
||||
}
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg(
|
||||
"set-valued function called in context that cannot accept a set")));
|
||||
}
|
||||
if (!(rsinfo->allowedModes & SFRM_Materialize))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("materialize mode required, but it is not " \
|
||||
"allowed in this context")));
|
||||
}
|
||||
|
||||
/* Switch into long-lived context to construct returned data structures */
|
||||
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
|
||||
oldcontext = MemoryContextSwitchTo(per_query_ctx);
|
||||
|
||||
/* Build a tuple descriptor for our result type */
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
{
|
||||
elog(ERROR, "return type must be a row type");
|
||||
}
|
||||
|
||||
tupstore = tuplestore_begin_heap(true, false, work_mem);
|
||||
rsinfo->returnMode = SFRM_Materialize;
|
||||
rsinfo->setResult = tupstore;
|
||||
rsinfo->setDesc = tupdesc;
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
LWLockAcquire(pgds->lock, LW_SHARED);
|
||||
|
||||
hash_seq_init(&hash_seq, pgds_hash);
|
||||
while ((entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
Datum values[PG_DIST_STATEMENTS_COLS];
|
||||
bool nulls[PG_DIST_STATEMENTS_COLS];
|
||||
int i = 0;
|
||||
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
SpinLockAcquire(&entry->mutex);
|
||||
|
||||
/* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
|
||||
if (entry->calls == 0)
|
||||
{
|
||||
SpinLockRelease(&entry->mutex);
|
||||
continue;
|
||||
}
|
||||
values[i++] = Int64GetDatum(entry->key.queryid);
|
||||
values[i++] = ObjectIdGetDatum(entry->key.userid);
|
||||
values[i++] = ObjectIdGetDatum(entry->key.dbid);
|
||||
values[i++] = ObjectIdGetDatum(entry->key.executorType);
|
||||
values[i++] = Int64GetDatumFast(entry->calls);
|
||||
SpinLockRelease(&entry->mutex);
|
||||
|
||||
Assert(i == PG_DIST_STATEMENTS_COLS);
|
||||
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
}
|
||||
|
||||
LWLockRelease(pgds->lock);
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
tuplestore_donestoring(tupstore);
|
||||
|
||||
return (Datum) 0;
|
||||
}
|
|
@ -282,6 +282,7 @@ OutMultiPlan(OUTFUNC_ARGS)
|
|||
WRITE_NODE_FIELD(workerJob);
|
||||
WRITE_NODE_FIELD(masterQuery);
|
||||
WRITE_BOOL_FIELD(routerExecutable);
|
||||
WRITE_UINT_FIELD(queryId);
|
||||
WRITE_NODE_FIELD(planningError);
|
||||
}
|
||||
|
||||
|
|
|
@ -189,6 +189,7 @@ ReadMultiPlan(READFUNC_ARGS)
|
|||
READ_NODE_FIELD(workerJob);
|
||||
READ_NODE_FIELD(masterQuery);
|
||||
READ_BOOL_FIELD(routerExecutable);
|
||||
READ_UINT_FIELD(queryId);
|
||||
READ_NODE_FIELD(planningError);
|
||||
|
||||
READ_DONE();
|
||||
|
|
|
@ -220,6 +220,8 @@ typedef struct MultiPlan
|
|||
Query *masterQuery;
|
||||
bool routerExecutable;
|
||||
|
||||
uint32 queryId; /* query identifier (copied from the top-level PlannedStmt) */
|
||||
|
||||
/*
|
||||
* NULL if this a valid plan, an error description otherwise. This will
|
||||
* e.g. be set if SQL features are present that a planner doesn't support,
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
/*-------------------------------------------------------------------------
 *
 * stats_statement_executors.h
 *	  Statement-level statistics for distributed queries.
 *
 * Copyright (c) 2017, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef STATS_STATEMENT_EXECUTORS_H
#define STATS_STATEMENT_EXECUTORS_H

#include "distributed/multi_server_executor.h"

/* set up shared memory for the statistics; called from _PG_init() */
void InitializeStatsStatementExecutors(void);

/* bump the call counter for the (queryId, executorType) combination */
void StoreStatsStatementExecutorsEntry(uint32 queryId, MultiExecutorType executorType);

#endif /* STATS_STATEMENT_EXECUTORS_H */
|
Loading…
Reference in New Issue