Add groundwork for citus_stat_statements api

pull/2235/head
Murat Tuncer 2018-06-25 15:08:05 +03:00
parent 5ce18327a7
commit 4d35b92016
17 changed files with 156 additions and 18 deletions

View File

@ -24,6 +24,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/executor/multi_server_executor.o \ src/backend/distributed/executor/multi_server_executor.o \
src/backend/distributed/executor/multi_task_tracker_executor.o \ src/backend/distributed/executor/multi_task_tracker_executor.o \
src/backend/distributed/executor/multi_utility.o \ src/backend/distributed/executor/multi_utility.o \
src/backend/distributed/executor/query_stats.o \
src/backend/distributed/executor/subplan_execution.o \ src/backend/distributed/executor/subplan_execution.o \
src/backend/distributed/master/citus_create_restore_point.o \ src/backend/distributed/master/citus_create_restore_point.o \
src/backend/distributed/master/master_citus_tools.o \ src/backend/distributed/master/master_citus_tools.o \

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '7.5-3' default_version = '7.5-4'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -16,7 +16,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.2-1 7.2-2 7.2-3 \ 7.2-1 7.2-2 7.2-3 \
7.3-1 7.3-2 7.3-3 \ 7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2 7.4-3 \ 7.4-1 7.4-2 7.4-3 \
7.5-1 7.5-2 7.5-3 7.5-1 7.5-2 7.5-3 7.5-4
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -206,6 +206,8 @@ $(EXTENSION)--7.5-2.sql: $(EXTENSION)--7.5-1.sql $(EXTENSION)--7.5-1--7.5-2.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--7.5-3.sql: $(EXTENSION)--7.5-2.sql $(EXTENSION)--7.5-2--7.5-3.sql $(EXTENSION)--7.5-3.sql: $(EXTENSION)--7.5-2.sql $(EXTENSION)--7.5-2--7.5-3.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--7.5-4.sql: $(EXTENSION)--7.5-3.sql $(EXTENSION)--7.5-3--7.5-4.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,47 @@
/* citus--7.5-3--7.5-4 */
CREATE FUNCTION pg_catalog.citus_query_stats(OUT queryid bigint,
OUT userid oid,
OUT dbid oid,
OUT executor bigint,
OUT partition_key text,
OUT calls bigint)
RETURNS SETOF record
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_query_stats$$;
CREATE FUNCTION pg_catalog.citus_stat_statements_reset()
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_stat_statements_reset$$;
CREATE FUNCTION pg_catalog.citus_stat_statements(OUT queryid bigint,
OUT userid oid,
OUT dbid oid,
OUT query text,
OUT executor bigint,
OUT partition_key text,
OUT calls bigint)
RETURNS SETOF record
LANGUAGE plpgsql
AS $citus_stat_statements$
BEGIN
IF EXISTS (
SELECT extname FROM pg_extension
WHERE extname = 'pg_stat_statements')
THEN
RETURN QUERY SELECT pss.queryid, pss.userid, pss.dbid, pss.query, cqs.executor,
cqs.partition_key, cqs.calls
FROM pg_stat_statements(true) pss
JOIN citus_query_stats() cqs
USING (queryid);
ELSE
RAISE EXCEPTION 'pg_stat_statements is not installed'
USING HINT = 'install pg_stat_statements extension and try again';
END IF;
END;
$citus_stat_statements$;
CREATE VIEW citus.citus_stat_statements as SELECT * FROM pg_catalog.citus_stat_statements();
ALTER VIEW citus.citus_stat_statements SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_statements TO public;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '7.5-3' default_version = '7.5-4'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -513,7 +513,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
} }
workerJob->taskList = taskList; workerJob->taskList = taskList;
workerJob->partitionValueConst = ExtractInsertPartitionValueConst(jobQuery); workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
} }
RebuildQueryStrings(jobQuery, taskList); RebuildQueryStrings(jobQuery, taskList);

View File

@ -56,7 +56,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
{ {
if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2) if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
{ {
Const *partitionValueConst = job->partitionValueConst; Const *partitionValueConst = job->partitionKeyValue;
if (partitionValueConst != NULL && !partitionValueConst->constisnull) if (partitionValueConst != NULL && !partitionValueConst->constisnull)
{ {

View File

@ -0,0 +1,60 @@
/*-------------------------------------------------------------------------
*
* query_stats.c
* Statement-level statistics for distributed queries.
*
* Copyright (c) 2012-2018, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "distributed/query_stats.h"
PG_FUNCTION_INFO_V1(citus_stat_statements_reset);
PG_FUNCTION_INFO_V1(citus_query_stats);
/* placeholder for InitializeCitusQueryStats */
void
InitializeCitusQueryStats(void)
{
/* placeholder for future implementation */
}
/* placeholder for CitusQueryStatsExecutorsEntry */
void
CitusQueryStatsExecutorsEntry(uint64 queryId, MultiExecutorType executorType,
char *partitionKey)
{
/* placeholder for future implementation */
}
/*
* placeholder function for citus_stat_statements_reset
*/
Datum
citus_stat_statements_reset(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("citus_stat_statements_reset() is only supported on "
"Citus Enterprise")));
PG_RETURN_VOID();
}
/*
* placeholder function for citus_query_stats
*/
Datum
citus_query_stats(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("citus_query_stats() is only supported on "
"Citus Enterprise")));
PG_RETURN_VOID();
}

View File

@ -940,6 +940,7 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
} }
distributedPlan->relationIdList = localPlan->relationOids; distributedPlan->relationIdList = localPlan->relationOids;
distributedPlan->queryId = localPlan->queryId;
distributedPlanData = (Node *) distributedPlan; distributedPlanData = (Node *) distributedPlan;

View File

@ -1279,7 +1279,7 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
Job *job = NULL; Job *job = NULL;
bool requiresMasterEvaluation = false; bool requiresMasterEvaluation = false;
bool deferredPruning = false; bool deferredPruning = false;
Const *partitionValueConst = NULL; Const *partitionKeyValue = NULL;
bool isMultiRowInsert = IsMultiRowInsert(query); bool isMultiRowInsert = IsMultiRowInsert(query);
if (isMultiRowInsert) if (isMultiRowInsert)
@ -1324,14 +1324,14 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
RebuildQueryStrings(originalQuery, taskList); RebuildQueryStrings(originalQuery, taskList);
/* remember the partition column value */ /* remember the partition column value */
partitionValueConst = ExtractInsertPartitionValueConst(originalQuery); partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
} }
job = CreateJob(originalQuery); job = CreateJob(originalQuery);
job->taskList = taskList; job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation; job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning; job->deferredPruning = deferredPruning;
job->partitionValueConst = partitionValueConst; job->partitionKeyValue = partitionKeyValue;
return job; return job;
} }
@ -1545,7 +1545,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
bool requiresMasterEvaluation = false; bool requiresMasterEvaluation = false;
RangeTblEntry *updateOrDeleteRTE = NULL; RangeTblEntry *updateOrDeleteRTE = NULL;
bool isMultiShardModifyQuery = false; bool isMultiShardModifyQuery = false;
Const *partitionValueConst = NULL; Const *partitionKeyValue = NULL;
/* router planner should create task even if it deosn't hit a shard at all */ /* router planner should create task even if it deosn't hit a shard at all */
replacePrunedQueryWithDummy = true; replacePrunedQueryWithDummy = true;
@ -1557,14 +1557,14 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
&placementList, &shardId, &relationShardList, &placementList, &shardId, &relationShardList,
replacePrunedQueryWithDummy, replacePrunedQueryWithDummy,
&isMultiShardModifyQuery, &isMultiShardModifyQuery,
&partitionValueConst); &partitionKeyValue);
if (*planningError) if (*planningError)
{ {
return NULL; return NULL;
} }
job = CreateJob(originalQuery); job = CreateJob(originalQuery);
job->partitionValueConst = partitionValueConst; job->partitionKeyValue = partitionKeyValue;
updateOrDeleteRTE = GetUpdateOrDeleteRTE(originalQuery); updateOrDeleteRTE = GetUpdateOrDeleteRTE(originalQuery);
@ -2648,13 +2648,13 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
/* /*
* ExtractInsertPartitionValueConst extracts the partition column value * ExtractInsertPartitionKeyValue extracts the partition column value
* from an INSERT query. If the expression in the partition column is * from an INSERT query. If the expression in the partition column is
* non-constant or it is a multi-row INSERT with multiple different partition * non-constant or it is a multi-row INSERT with multiple different partition
* column values, the function returns NULL. * column values, the function returns NULL.
*/ */
Const * Const *
ExtractInsertPartitionValueConst(Query *query) ExtractInsertPartitionKeyValue(Query *query)
{ {
Oid distributedTableId = ExtractFirstDistributedTableId(query); Oid distributedTableId = ExtractFirstDistributedTableId(query);
uint32 rangeTableId = 1; uint32 rangeTableId = 1;

View File

@ -39,6 +39,7 @@
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/query_pushdown_planning.h" #include "distributed/query_pushdown_planning.h"
#include "distributed/query_stats.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h" #include "distributed/shared_library_init.h"
#include "distributed/statistics_collection.h" #include "distributed/statistics_collection.h"
@ -241,6 +242,7 @@ _PG_init(void)
InitializeBackendManagement(); InitializeBackendManagement();
InitializeConnectionManagement(); InitializeConnectionManagement();
InitPlacementConnectionManagement(); InitPlacementConnectionManagement();
InitializeCitusQueryStats();
/* enable modification of pg_catalog tables during pg_upgrade */ /* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade) if (IsBinaryUpgrade)

View File

@ -84,7 +84,7 @@ copyJobInfo(Job *newnode, Job *from)
COPY_SCALAR_FIELD(subqueryPushdown); COPY_SCALAR_FIELD(subqueryPushdown);
COPY_SCALAR_FIELD(requiresMasterEvaluation); COPY_SCALAR_FIELD(requiresMasterEvaluation);
COPY_SCALAR_FIELD(deferredPruning); COPY_SCALAR_FIELD(deferredPruning);
COPY_NODE_FIELD(partitionValueConst); COPY_NODE_FIELD(partitionKeyValue);
} }
@ -109,6 +109,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(workerJob);
COPY_NODE_FIELD(masterQuery); COPY_NODE_FIELD(masterQuery);
COPY_SCALAR_FIELD(routerExecutable); COPY_SCALAR_FIELD(routerExecutable);
COPY_SCALAR_FIELD(queryId);
COPY_NODE_FIELD(relationIdList); COPY_NODE_FIELD(relationIdList);
COPY_NODE_FIELD(insertSelectSubquery); COPY_NODE_FIELD(insertSelectSubquery);

View File

@ -182,6 +182,7 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery); WRITE_NODE_FIELD(masterQuery);
WRITE_BOOL_FIELD(routerExecutable); WRITE_BOOL_FIELD(routerExecutable);
WRITE_UINT64_FIELD(queryId);
WRITE_NODE_FIELD(relationIdList); WRITE_NODE_FIELD(relationIdList);
WRITE_NODE_FIELD(insertSelectSubquery); WRITE_NODE_FIELD(insertSelectSubquery);
@ -319,7 +320,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_BOOL_FIELD(subqueryPushdown); WRITE_BOOL_FIELD(subqueryPushdown);
WRITE_BOOL_FIELD(requiresMasterEvaluation); WRITE_BOOL_FIELD(requiresMasterEvaluation);
WRITE_BOOL_FIELD(deferredPruning); WRITE_BOOL_FIELD(deferredPruning);
WRITE_NODE_FIELD(partitionValueConst); WRITE_NODE_FIELD(partitionKeyValue);
} }

View File

@ -181,7 +181,7 @@ readJobInfo(Job *local_node)
READ_BOOL_FIELD(subqueryPushdown); READ_BOOL_FIELD(subqueryPushdown);
READ_BOOL_FIELD(requiresMasterEvaluation); READ_BOOL_FIELD(requiresMasterEvaluation);
READ_BOOL_FIELD(deferredPruning); READ_BOOL_FIELD(deferredPruning);
READ_NODE_FIELD(partitionValueConst); READ_NODE_FIELD(partitionKeyValue);
} }
@ -208,6 +208,7 @@ ReadDistributedPlan(READFUNC_ARGS)
READ_NODE_FIELD(workerJob); READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery); READ_NODE_FIELD(masterQuery);
READ_BOOL_FIELD(routerExecutable); READ_BOOL_FIELD(routerExecutable);
READ_UINT64_FIELD(queryId);
READ_NODE_FIELD(relationIdList); READ_NODE_FIELD(relationIdList);
READ_NODE_FIELD(insertSelectSubquery); READ_NODE_FIELD(insertSelectSubquery);

View File

@ -127,7 +127,7 @@ typedef struct Job
bool subqueryPushdown; bool subqueryPushdown;
bool requiresMasterEvaluation; /* only applies to modify jobs */ bool requiresMasterEvaluation; /* only applies to modify jobs */
bool deferredPruning; bool deferredPruning;
Const *partitionValueConst; Const *partitionKeyValue;
} Job; } Job;
@ -244,6 +244,9 @@ typedef struct DistributedPlan
/* a router executable query is executed entirely on a worker */ /* a router executable query is executed entirely on a worker */
bool routerExecutable; bool routerExecutable;
/* query identifier (copied from the top-level PlannedStmt) */
uint64 queryId;
/* which relations are accessed by this distributed plan */ /* which relations are accessed by this distributed plan */
List *relationIdList; List *relationIdList;

View File

@ -41,7 +41,7 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
bool *multiShardModifyQuery, bool *multiShardModifyQuery,
Const **partitionValueConst); Const **partitionValueConst);
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
extern Const * ExtractInsertPartitionValueConst(Query *query); extern Const * ExtractInsertPartitionKeyValue(Query *query);
extern List * TargetShardIntervalsForQuery(Query *query, extern List * TargetShardIntervalsForQuery(Query *query,
RelationRestrictionContext *restrictionContext, RelationRestrictionContext *restrictionContext,
bool *multiShardQuery, bool *multiShardQuery,

View File

@ -0,0 +1,19 @@
/*-------------------------------------------------------------------------
*
* stats_statements.h
* Statement-level statistics for distributed queries.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef QUERY_STATS_H
#define QUERY_STATS_H
#include "distributed/multi_server_executor.h"
extern void InitializeCitusQueryStats(void);
extern void CitusQueryStatsExecutorsEntry(uint64 queryId, MultiExecutorType executorType,
char *partitionKey);
#endif /* QUERY_STATS_H */