From 4d35b920167c2d7c67697be8e3ebf8f785d9011a Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Mon, 25 Jun 2018 15:08:05 +0300 Subject: [PATCH] Add groundwork for citus_stat_statements api --- Makefile | 1 + citus.control | 2 +- src/backend/distributed/Makefile | 4 +- .../distributed/citus--7.5-3--7.5-4.sql | 47 +++++++++++++++ src/backend/distributed/citus.control | 2 +- .../executor/multi_router_executor.c | 2 +- .../executor/multi_server_executor.c | 2 +- .../distributed/executor/query_stats.c | 60 +++++++++++++++++++ .../distributed/planner/distributed_planner.c | 1 + .../planner/multi_router_planner.c | 16 ++--- src/backend/distributed/shared_library_init.c | 2 + .../distributed/utils/citus_copyfuncs.c | 3 +- .../distributed/utils/citus_outfuncs.c | 3 +- .../distributed/utils/citus_readfuncs.c | 3 +- .../distributed/multi_physical_planner.h | 5 +- .../distributed/multi_router_planner.h | 2 +- src/include/distributed/query_stats.h | 19 ++++++ 17 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 src/backend/distributed/citus--7.5-3--7.5-4.sql create mode 100644 src/backend/distributed/executor/query_stats.c create mode 100644 src/include/distributed/query_stats.h diff --git a/Makefile b/Makefile index 7ac0416c1..8ff43160c 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/executor/multi_server_executor.o \ src/backend/distributed/executor/multi_task_tracker_executor.o \ src/backend/distributed/executor/multi_utility.o \ + src/backend/distributed/executor/query_stats.o \ src/backend/distributed/executor/subplan_execution.o \ src/backend/distributed/master/citus_create_restore_point.o \ src/backend/distributed/master/master_citus_tools.o \ diff --git a/citus.control b/citus.control index 6996ebbaf..23d7b6735 100644 --- a/citus.control +++ b/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.5-3' +default_version = '7.5-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index b8715804a..9af4c21d4 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -16,7 +16,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.2-1 7.2-2 7.2-3 \ 7.3-1 7.3-2 7.3-3 \ 7.4-1 7.4-2 7.4-3 \ - 7.5-1 7.5-2 7.5-3 + 7.5-1 7.5-2 7.5-3 7.5-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -206,6 +206,8 @@ $(EXTENSION)--7.5-2.sql: $(EXTENSION)--7.5-1.sql $(EXTENSION)--7.5-1--7.5-2.sql cat $^ > $@ $(EXTENSION)--7.5-3.sql: $(EXTENSION)--7.5-2.sql $(EXTENSION)--7.5-2--7.5-3.sql cat $^ > $@ +$(EXTENSION)--7.5-4.sql: $(EXTENSION)--7.5-3.sql $(EXTENSION)--7.5-3--7.5-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.5-3--7.5-4.sql b/src/backend/distributed/citus--7.5-3--7.5-4.sql new file mode 100644 index 000000000..42b535697 --- /dev/null +++ b/src/backend/distributed/citus--7.5-3--7.5-4.sql @@ -0,0 +1,47 @@ +/* citus--7.5-3--7.5-4 */ + +CREATE FUNCTION pg_catalog.citus_query_stats(OUT queryid bigint, + OUT userid oid, + OUT dbid oid, + OUT executor bigint, + OUT partition_key text, + OUT calls bigint) +RETURNS SETOF record +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_query_stats$$; + +CREATE FUNCTION pg_catalog.citus_stat_statements_reset() +RETURNS VOID +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_stat_statements_reset$$; + +CREATE FUNCTION pg_catalog.citus_stat_statements(OUT queryid bigint, + OUT userid oid, + OUT dbid oid, + OUT query text, + OUT executor bigint, + OUT partition_key text, + OUT calls bigint) +RETURNS SETOF record +LANGUAGE plpgsql +AS $citus_stat_statements$ +BEGIN + IF EXISTS ( + SELECT extname FROM pg_extension + WHERE extname = 'pg_stat_statements') + THEN + RETURN QUERY SELECT pss.queryid, pss.userid, pss.dbid, pss.query, cqs.executor, + cqs.partition_key, cqs.calls + FROM pg_stat_statements(true) pss + JOIN citus_query_stats() cqs + USING (queryid); + ELSE + RAISE EXCEPTION 'pg_stat_statements is not installed' + USING HINT = 'install pg_stat_statements extension and try again'; + END IF; +END; +$citus_stat_statements$; + +CREATE VIEW citus.citus_stat_statements as SELECT * FROM pg_catalog.citus_stat_statements(); +ALTER VIEW citus.citus_stat_statements SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stat_statements TO public; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 6996ebbaf..23d7b6735 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.5-3' +default_version = '7.5-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 829661e2e..820a11d09 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -513,7 +513,7 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) } workerJob->taskList = taskList; - workerJob->partitionValueConst = ExtractInsertPartitionValueConst(jobQuery); + workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery); } RebuildQueryStrings(jobQuery, taskList); diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index b91e81b68..61fbaa527 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -56,7 +56,7 @@ JobExecutorType(DistributedPlan *distributedPlan) { if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2) { - Const *partitionValueConst = job->partitionValueConst; + Const *partitionValueConst = job->partitionKeyValue; if (partitionValueConst != NULL && !partitionValueConst->constisnull) { diff --git a/src/backend/distributed/executor/query_stats.c b/src/backend/distributed/executor/query_stats.c new file mode 100644 index 000000000..6b3cfd041 --- /dev/null +++ b/src/backend/distributed/executor/query_stats.c @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * + * query_stats.c + * Statement-level statistics for distributed queries. + * + * Copyright (c) 2012-2018, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "fmgr.h" + +#include "distributed/query_stats.h" + +PG_FUNCTION_INFO_V1(citus_stat_statements_reset); +PG_FUNCTION_INFO_V1(citus_query_stats); + + +/* placeholder for InitializeCitusQueryStats */ +void +InitializeCitusQueryStats(void) +{ + /* placeholder for future implementation */ +} + + +/* placeholder for CitusQueryStatsExecutorsEntry */ +void +CitusQueryStatsExecutorsEntry(uint64 queryId, MultiExecutorType executorType, + char *partitionKey) +{ + /* placeholder for future implementation */ +} + + +/* + * placeholder function for citus_stat_statements_reset + */ +Datum +citus_stat_statements_reset(PG_FUNCTION_ARGS) +{ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("citus_stat_statements_reset() is only supported on " + "Citus Enterprise"))); + PG_RETURN_VOID(); +} + + +/* + * placeholder function for citus_query_stats + */ +Datum +citus_query_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("citus_query_stats() is only supported on " + "Citus Enterprise"))); + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ce006ebcd..195adc91a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -940,6 +940,7 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) } distributedPlan->relationIdList = localPlan->relationOids; + distributedPlan->queryId = localPlan->queryId; distributedPlanData = (Node *) distributedPlan; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 66fa6606a..da7e5460a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1279,7 +1279,7 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann Job *job = NULL; bool requiresMasterEvaluation = false; bool deferredPruning = false; - Const *partitionValueConst = NULL; + Const *partitionKeyValue = NULL; bool isMultiRowInsert = IsMultiRowInsert(query); if (isMultiRowInsert) @@ -1324,14 +1324,14 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann RebuildQueryStrings(originalQuery, taskList); /* remember the partition column value */ - partitionValueConst = ExtractInsertPartitionValueConst(originalQuery); + partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery); } job = CreateJob(originalQuery); job->taskList = taskList; job->requiresMasterEvaluation = requiresMasterEvaluation; job->deferredPruning = deferredPruning; - job->partitionValueConst = partitionValueConst; + job->partitionKeyValue = partitionKeyValue; return job; } @@ -1545,7 +1545,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon bool requiresMasterEvaluation = false; RangeTblEntry *updateOrDeleteRTE = NULL; bool isMultiShardModifyQuery = false; - Const *partitionValueConst = NULL; + Const *partitionKeyValue = NULL; /* router planner should create task even if it deosn't hit a shard at all */ replacePrunedQueryWithDummy = true; @@ -1557,14 +1557,14 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon &placementList, &shardId, &relationShardList, replacePrunedQueryWithDummy, &isMultiShardModifyQuery, - &partitionValueConst); + &partitionKeyValue); if (*planningError) { return NULL; } job = CreateJob(originalQuery); - job->partitionValueConst = partitionValueConst; + job->partitionKeyValue = partitionKeyValue; updateOrDeleteRTE = GetUpdateOrDeleteRTE(originalQuery); @@ -2648,13 +2648,13 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* - * ExtractInsertPartitionValueConst extracts the partition column value + * ExtractInsertPartitionKeyValue extracts the partition column value * from an INSERT query. If the expression in the partition column is * non-constant or it is a multi-row INSERT with multiple different partition * column values, the function returns NULL. */ Const * -ExtractInsertPartitionValueConst(Query *query) +ExtractInsertPartitionKeyValue(Query *query) { Oid distributedTableId = ExtractFirstDistributedTableId(query); uint32 rangeTableId = 1; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 4d89e3f0d..c0f1474c3 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -39,6 +39,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/placement_connection.h" #include "distributed/query_pushdown_planning.h" +#include "distributed/query_stats.h" #include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" #include "distributed/statistics_collection.h" @@ -241,6 +242,7 @@ _PG_init(void) InitializeBackendManagement(); InitializeConnectionManagement(); InitPlacementConnectionManagement(); + InitializeCitusQueryStats(); /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 03e100473..0b5e5eaaf 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -84,7 +84,7 @@ copyJobInfo(Job *newnode, Job *from) COPY_SCALAR_FIELD(subqueryPushdown); COPY_SCALAR_FIELD(requiresMasterEvaluation); COPY_SCALAR_FIELD(deferredPruning); - COPY_NODE_FIELD(partitionValueConst); + COPY_NODE_FIELD(partitionKeyValue); } @@ -109,6 +109,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(masterQuery); COPY_SCALAR_FIELD(routerExecutable); + COPY_SCALAR_FIELD(queryId); COPY_NODE_FIELD(relationIdList); COPY_NODE_FIELD(insertSelectSubquery); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index c16d43125..9f5ff64b8 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -182,6 +182,7 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); WRITE_BOOL_FIELD(routerExecutable); + WRITE_UINT64_FIELD(queryId); WRITE_NODE_FIELD(relationIdList); WRITE_NODE_FIELD(insertSelectSubquery); @@ -319,7 +320,7 @@ OutJobFields(StringInfo str, const Job *node) WRITE_BOOL_FIELD(subqueryPushdown); WRITE_BOOL_FIELD(requiresMasterEvaluation); WRITE_BOOL_FIELD(deferredPruning); - WRITE_NODE_FIELD(partitionValueConst); + WRITE_NODE_FIELD(partitionKeyValue); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index bed80f3cb..e3e970d0f 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -181,7 +181,7 @@ readJobInfo(Job *local_node) READ_BOOL_FIELD(subqueryPushdown); READ_BOOL_FIELD(requiresMasterEvaluation); READ_BOOL_FIELD(deferredPruning); - READ_NODE_FIELD(partitionValueConst); + READ_NODE_FIELD(partitionKeyValue); } @@ -208,6 +208,7 @@ ReadDistributedPlan(READFUNC_ARGS) READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); READ_BOOL_FIELD(routerExecutable); + READ_UINT64_FIELD(queryId); READ_NODE_FIELD(relationIdList); READ_NODE_FIELD(insertSelectSubquery); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index b01ff563c..e8e091209 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -127,7 +127,7 @@ typedef struct Job bool subqueryPushdown; bool requiresMasterEvaluation; /* only applies to modify jobs */ bool deferredPruning; - Const *partitionValueConst; + Const *partitionKeyValue; } Job; @@ -244,6 +244,9 @@ typedef struct DistributedPlan /* a router executable query is executed entirely on a worker */ bool routerExecutable; + /* query identifier (copied from the top-level PlannedStmt) */ + uint64 queryId; + /* which relations are accessed by this distributed plan */ List *relationIdList; diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 1e6a8819b..9206d1337 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -41,7 +41,7 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, bool *multiShardModifyQuery, Const **partitionValueConst); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); -extern Const * ExtractInsertPartitionValueConst(Query *query); +extern Const * ExtractInsertPartitionKeyValue(Query *query); extern List * TargetShardIntervalsForQuery(Query *query, RelationRestrictionContext *restrictionContext, bool *multiShardQuery, diff --git a/src/include/distributed/query_stats.h b/src/include/distributed/query_stats.h new file mode 100644 index 000000000..95dfb1cbc --- /dev/null +++ b/src/include/distributed/query_stats.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * + * stats_statements.h + * Statement-level statistics for distributed queries. + * + * Copyright (c) 2017, Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef QUERY_STATS_H +#define QUERY_STATS_H + +#include "distributed/multi_server_executor.h" + +extern void InitializeCitusQueryStats(void); +extern void CitusQueryStatsExecutorsEntry(uint64 queryId, MultiExecutorType executorType, + char *partitionKey); + +#endif /* QUERY_STATS_H */