From d657759c97ca0ea045acbb75e583094578f6f8ca Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Sat, 25 Aug 2018 13:03:36 +0300 Subject: [PATCH] Views to Provide some insight about the distributed transactions on Citus MX With this commit, we implement two views that are very similar to pg_stat_activity, but show only the queries that are involved in distributed queries: - citus_dist_stat_activity: Shows all the distributed queries - citus_worker_stat_activity: Shows all the queries on the shards that are initiated by distributed queries. Both views have the same columns in the outputs. In very basic terms, both of the views are meant to provide some useful insights about the distributed transactions within the cluster. As the names reveal, both views are similar to pg_stat_activity. Also note that these views can be pretty useful on Citus MX clusters. Note that when the views are queried from the worker nodes, they do not show the distributed transactions that are initiated from the coordinator node. The reason is that the worker nodes do not know the host/port of the coordinator. Thus, it is advisable to query the views from the coordinator. If we bucket the columns that the views return, we'd end up with the following: - Hostnames and ports: - query_hostname, query_hostport: The node that the query is running on - master_query_host_name, master_query_host_port: The node in the cluster that initiated the query. Note that for citus_dist_stat_activity view, the query_hostname-query_hostport is always the same as master_query_host_name-master_query_host_port. The distinction is mostly relevant for citus_worker_stat_activity. For example, on Citus MX, a user starts a transaction on Node-A, which starts worker transactions on Node-B and Node-C. In that case, the query hostnames would be Node-B and Node-C whereas the master_query_host_name would be Node-A. 
- Distributed transaction related things: This is mostly the process_id, distributed transactionId and distributed transaction number. - pg_stat_activity columns: These two views get all the columns from pg_stat_activity. We're basically joining pg_stat_activity with get_all_active_transactions on process_id. --- Makefile | 1 + citus.control | 2 +- src/backend/distributed/Makefile | 4 +- .../distributed/citus--8.0-4--8.0-5.sql | 68 + src/backend/distributed/citus.control | 2 +- .../distributed/transaction/backend_data.c | 17 +- .../transaction/citus_dist_stat_activity.c | 1223 +++++++++++++++++ .../distributed/transaction/lock_graph.c | 11 +- src/include/distributed/lock_graph.h | 6 + .../isolation_citus_dist_activity.out | 181 +++ .../isolation_citus_dist_activity_0.out | 181 +++ src/test/regress/expected/multi_extension.out | 1 + src/test/regress/isolation_schedule | 2 +- .../specs/isolation_citus_dist_activity.spec | 88 ++ src/test/regress/sql/multi_extension.sql | 1 + 15 files changed, 1773 insertions(+), 15 deletions(-) create mode 100644 src/backend/distributed/citus--8.0-4--8.0-5.sql create mode 100644 src/backend/distributed/transaction/citus_dist_stat_activity.c create mode 100644 src/test/regress/expected/isolation_citus_dist_activity.out create mode 100644 src/test/regress/expected/isolation_citus_dist_activity_0.out create mode 100644 src/test/regress/specs/isolation_citus_dist_activity.spec diff --git a/Makefile b/Makefile index 55685f1c3..c47c5c69f 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/test/relation_access_tracking.o \ src/backend/distributed/test/sequential_execution.o \ src/backend/distributed/transaction/backend_data.o \ + src/backend/distributed/transaction/citus_dist_stat_activity.o \ src/backend/distributed/transaction/distributed_deadlock_detection.o \ src/backend/distributed/transaction/lock_graph.o \ 
src/backend/distributed/transaction/multi_shard_transaction.o \ diff --git a/citus.control b/citus.control index 28db138ec..9e53b5012 100644 --- a/citus.control +++ b/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-4' +default_version = '8.0-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 6e6787b06..514a78a49 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.3-1 7.3-2 7.3-3 \ 7.4-1 7.4-2 7.4-3 \ 7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \ - 8.0-1 8.0-2 8.0-3 8.0-4 + 8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -223,6 +223,8 @@ $(EXTENSION)--8.0-3.sql: $(EXTENSION)--8.0-2.sql $(EXTENSION)--8.0-2--8.0-3.sql cat $^ > $@ $(EXTENSION)--8.0-4.sql: $(EXTENSION)--8.0-3.sql $(EXTENSION)--8.0-3--8.0-4.sql cat $^ > $@ +$(EXTENSION)--8.0-5.sql: $(EXTENSION)--8.0-4.sql $(EXTENSION)--8.0-4--8.0-5.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--8.0-4--8.0-5.sql b/src/backend/distributed/citus--8.0-4--8.0-5.sql new file mode 100644 index 000000000..e3852c947 --- /dev/null +++ b/src/backend/distributed/citus--8.0-4--8.0-5.sql @@ -0,0 +1,68 @@ +/* citus--8.0-4--8.0-5.sql */ +SET search_path = 'pg_catalog'; + + +DROP FUNCTION IF EXISTS get_all_active_transactions(); + + +CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$get_all_active_transactions$$; + +COMMENT ON FUNCTION get_all_active_transactions(OUT datid oid, 
OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz) +IS 'returns distributed transaction ids of active distributed transactions'; + + +CREATE OR REPLACE FUNCTION citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT master_query_host_name text, OUT master_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_dist_stat_activity$$; + +COMMENT ON FUNCTION citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT master_query_host_name text, OUT master_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +IS 'returns distributed transaction activity on distributed tables'; + +CREATE VIEW citus.citus_dist_stat_activity AS +SELECT * FROM pg_catalog.citus_dist_stat_activity(); +ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; + + + 
+CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT master_query_host_name text, OUT master_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_worker_stat_activity$$; + +COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT master_query_host_name text, OUT master_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +IS 'returns distributed transaction activity on shards of distributed tables'; + +CREATE VIEW citus.citus_worker_stat_activity AS +SELECT * FROM pg_catalog.citus_worker_stat_activity(); +ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; + +RESET search_path; + diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 28db138ec..9e53b5012 100644 --- a/src/backend/distributed/citus.control +++ 
b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-4' +default_version = '8.0-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 9be74f109..0ab031901 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -195,8 +195,8 @@ get_all_active_transactions(PG_FUNCTION_ARGS) int backendIndex = 0; - Datum values[5]; - bool isNulls[5]; + Datum values[6]; + bool isNulls[6]; CheckCitusVersion(ERROR); @@ -249,6 +249,7 @@ get_all_active_transactions(PG_FUNCTION_ARGS) { BackendData *currentBackend = &backendManagementShmemData->backends[backendIndex]; + bool coordinatorOriginatedQuery = false; SpinLockAcquire(&currentBackend->mutex); @@ -262,8 +263,16 @@ get_all_active_transactions(PG_FUNCTION_ARGS) values[0] = ObjectIdGetDatum(currentBackend->databaseId); values[1] = Int32GetDatum(ProcGlobal->allProcs[backendIndex].pid); values[2] = Int32GetDatum(currentBackend->transactionId.initiatorNodeIdentifier); - values[3] = UInt64GetDatum(currentBackend->transactionId.transactionNumber); - values[4] = TimestampTzGetDatum(currentBackend->transactionId.timestamp); + + /* + * We prefer to use worker_query instead of transactionOriginator in the user facing + * functions since its more intuitive. Thus, we negate the result before returning. 
+ */ + coordinatorOriginatedQuery = currentBackend->transactionId.transactionOriginator; + values[3] = !coordinatorOriginatedQuery; + + values[4] = UInt64GetDatum(currentBackend->transactionId.transactionNumber); + values[5] = TimestampTzGetDatum(currentBackend->transactionId.timestamp); SpinLockRelease(&currentBackend->mutex); diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c new file mode 100644 index 000000000..9952facda --- /dev/null +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -0,0 +1,1223 @@ +/*------------------------------------------------------------------------- + * + * citus_dist_stat_activity.c + * + * This file contains functions for monitoring the distributed transactions + * across the cluster. + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "fmgr.h" +#include "postmaster/postmaster.h" + +#include "funcapi.h" +#include "access/htup_details.h" +#include "catalog/pg_type.h" +#include "datatype/timestamp.h" +#include "distributed/backend_data.h" +#include "distributed/connection_management.h" +#include "distributed/listutils.h" +#include "distributed/lock_graph.h" +#include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/transaction_identifier.h" +#include "executor/spi.h" +#include "nodes/execnodes.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/spin.h" +#include "storage/s_lock.h" +#include "utils/builtins.h" +#if PG_VERSION_NUM >= 100000 +#include "utils/fmgrprotos.h" +#endif +#include "utils/inet.h" +#include "utils/timestamp.h" + + +/* + * citus_dist_stat_activity() and citus_worker_stat_activity() is similar to + * 
pg_stat_activity. Those functions basically return join of + * pg_stat_activity and get_all_active_transactions() on each node + * in the cluster. The only difference is that citus_dist_stat_activity() + * gets transactions where worker_query = false and citus_worker_stat_activity() + * gets transactions where worker_query = true. + * + * In other words, citus_dist_stat_activity returns only the queries that are the + * distributed queries. citus_worker_stat_activity returns only the queries that + * are worker queries (e.g., queries on the shards) initiated by those distributed + * queries. To understand this better, let us give an example. If a users starts + * a query like "UPDATE table SET value = 1", this query would show up on + * citus_dist_stat_activity. The same query would generate #shard worker queries, + * all of which would show up on citus_worker_stat_activity. + * + * An important note on this views is that they only show the activity + * that are inside distributed transactions. Distributed transactions + * cover the following: + * - All multi-shard modifications (DDLs, COPY, UPDATE, DELETE, INSERT .. 
SELECT) + * - All multi-shard queries with CTEs (modifying CTEs, read-only CTEs) + * - All recursively planned subqueries + * - All queries within transaction blocks (BEGIN; query; COMMMIT;) + * + * In other words, the following types of queries won't be observed in these + * views: + * - Router queries that are not inside transaction blocks + * - Real-time queries that are not inside transaction blocks + * - Task-tracker queries + * + * + * The following information for all the distributed transactions: + * query_host_name text + * query_host_port int + * database_id oid + * databaese_name name + * process_id integer + * initiator_node_host text + * initiator_node_port int + * distributed_transaction_number bigint + * distributed_transaction_stamp timestamp with time zone + * usesysid oid + * usename name + * application_name text + * client_addr inet + * client_hostname text + * client_port integer + * backend_start timestamp with time zone + * xact_start timestamp with time zone + * query_start timestamp with time zone + * state_change timestamp with time zone + * wait_event_type text + * wait_event text + * state text + * backend_xid xid + * backend_xmin xid + * query text + * backend_type text + */ + +/* + * We get CITUS_DIST_STAT_ACTIVITY_QUERY_COLS from workers and manually add + * CITUS_DIST_STAT_ADDITIONAL_COLS for hostname and hostport. Also, instead of + * showing the initiator_node_id we expand it to initiator_node_host and + * initiator_node_port. + */ +#define CITUS_DIST_STAT_ACTIVITY_QUERY_COLS 23 +#define CITUS_DIST_STAT_ADDITIONAL_COLS 3 +#define CITUS_DIST_STAT_ACTIVITY_COLS \ + CITUS_DIST_STAT_ACTIVITY_QUERY_COLS + CITUS_DIST_STAT_ADDITIONAL_COLS + + +#define coordinator_host_name "coordinator_host" + +/* + * We get the query_host_name and query_host_port while opening the connection to + * the node. We also replace initiator_node_identifier with initiator_node_host + * and initiator_node_port. Thus, they are not in the query below. 
+ * + * Also, backend_type introduced with pg 10+ so we have null in the previous verions. + */ + +#if PG_VERSION_NUM >= 100000 + + #define CITUS_DIST_STAT_ACTIVITY_QUERY \ + "\ + SELECT \ + dist_txs.initiator_node_identifier, \ + dist_txs.transaction_number, \ + dist_txs.transaction_stamp, \ + pg_stat_activity.datid, \ + pg_stat_activity.datname, \ + pg_stat_activity.pid, \ + pg_stat_activity.usesysid, \ + pg_stat_activity.usename, \ + pg_stat_activity.application_name, \ + pg_stat_activity.client_addr, \ + pg_stat_activity.client_hostname, \ + pg_stat_activity.client_port, \ + pg_stat_activity.backend_start, \ + pg_stat_activity.xact_start, \ + pg_stat_activity.query_start, \ + pg_stat_activity.state_change, \ + pg_stat_activity.wait_event_type, \ + pg_stat_activity.wait_event, \ + pg_stat_activity.state, \ + pg_stat_activity.backend_xid, \ + pg_stat_activity.backend_xmin, \ + pg_stat_activity.query, \ + pg_stat_activity.backend_type \ + FROM \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp), \ + pg_stat_activity \ + WHERE \ + pg_stat_activity.pid = dist_txs.process_id \ + AND \ + dist_txs.worker_query = %s; " +#else + #define CITUS_DIST_STAT_ACTIVITY_QUERY \ + "\ + SELECT \ + dist_txs.initiator_node_identifier, \ + dist_txs.transaction_number, \ + dist_txs.transaction_stamp, \ + pg_stat_activity.datid, \ + pg_stat_activity.datname, \ + pg_stat_activity.pid, \ + pg_stat_activity.usesysid, \ + pg_stat_activity.usename, \ + pg_stat_activity.application_name, \ + pg_stat_activity.client_addr, \ + pg_stat_activity.client_hostname, \ + pg_stat_activity.client_port, \ + pg_stat_activity.backend_start, \ + pg_stat_activity.xact_start, \ + pg_stat_activity.query_start, \ + pg_stat_activity.state_change, \ + pg_stat_activity.wait_event_type, \ + pg_stat_activity.wait_event, \ + pg_stat_activity.state, \ + pg_stat_activity.backend_xid, \ + pg_stat_activity.backend_xmin, \ + 
pg_stat_activity.query, \ + null \ + FROM \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp), \ + pg_stat_activity \ + WHERE \ + pg_stat_activity.pid = dist_txs.process_id \ + AND \ + dist_txs.worker_query = %s; " + +#endif + +typedef struct CitusDistStat +{ + text *query_host_name; + int query_host_port; + + text *master_query_host_name; + int master_query_host_port; + uint64 distributed_transaction_number; + TimestampTz distributed_transaction_stamp; + + /* fields from pg_stat_statement */ + Oid database_id; + Name databaese_name; + int process_id; + Oid usesysid; + Name usename; + text *application_name; + inet *client_addr; + text *client_hostname; + int client_port; + TimestampTz backend_start; + TimestampTz xact_start; + TimestampTz query_start; + TimestampTz state_change; + text *wait_event_type; + text *wait_event; + text *state; + TransactionId backend_xid; + TransactionId backend_xmin; + text *query; + text *backend_type; +} CitusDistStat; + + +/* local forward declarations */ +static List * CitusDistStatActivity(const char *statQuery); +static void ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo); +static CitusDistStat * ParseCitusDistStat(PGresult *result, int64 rowIndex); + +/* utility functions to parse the fields from PGResult */ +static text * ParseTextField(PGresult *result, int rowIndex, int colIndex); +static Name ParseNameField(PGresult *result, int rowIndex, int colIndex); +static inet * ParseInetField(PGresult *result, int rowIndex, int colIndex); +static TransactionId ParseXIDField(PGresult *result, int rowIndex, int colIndex); + +/* utility functions to fetch the fields from heapTuple */ +static List * GetLocalNodeCitusDistStat(const char *statQuery); +static List * LocalNodeCitusDistStat(const char *statQuery, const char *hostname, int + port); +static CitusDistStat * HeapTupleToCitusDistStat(HeapTuple result, TupleDesc + 
rowDescriptor); +static int64 ParseIntFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex); +static text * ParseTextFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int + colIndex); +static Name ParseNameFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex); +static inet * ParseInetFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int + colIndex); +static TimestampTz ParseTimestampTzFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, + int colIndex); +static TransactionId ParseXIDFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int + colIndex); + + +PG_FUNCTION_INFO_V1(citus_dist_stat_activity); +PG_FUNCTION_INFO_V1(citus_worker_stat_activity); + + +/* + * citus_dist_stat_activity connects to all nodes in the cluster and returns + * pg_stat_activity like result set but only consisting of queries that are + * on the distributed tables and inside distributed transactions. + */ +Datum +citus_dist_stat_activity(PG_FUNCTION_ARGS) +{ + List *citusDistStatStatements = NIL; + StringInfo citusDistStatQuery = NULL; + const char *workerQuery = "false"; + + CheckCitusVersion(ERROR); + + /* set the workerQuery to false in the query */ + citusDistStatQuery = makeStringInfo(); + appendStringInfo(citusDistStatQuery, CITUS_DIST_STAT_ACTIVITY_QUERY, + workerQuery); + + citusDistStatStatements = CitusDistStatActivity(citusDistStatQuery->data); + + ReturnCitusDistStats(citusDistStatStatements, fcinfo); + + PG_RETURN_VOID(); +} + + +/* + * citus_worker_stat_activity connects to all nodes in the cluster and returns + * pg_stat_activity like result set but only consisting of queries that are + * on the shards of distributed tables and inside distributed transactions. 
+ */ +Datum +citus_worker_stat_activity(PG_FUNCTION_ARGS) +{ + List *citusWorkerStatStatements = NIL; + StringInfo cituWorkerStatQuery = NULL; + const char *workerQuery = "true"; + + CheckCitusVersion(ERROR); + + /* set the workerQuery to true in the query */ + cituWorkerStatQuery = makeStringInfo(); + appendStringInfo(cituWorkerStatQuery, CITUS_DIST_STAT_ACTIVITY_QUERY, + workerQuery); + + citusWorkerStatStatements = CitusDistStatActivity(cituWorkerStatQuery->data); + + ReturnCitusDistStats(citusWorkerStatStatements, fcinfo); + + PG_RETURN_VOID(); +} + + +/* + * CitusDistStatActivity gets the stats query, connects to each node in the + * cluster, executes the query and parses the results. The function returns + * list of CitusDistStat struct for further processing. + * + * The function connects to each active primary node in the pg_dist_node. Plus, + * if the query is being executed on the coordinator, the function connects to + * localhost as well. The implication of this is that whenever the query is executed + * from a MX worker node, it wouldn't be able to get information from the queries + * executed on the coordinator given that there is not metadata information about that. + */ +static List * +CitusDistStatActivity(const char *statQuery) +{ + List *citusStatsList = NIL; + + List *workerNodeList = ActivePrimaryNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + + /* + * For the local node, we can avoid opening connections. This might be + * important when we're on the coordinator since it requires configuring + * the authentication for self-connection via any user who calls the citus + * stat activity functions. + */ + citusStatsList = GetLocalNodeCitusDistStat(statQuery); + + /* + * We prefer to connect with the current user to the remote nodes. This will + * ensure that we have the same privilage restrictions that pg_stat_activity + * enforces. 
+ */ + nodeUser = CurrentUserName(); + + /* open connections in parallel */ + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; + + if (workerNode->groupId == GetLocalGroupId()) + { + /* we already get these stats via GetLocalNodeCitusDistStat() */ + continue; + } + + connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + connectionList = lappend(connectionList, connection); + } + + FinishConnectionListEstablishment(connectionList); + + /* send commands in parallel */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + int querySent = false; + + querySent = SendRemoteCommand(connection, statQuery); + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + } + } + + /* receive query results */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + PGresult *result = NULL; + bool raiseInterrupts = true; + int64 rowIndex = 0; + int64 rowCount = 0; + int64 colCount = 0; + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + continue; + } + + rowCount = PQntuples(result); + colCount = PQnfields(result); + + if (colCount != CITUS_DIST_STAT_ACTIVITY_QUERY_COLS) + { + /* + * We don't expect to hit this error, but keep it here in case there + * is a version mistmatch. 
+ */ + ereport(WARNING, (errmsg("unexpected number of columns from " + "citus stat query"))); + continue; + } + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + CitusDistStat *citusDistStat = ParseCitusDistStat(result, rowIndex); + + /* + * Add the query_host_name and query_host_port which denote where + * the query is being running. + */ + citusDistStat->query_host_name = cstring_to_text(connection->hostname); + citusDistStat->query_host_port = connection->port; + + citusStatsList = lappend(citusStatsList, citusDistStat); + } + + PQclear(result); + ForgetResults(connection); + } + + return citusStatsList; +} + + +/* + * GetLocalNodeCitusDistStat simple executes the given query with SPI to get + * the result of the given stat query on the local node. + */ +static List * +GetLocalNodeCitusDistStat(const char *statQuery) +{ + List *citusStatsList = NIL; + + List *workerNodeList = NIL; + ListCell *workerNodeCell = NULL; + int localGroupId = -1; + + if (IsCoordinator()) + { + /* + * Coordinator's nodename and nodeport doesn't show-up in the metadata, + * so mark it manually as executing from the coordinator. + */ + citusStatsList = LocalNodeCitusDistStat(statQuery, coordinator_host_name, + PostPortNumber); + + return citusStatsList; + } + + localGroupId = GetLocalGroupId(); + + /* get the current worker's node stats */ + workerNodeList = ActivePrimaryNodeList(); + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + + if (workerNode->groupId == localGroupId) + { + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + + citusStatsList = LocalNodeCitusDistStat(statQuery, nodeName, nodePort); + + break; + } + } + + return citusStatsList; +} + + +/* + * ParseCitusDistStat is a helper function which basically gets a PGresult + * and parses the results for rowIndex. Finally, returns CitusDistStat for + * further processing of the data retrieved. 
+ * + * HeapTupleToCitusDistStat() and ParseCitusDistStat() are doing the same thing on + * different input data structures. Thus, any change to here should be reflected in + * the other function as well. + */ +static CitusDistStat * +ParseCitusDistStat(PGresult *result, int64 rowIndex) +{ + CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat)); + int initiator_node_identifier = 0; + WorkerNode *initiatorWorkerNode = NULL; + + /* + * Replace initiator_node_identifier with initiator_node_hostname + * and initiator_node_port given that those are a lot more useful. + * + * The rules are following: + * - If initiator_node_identifier belongs to a worker, simply get it + * from the metadata + * - If the initiator_node_identifier belongs to the coordinator and + * we're executing the function on the coordinator, get the localhost + * and port + * - If the initiator_node_identifier belongs to the coordinator and + * we're executing the function on a worker node, manually mark it + * as "coordinator_host" given that we cannot know the host and port + */ + initiator_node_identifier = ParseIntField(result, rowIndex, 0); + if (initiator_node_identifier != 0) + { + bool nodeExists = false; + + initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists); + + /* a query should run on an existing node */ + Assert(nodeExists); + citusDistStat->master_query_host_name = + cstring_to_text(initiatorWorkerNode->workerName); + citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; + } + else if (initiator_node_identifier == 0 && IsCoordinator()) + { + citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_port = PostPortNumber; + } + else + { + /* + * We could only get here if the function is called from metadata workers and + * the query is initiated from the coordinator. 
+ */ + citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_port = 0; + } + + citusDistStat->distributed_transaction_number = ParseIntField(result, rowIndex, 1); + citusDistStat->distributed_transaction_stamp = + ParseTimestampTzField(result, rowIndex, 2); + + /* fields from pg_stat_statement */ + citusDistStat->database_id = ParseIntField(result, rowIndex, 3); + citusDistStat->databaese_name = ParseNameField(result, rowIndex, 4); + citusDistStat->process_id = ParseIntField(result, rowIndex, 5); + citusDistStat->usesysid = ParseIntField(result, rowIndex, 6); + citusDistStat->usename = ParseNameField(result, rowIndex, 7); + citusDistStat->application_name = ParseTextField(result, rowIndex, 8); + citusDistStat->client_addr = ParseInetField(result, rowIndex, 9); + citusDistStat->client_hostname = ParseTextField(result, rowIndex, 10); + citusDistStat->client_port = ParseIntField(result, rowIndex, 11); + citusDistStat->backend_start = ParseTimestampTzField(result, rowIndex, 12); + citusDistStat->xact_start = ParseTimestampTzField(result, rowIndex, 13); + citusDistStat->query_start = ParseTimestampTzField(result, rowIndex, 14); + citusDistStat->state_change = ParseTimestampTzField(result, rowIndex, 15); + citusDistStat->wait_event_type = ParseTextField(result, rowIndex, 16); + citusDistStat->wait_event = ParseTextField(result, rowIndex, 17); + citusDistStat->state = ParseTextField(result, rowIndex, 18); + citusDistStat->backend_xid = ParseXIDField(result, rowIndex, 19); + citusDistStat->backend_xmin = ParseXIDField(result, rowIndex, 20); + citusDistStat->query = ParseTextField(result, rowIndex, 21); + citusDistStat->backend_type = ParseTextField(result, rowIndex, 22); + + return citusDistStat; +} + + +/* + * LocalNodeCitusDistStat simply executes the given query via SPI and parses + * the results back in a list for further processing. 
+ *
+ * hostname and port are only used to fill the query_host_name and
+ * query_host_port fields of the returned entries; they play no role in
+ * executing the query, which always runs locally via SPI.
+ *
+ * If SPI cannot be connected to, or the query does not finish as a SELECT,
+ * a WARNING is emitted and NIL is returned.
+ */
+static List *
+LocalNodeCitusDistStat(const char *statQuery, const char *hostname, int port)
+{
+	List *localNodeCitusDistStatList = NIL;
+	int spiConnectionResult = 0;
+	int spiQueryResult = 0;
+	bool readOnly = true;
+
+	/* same width as SPI_processed, so the loop counter cannot wrap around */
+	uint64 rowIndex = 0;
+
+	MemoryContext upperContext = CurrentMemoryContext, oldContext = NULL;
+
+	spiConnectionResult = SPI_connect();
+	if (spiConnectionResult != SPI_OK_CONNECT)
+	{
+		ereport(WARNING, (errmsg("could not connect to SPI manager to get "
+								 "the local stat activity")));
+
+		/* no SPI connection was opened, so there is nothing to SPI_finish() */
+		return NIL;
+	}
+
+	spiQueryResult = SPI_execute(statQuery, readOnly, 0);
+	if (spiQueryResult != SPI_OK_SELECT)
+	{
+		ereport(WARNING, (errmsg("execution was not successful while trying to get "
+								 "the local stat activity")));
+
+		SPI_finish();
+
+		return NIL;
+	}
+
+	/*
+	 * SPI_connect switches to its own memory context, which is destroyed by
+	 * the call to SPI_finish. SPI_palloc is provided to allocate memory in
+	 * the previous ("upper") context, but that is inadequate when we need to
+	 * call other functions that themselves use the normal palloc (such as
+	 * lappend). So we switch to the upper context ourselves as needed.
+	 */
+	oldContext = MemoryContextSwitchTo(upperContext);
+
+	for (rowIndex = 0; rowIndex < SPI_processed; rowIndex++)
+	{
+		HeapTuple row = NULL;
+		TupleDesc rowDescriptor = SPI_tuptable->tupdesc;
+		CitusDistStat *citusDistStat = NULL;
+
+		/* we use pointers from the tuple, so copy it before processing */
+		row = SPI_copytuple(SPI_tuptable->vals[rowIndex]);
+		citusDistStat = HeapTupleToCitusDistStat(row, rowDescriptor);
+
+		/*
+		 * Add the query_host_name and query_host_port which denote where
+		 * the query is running.
+		 */
+		citusDistStat->query_host_name = cstring_to_text(hostname);
+		citusDistStat->query_host_port = port;
+
+		localNodeCitusDistStatList = lappend(localNodeCitusDistStatList, citusDistStat);
+	}
+
+	MemoryContextSwitchTo(oldContext);
+
+	SPI_finish();
+
+	return localNodeCitusDistStatList;
+}
+
+
+/*
+ * HeapTupleToCitusDistStat is a helper function which reads the columns of the
+ * given heapTuple (1-based column indexes, in the order used below) into a
+ * freshly palloc0'd CitusDistStat and returns it for further processing.
+ *
+ * Pointer-typed fields (text, name, inet) may point into the tuple itself, so
+ * the tuple must outlive the returned struct.
+ *
+ * HeapTupleToCitusDistStat() and ParseCitusDistStat() are doing the same thing on
+ * different input data structures. Thus, any change to here should be reflected in
+ * the other function as well.
+ *
+ * NOTE(review): ParseCitusDistStat() presumably needs the same missing-node
+ * guard that is applied below -- verify.
+ */
+static CitusDistStat *
+HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor)
+{
+	CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat));
+	int initiator_node_identifier = 0;
+	WorkerNode *initiatorWorkerNode = NULL;
+
+	/*
+	 * Replace initiator_node_identifier with initiator_node_hostname
+	 * and initiator_node_port given that those are a lot more useful.
+	 *
+	 * The rules are following:
+	 *    - If initiator_node_identifier belongs to a worker, simply get it
+	 *      from the metadata
+	 *    - If the initiator_node_identifier belongs to the coordinator and
+	 *      we're executing the function on the coordinator, get the localhost
+	 *      and port
+	 *    - If the initiator_node_identifier belongs to the coordinator and
+	 *      we're executing the function on a worker node, manually mark it
+	 *      as "coordinator_host" given that we cannot know the host and port
+	 */
+	initiator_node_identifier = ParseIntFieldFromHeapTuple(result, rowDescriptor, 1);
+	if (initiator_node_identifier != 0)
+	{
+		bool nodeExists = false;
+
+		initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists);
+
+		/*
+		 * A query should run on an existing node. Still, do not dereference
+		 * the lookup result blindly: an Assert alone would leave production
+		 * builds crashing on a NULL pointer if the lookup ever fails. In
+		 * that case we report the initiator host as NULL.
+		 */
+		if (nodeExists && initiatorWorkerNode != NULL)
+		{
+			citusDistStat->master_query_host_name =
+				cstring_to_text(initiatorWorkerNode->workerName);
+			citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
+		}
+		else
+		{
+			citusDistStat->master_query_host_name = NULL;
+			citusDistStat->master_query_host_port = 0;
+		}
+	}
+	else if (IsCoordinator())
+	{
+		citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
+		citusDistStat->master_query_host_port = PostPortNumber;
+	}
+	else
+	{
+		/*
+		 * We could only get here if the function is called from metadata workers and
+		 * the query is initiated from the coordinator.
+		 */
+		citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
+		citusDistStat->master_query_host_port = 0;
+	}
+
+	citusDistStat->distributed_transaction_number =
+		ParseIntFieldFromHeapTuple(result, rowDescriptor, 2);
+	citusDistStat->distributed_transaction_stamp =
+		ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 3);
+
+	/* fields from pg_stat_activity */
+	citusDistStat->database_id = ParseIntFieldFromHeapTuple(result, rowDescriptor, 4);
+	citusDistStat->databaese_name = ParseNameFieldFromHeapTuple(result, rowDescriptor, 5);
+	citusDistStat->process_id = ParseIntFieldFromHeapTuple(result, rowDescriptor, 6);
+	citusDistStat->usesysid = ParseIntFieldFromHeapTuple(result, rowDescriptor, 7);
+	citusDistStat->usename = ParseNameFieldFromHeapTuple(result, rowDescriptor, 8);
+	citusDistStat->application_name =
+		ParseTextFieldFromHeapTuple(result, rowDescriptor, 9);
+	citusDistStat->client_addr = ParseInetFieldFromHeapTuple(result, rowDescriptor, 10);
+	citusDistStat->client_hostname =
+		ParseTextFieldFromHeapTuple(result, rowDescriptor, 11);
+	citusDistStat->client_port = ParseIntFieldFromHeapTuple(result, rowDescriptor, 12);
+	citusDistStat->backend_start =
+		ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 13);
+	citusDistStat->xact_start =
+		ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 14);
+	citusDistStat->query_start =
+		ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 15);
+	citusDistStat->state_change =
+		ParseTimestampTzFieldFromHeapTuple(result, rowDescriptor, 16);
+	citusDistStat->wait_event_type =
+		ParseTextFieldFromHeapTuple(result, rowDescriptor, 17);
+	citusDistStat->wait_event = ParseTextFieldFromHeapTuple(result, rowDescriptor, 18);
+	citusDistStat->state = ParseTextFieldFromHeapTuple(result, rowDescriptor, 19);
+	citusDistStat->backend_xid = ParseXIDFieldFromHeapTuple(result, rowDescriptor, 20);
+	citusDistStat->backend_xmin = ParseXIDFieldFromHeapTuple(result, rowDescriptor, 21);
+	citusDistStat->query = ParseTextFieldFromHeapTuple(result, rowDescriptor, 22);
+	citusDistStat->backend_type = ParseTextFieldFromHeapTuple(result, rowDescriptor, 23);
+
+	return citusDistStat;
+}
+
+
+/*
+ * ParseIntFieldFromHeapTuple fetches an int64 from a heapTuple or returns 0 if the
+ * result is NULL.
+ *
+ * colIndex is the 1-based column number, as expected by SPI_getbinval.
+ *
+ * NOTE(review): the Datum is always read with DatumGetInt64, regardless of the
+ * column's declared type. That presumably relies on 64-bit pass-by-value
+ * Datums for the narrower (int4/oid) columns -- confirm against the query.
+ */
+static int64
+ParseIntFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex)
+{
+	Datum resultDatum;
+	bool isNull = false;
+
+	resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull);
+	if (isNull)
+	{
+		return 0;
+	}
+
+	return DatumGetInt64(resultDatum);
+}
+
+
+/*
+ * ParseTextFieldFromHeapTuple parses a text from a heapTuple or returns
+ * NULL if the result is NULL.
+ *
+ * The Datum's pointer is returned as is, without copying.
+ */
+static text *
+ParseTextFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex)
+{
+	Datum resultDatum;
+	bool isNull = false;
+
+	resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull);
+	if (isNull)
+	{
+		return NULL;
+	}
+
+	return (text *) DatumGetPointer(resultDatum);
+}
+
+
+/*
+ * ParseNameFieldFromHeapTuple fetches a name from a heapTuple result or returns NULL if the
+ * result is NULL.
+ *
+ * The Datum's pointer is returned as is, without copying.
+ */
+static Name
+ParseNameFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex)
+{
+	Datum resultDatum;
+	bool isNull = false;
+
+	resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull);
+	if (isNull)
+	{
+		return NULL;
+	}
+
+	return (Name) DatumGetPointer(resultDatum);
+}
+
+
+/*
+ * ParseInetFieldFromHeapTuple fetches an inet from a heapTuple or returns NULL if the
+ * result is NULL.
+ */
+static inet *
+ParseInetFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex)
+{
+	Datum resultDatum;
+	bool isNull = false;
+
+	resultDatum = SPI_getbinval(tuple, tupdesc, colIndex, &isNull);
+	if (isNull)
+	{
+		return NULL;
+	}
+
+	return DatumGetInetP(resultDatum);
+}
+
+
+/*
+ * ParseTimestampTzFieldFromHeapTuple parses a timestamptz from a heapTuple or returns
+ * DT_NOBEGIN if the result is NULL.
+ */
+static TimestampTz
+ParseTimestampTzFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex)
+{
+	bool fieldIsNull = false;
+	Datum fieldDatum = SPI_getbinval(tuple, tupdesc, colIndex, &fieldIsNull);
+
+	if (!fieldIsNull)
+	{
+		return DatumGetTimestampTz(fieldDatum);
+	}
+
+	/* DT_NOBEGIN stands in for a NULL timestamp */
+	return DT_NOBEGIN;
+}
+
+
+/*
+ * ParseXIDFieldFromHeapTuple reads the given (1-based) column of a heapTuple
+ * as a transaction id. A NULL column is reported as PG_UINT32_MAX.
+ */
+static TransactionId
+ParseXIDFieldFromHeapTuple(HeapTuple tuple, TupleDesc tupdesc, int colIndex)
+{
+	bool fieldIsNull = false;
+	Datum fieldDatum = SPI_getbinval(tuple, tupdesc, colIndex, &fieldIsNull);
+
+	if (!fieldIsNull)
+	{
+		return DatumGetTransactionId(fieldDatum);
+	}
+
+	/*
+	 * We'd show NULL if user hits the max transaction id, but that should be
+	 * one of the minor problems they'd probably hit.
+	 */
+	return PG_UINT32_MAX;
+}
+
+
+/*
+ * ParseTextField converts the given cell of a remote (libpq) result into a
+ * text value by passing its string form through textin. A NULL cell yields
+ * NULL.
+ */
+static text *
+ParseTextField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *fieldString = NULL;
+	Datum fieldTextDatum = 0;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		return NULL;
+	}
+
+	fieldString = PQgetvalue(result, rowIndex, colIndex);
+	fieldTextDatum = DirectFunctionCall1(textin, CStringGetDatum(fieldString));
+
+	return (text *) DatumGetPointer(fieldTextDatum);
+}
+
+
+/*
+ * ParseNameField parses a name from a remote result or returns NULL if the
+ * result is NULL.
+ */
+static Name
+ParseNameField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *resultString = NULL;
+	Datum resultStringDatum = 0;
+	Datum nameDatum = 0;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		/* return an explicit NULL, like the other pointer-returning parsers */
+		return NULL;
+	}
+
+	resultString = PQgetvalue(result, rowIndex, colIndex);
+	resultStringDatum = CStringGetDatum(resultString);
+	nameDatum = DirectFunctionCall1(namein, resultStringDatum);
+
+	return (Name) DatumGetPointer(nameDatum);
+}
+
+
+/*
+ * ParseInetField parses an inet from a remote result or returns NULL if the
+ * result is NULL.
+ *
+ * The cell's string form is converted with inet_in.
+ */
+static inet *
+ParseInetField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *resultString = NULL;
+	Datum resultStringDatum = 0;
+	Datum inetDatum = 0;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		return NULL;
+	}
+
+	resultString = PQgetvalue(result, rowIndex, colIndex);
+	resultStringDatum = CStringGetDatum(resultString);
+	inetDatum = DirectFunctionCall1(inet_in, resultStringDatum);
+
+	return DatumGetInetP(inetDatum);
+}
+
+
+/*
+ * ParseXIDField parses an XID from a remote result or returns PG_UINT32_MAX
+ * if the result is NULL.
+ *
+ * The cell's string form is converted with xidin.
+ */
+static TransactionId
+ParseXIDField(PGresult *result, int rowIndex, int colIndex)
+{
+	char *resultString = NULL;
+	Datum resultStringDatum = 0;
+	Datum XIDDatum = 0;
+
+	if (PQgetisnull(result, rowIndex, colIndex))
+	{
+		/*
+		 * We'd show NULL if user hits the max transaction id, but that should be
+		 * one of the minor problems they'd probably hit.
+		 */
+		return PG_UINT32_MAX;
+	}
+
+	resultString = PQgetvalue(result, rowIndex, colIndex);
+	resultStringDatum = CStringGetDatum(resultString);
+	XIDDatum = DirectFunctionCall1(xidin, resultStringDatum);
+
+	return DatumGetTransactionId(XIDDatum);
+}
+
+
+/*
+ * ReturnCitusDistStats returns the stats for a set returning function.
+ *
+ * Every CitusDistStat in citusStatsList becomes one row of a tuplestore that
+ * is handed back through fcinfo->resultinfo in materialize mode. The function
+ * raises an ERROR when the caller cannot accept a materialized set or when
+ * the declared return type is not a row type.
+ *
+ * NULLs are derived from sentinel values in the struct: a NULL pointer, a
+ * DT_NOBEGIN timestamp and a PG_UINT32_MAX transaction id are each emitted as
+ * SQL NULL. All other fields are always emitted as non-NULL.
+ */
+static void
+ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo)
+{
+	ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc tupleDesc = NULL;
+	Tuplestorestate *tupleStore = NULL;
+	MemoryContext per_query_ctx = NULL;
+	MemoryContext oldContext = NULL;
+
+	ListCell *citusStatsCell = NULL;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg(
+					 "set-valued function called in context that cannot accept a set")));
+	}
+	if (!(resultInfo->allowedModes & SFRM_Materialize))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+	}
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
+	{
+		elog(ERROR, "return type must be a row type");
+	}
+
+	/* create the tuplestore while in the per-query memory context */
+	per_query_ctx = resultInfo->econtext->ecxt_per_query_memory;
+	oldContext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupleStore = tuplestore_begin_heap(true, false, work_mem);
+	resultInfo->returnMode = SFRM_Materialize;
+	resultInfo->setResult = tupleStore;
+	resultInfo->setDesc = tupleDesc;
+	MemoryContextSwitchTo(oldContext);
+
+	foreach(citusStatsCell, citusStatsList)
+	{
+		CitusDistStat *citusDistStat = (CitusDistStat *) lfirst(citusStatsCell);
+
+		Datum values[CITUS_DIST_STAT_ACTIVITY_COLS];
+		bool nulls[CITUS_DIST_STAT_ACTIVITY_COLS];
+
+		/* start from all-zero values and all-false nulls for every row */
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		/* columns 0-3: where the query runs and which node initiated it */
+		if (citusDistStat->query_host_name != NULL)
+		{
+			values[0] = PointerGetDatum(citusDistStat->query_host_name);
+		}
+		else
+		{
+			nulls[0] = true;
+		}
+
+		values[1] = Int32GetDatum(citusDistStat->query_host_port);
+
+		if (citusDistStat->master_query_host_name != NULL)
+		{
+			values[2] = PointerGetDatum(citusDistStat->master_query_host_name);
+		}
+		else
+		{
+			nulls[2] = true;
+		}
+
+		values[3] = Int32GetDatum(citusDistStat->master_query_host_port);
+
+		/* columns 4-5: distributed transaction number and timestamp */
+		values[4] = UInt64GetDatum(citusDistStat->distributed_transaction_number);
+
+		if (citusDistStat->distributed_transaction_stamp != DT_NOBEGIN)
+		{
+			values[5] = TimestampTzGetDatum(citusDistStat->distributed_transaction_stamp);
+		}
+		else
+		{
+			nulls[5] = true;
+		}
+
+		/* columns 6 and up: the remaining per-backend fields of the struct */
+		values[6] = ObjectIdGetDatum(citusDistStat->database_id);
+
+		if (citusDistStat->databaese_name != NULL)
+		{
+			/* the Datum is built from the character data inside the Name */
+			values[7] = CStringGetDatum(NameStr(*citusDistStat->databaese_name));
+		}
+		else
+		{
+			nulls[7] = true;
+		}
+
+		values[8] = Int32GetDatum(citusDistStat->process_id);
+		values[9] = ObjectIdGetDatum(citusDistStat->usesysid);
+
+		if (citusDistStat->usename != NULL)
+		{
+			values[10] = CStringGetDatum(NameStr(*citusDistStat->usename));
+		}
+		else
+		{
+			nulls[10] = true;
+		}
+
+		if (citusDistStat->application_name != NULL)
+		{
+			values[11] = PointerGetDatum(citusDistStat->application_name);
+		}
+		else
+		{
+			nulls[11] = true;
+		}
+
+		if (citusDistStat->client_addr != NULL)
+		{
+			values[12] = InetPGetDatum(citusDistStat->client_addr);
+		}
+		else
+		{
+			nulls[12] = true;
+		}
+
+		if (citusDistStat->client_hostname != NULL)
+		{
+			values[13] = PointerGetDatum(citusDistStat->client_hostname);
+		}
+		else
+		{
+			nulls[13] = true;
+		}
+
+		values[14] = Int32GetDatum(citusDistStat->client_port);
+
+		/* timestamps: DT_NOBEGIN marks a value that was NULL when parsed */
+		if (citusDistStat->backend_start != DT_NOBEGIN)
+		{
+			values[15] = TimestampTzGetDatum(citusDistStat->backend_start);
+		}
+		else
+		{
+			nulls[15] = true;
+		}
+
+		if (citusDistStat->xact_start != DT_NOBEGIN)
+		{
+			values[16] = TimestampTzGetDatum(citusDistStat->xact_start);
+		}
+		else
+		{
+			nulls[16] = true;
+		}
+
+		if (citusDistStat->query_start != DT_NOBEGIN)
+		{
+			values[17] = TimestampTzGetDatum(citusDistStat->query_start);
+		}
+		else
+		{
+			nulls[17] = true;
+		}
+
+		if (citusDistStat->state_change != DT_NOBEGIN)
+		{
+			values[18] = TimestampTzGetDatum(citusDistStat->state_change);
+		}
+		else
+		{
+			nulls[18] = true;
+		}
+
+		if (citusDistStat->wait_event_type != NULL)
+		{
+			values[19] = PointerGetDatum(citusDistStat->wait_event_type);
+		}
+		else
+		{
+			nulls[19] = true;
+		}
+
+		if (citusDistStat->wait_event != NULL)
+		{
+			values[20] = PointerGetDatum(citusDistStat->wait_event);
+		}
+		else
+		{
+			nulls[20] = true;
+		}
+
+		if (citusDistStat->state != NULL)
+		{
+			values[21] = PointerGetDatum(citusDistStat->state);
+		}
+		else
+		{
+			nulls[21] = true;
+		}
+
+		/* transaction ids: PG_UINT32_MAX marks a value that was NULL when parsed */
+		if (citusDistStat->backend_xid != PG_UINT32_MAX)
+		{
+			values[22] = TransactionIdGetDatum(citusDistStat->backend_xid);
+		}
+		else
+		{
+			nulls[22] = true;
+		}
+
+		if (citusDistStat->backend_xmin != PG_UINT32_MAX)
+		{
+			values[23] = TransactionIdGetDatum(citusDistStat->backend_xmin);
+		}
+		else
+		{
+			nulls[23] = true;
+		}
+
+		if (citusDistStat->query != NULL)
+		{
+			values[24] = PointerGetDatum(citusDistStat->query);
+		}
+		else
+		{
+			nulls[24] = true;
+		}
+
+		if (citusDistStat->backend_type != NULL)
+		{
+			values[25] = PointerGetDatum(citusDistStat->backend_type);
+		}
+		else
+		{
+			nulls[25] = true;
+		}
+
+		tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupleStore);
+}
diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c
index 793fa3cca..d3b51b3e5 100644
--- a/src/backend/distributed/transaction/lock_graph.c
+++ b/src/backend/distributed/transaction/lock_graph.c
@@ -44,9 +44,6 @@ typedef struct PROCStack
 
 
 static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
-static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex);
-static bool ParseBoolField(PGresult *result, int rowIndex, int colIndex);
-static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);
 static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
 static WaitGraph * BuildLocalWaitGraph(void);
 static bool IsProcessWaitingForSafeOperations(PGPROC
*proc); @@ -204,7 +201,7 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) * ParseIntField parses a int64 from a remote result or returns 0 if the * result is NULL. */ -static int64 +int64 ParseIntField(PGresult *result, int rowIndex, int colIndex) { char *resultString = NULL; @@ -224,7 +221,7 @@ ParseIntField(PGresult *result, int rowIndex, int colIndex) * ParseBoolField parses a bool from a remote result or returns false if the * result is NULL. */ -static bool +bool ParseBoolField(PGresult *result, int rowIndex, int colIndex) { char *resultString = NULL; @@ -248,7 +245,7 @@ ParseBoolField(PGresult *result, int rowIndex, int colIndex) * ParseTimestampTzField parses a timestamptz from a remote result or returns * 0 if the result is NULL. */ -static TimestampTz +TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) { char *resultString = NULL; @@ -257,7 +254,7 @@ ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) if (PQgetisnull(result, rowIndex, colIndex)) { - return 0; + return DT_NOBEGIN; } resultString = PQgetvalue(result, rowIndex, colIndex); diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h index c2adfc62b..977e6dba8 100644 --- a/src/include/distributed/lock_graph.h +++ b/src/include/distributed/lock_graph.h @@ -14,6 +14,7 @@ #include "postgres.h" +#include "libpq-fe.h" #include "datatype/timestamp.h" @@ -58,5 +59,10 @@ extern WaitGraph * BuildGlobalWaitGraph(void); extern bool IsProcessWaitingForLock(PGPROC *proc); extern bool IsInDistributedTransaction(BackendData *backendData); +/* some utility function to parse results */ +extern int64 ParseIntField(PGresult *result, int rowIndex, int colIndex); +extern bool ParseBoolField(PGresult *result, int rowIndex, int colIndex); +extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex); + #endif /* LOCK_GRAPH_H */ diff --git 
a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out new file mode 100644 index 000000000..c9284c513 --- /dev/null +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -0,0 +1,181 @@ +Parsed test spec with 3 sessions + +starting permutation: s1-begin s2-begin s3-begin s1-alter-table s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE test_table ADD COLUMN x INT; + +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + ALTER TABLE test_table ADD COLUMN x INT; +coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +SELECT worker_apply_shard_ddl_command (105833, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT worker_apply_shard_ddl_command (105832, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT worker_apply_shard_ddl_command (105831, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57638 
coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT worker_apply_shard_ddl_command (105830, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s3-begin s1-insert s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-insert: + INSERT INTO test_table VALUES (100, 100); + +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + INSERT INTO test_table VALUES (100, 100); +coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +INSERT INTO public.test_table_105836 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s3-begin s1-select s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + 
BEGIN; + +step s3-begin: + BEGIN; + +step s1-select: + SELECT count(*) FROM test_table; + +count + +0 +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + SELECT count(*) FROM test_table; +coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +COPY (SELECT count(*) AS count FROM test_table_105841 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105840 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105839 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105838 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s3-begin s1-select-router s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step 
s3-begin: + BEGIN; + +step s1-select-router: + SELECT count(*) FROM test_table WHERE column1 = 55; + +count + +0 +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + diff --git a/src/test/regress/expected/isolation_citus_dist_activity_0.out b/src/test/regress/expected/isolation_citus_dist_activity_0.out new file mode 100644 index 000000000..66ebd59a6 --- /dev/null +++ b/src/test/regress/expected/isolation_citus_dist_activity_0.out @@ -0,0 +1,181 @@ +Parsed test spec with 3 sessions + +starting permutation: s1-begin s2-begin s3-begin s1-alter-table s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE test_table ADD COLUMN x INT; + +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + ALTER TABLE test_table ADD COLUMN x INT; +coordinator_host57636 coordinator_host57636 idle 
in transaction postgres regression +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +SELECT worker_apply_shard_ddl_command (105297, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57638 coordinator_host57636 idle in transaction postgres regression +SELECT worker_apply_shard_ddl_command (105296, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57637 coordinator_host57636 idle in transaction postgres regression +SELECT worker_apply_shard_ddl_command (105295, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57638 coordinator_host57636 idle in transaction postgres regression +SELECT worker_apply_shard_ddl_command (105294, 'public', ' + ALTER TABLE test_table ADD COLUMN x INT; +')localhost 57637 coordinator_host57636 idle in transaction postgres regression +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s3-begin s1-insert s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-insert: + INSERT INTO test_table VALUES (100, 100); + +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + INSERT INTO test_table VALUES (100, 100); +coordinator_host57636 coordinator_host57636 idle in transaction postgres 
regression +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +INSERT INTO public.test_table_105300 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transaction postgres regression +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s3-begin s1-select s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-select: + SELECT count(*) FROM test_table; + +count + +0 +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + SELECT count(*) FROM test_table; +coordinator_host57636 coordinator_host57636 idle in transaction postgres regression +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +COPY (SELECT count(*) AS count FROM test_table_105305 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transaction postgres regression +COPY (SELECT count(*) AS count FROM test_table_105304 test_table WHERE true) TO 
STDOUTlocalhost 57637 coordinator_host57636 idle in transaction postgres regression +COPY (SELECT count(*) AS count FROM test_table_105303 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transaction postgres regression +COPY (SELECT count(*) AS count FROM test_table_105302 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transaction postgres regression +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + + +starting permutation: s1-begin s2-begin s3-begin s1-select-router s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-select-router: + SELECT count(*) FROM test_table WHERE column1 = 55; + +count + +0 +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +step s3-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +step s2-rollback: + ROLLBACK; + +step s1-commit: + COMMIT; + +step s3-rollback: + ROLLBACK; + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c14785c18..531b44f49 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -147,6 +147,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-1'; ALTER EXTENSION citus UPDATE TO '8.0-2'; 
ALTER EXTENSION citus UPDATE TO '8.0-3'; ALTER EXTENSION citus UPDATE TO '8.0-4'; +ALTER EXTENSION citus UPDATE TO '8.0-5'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index b19551a3d..728f7dbb4 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -8,7 +8,6 @@ test: isolation_create_table_vs_add_remove_node # isolation_cluster_management such that tests # that come later can be parallelized test: isolation_cluster_management - test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement test: isolation_concurrent_dml isolation_data_migration @@ -43,3 +42,4 @@ test: isolation_delete_vs_all test: isolation_truncate_vs_all test: isolation_drop_vs_all test: isolation_ddl_vs_all +test: isolation_citus_dist_activity diff --git a/src/test/regress/specs/isolation_citus_dist_activity.spec b/src/test/regress/specs/isolation_citus_dist_activity.spec new file mode 100644 index 000000000..4a3edf0de --- /dev/null +++ b/src/test/regress/specs/isolation_citus_dist_activity.spec @@ -0,0 +1,88 @@ +setup +{ + SET citus.shard_replication_factor TO 1; + SET citus.shard_count TO 4; + + CREATE TABLE test_table(column1 int, column2 int); + SELECT create_distributed_table('test_table', 'column1'); +} + +teardown +{ + DROP TABLE test_table; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-alter-table" +{ + ALTER TABLE test_table ADD COLUMN x INT; +} + +step "s1-select" +{ + SELECT count(*) FROM test_table; +} + +step "s1-select-router" +{ + SELECT count(*) FROM test_table WHERE column1 = 55; +} + +step "s1-insert" +{ + INSERT INTO test_table VALUES (100, 100); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-rollback" +{ + ROLLBACK; +} + +step "s2-view-dist" +{ + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, 
wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity ORDER BY query DESC; + +} + +session "s3" + +step "s3-begin" +{ + BEGIN; +} + +step "s3-rollback" +{ + ROLLBACK; +} + +step "s3-view-worker" +{ + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; +} + + +permutation "s1-begin" "s2-begin" "s3-begin" "s1-alter-table" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" +permutation "s1-begin" "s2-begin" "s3-begin" "s1-insert" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" +permutation "s1-begin" "s2-begin" "s3-begin" "s1-select" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" + +# router selects don't show up because BEGIN is not sent for performance reasons +permutation "s1-begin" "s2-begin" "s3-begin" "s1-select-router" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 822dc0555..d6a6e1d1f 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -147,6 +147,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-1'; ALTER EXTENSION citus UPDATE TO '8.0-2'; ALTER EXTENSION citus UPDATE TO '8.0-3'; ALTER EXTENSION citus UPDATE TO '8.0-4'; +ALTER EXTENSION citus UPDATE TO '8.0-5'; -- show running version SHOW citus.version;