mirror of https://github.com/citusdata/citus.git
Add citus_stat_counters view and citus_stat_counters_reset() function to reset it (#7917)
DESCRIPTION: Adds citus_stat_counters view that can be used to query stat counters that Citus collects while the feature is enabled, which is controlled by citus.enable_stat_counters. citus_stat_counters() can be used to query the stat counters for the provided database oid, and citus_stat_counters_reset() can be used to reset them for the provided database oid, or for the current database if nothing or 0 is provided.

Today we don't persist stat counters on server shutdown. In other words, stat counters are automatically reset in case of a server restart.

Details on the underlying design can be found in the header comment of stat_counters.c and in the technical readme.

-------

Here are the details about what we track as of this PR:

For connection management, we have three statistics about the inter-node connections initiated by the node itself:

* **connection_establishment_succeeded**
* **connection_establishment_failed**
* **connection_reused**

While the first two are relatively easy to understand, the third one covers the case where a connection is reused. This can happen when a connection was already established to the desired node, Citus decided to cache it for some time (see citus.max_cached_conns_per_worker & citus.max_cached_connection_lifetime), and then reused it for a new remote operation.

Here are the other important details about these connection statistics:

1. connection_establishment_failed doesn't count the connections that we could establish but lost later in the transaction. Also, we cannot guarantee that the connections counted in connection_establishment_succeeded were not lost later.

2. connection_establishment_failed doesn't count the optional connections (see the OPTIONAL_CONNECTION flag) that we gave up establishing because of the connection throttling rules we follow (see citus.max_shared_pool_size & citus.local_shared_pool_size). The reason for this is that we didn't even try to establish these connections.

3. For the rest of the cases where a connection fails for some reason, we always increment connection_establishment_failed, even if the caller was okay with the failure and knows how to recover from it (e.g., the adaptive executor knows how to fall back to local execution when the target node is the local node and it cannot establish a connection to it). The reason is that even if it's likely that we can still serve the operation, we still failed to establish the connection and we want to track this.

4. Finally, the connection failures that we count in connection_establishment_failed might be caused by any of the following reasons, and for now we prefer to _not_ further distinguish them, for simplicity:

   a. The remote node is down, cannot accept any more connections, or is overloaded such that citus.node_connection_timeout is not enough to establish a connection.

   b. Any internal Citus error that might result in preparing a bad connection string, so that libpq fails when parsing the connection string even before actually trying to establish a connection via the connect() call.

   c. A broken citus.node_conninfo or similar Citus configuration that was incorrectly set by the user, which can result in outcomes similar to b.

   d. Internal wait-event-set / poll errors, or OOM in the local node.

We also track two more statistics for query execution:

* **query_execution_single_shard**
* **query_execution_multi_shard**

More importantly, both query_execution_single_shard and query_execution_multi_shard are tracked not only for the top-level queries but also for the subplans etc. The reason is that for some queries, e.g., the ones that go through recursive planning, after Citus performs the heavy work as part of the subplans, the work that needs to be done for the top-level query becomes quite straightforward. For such query types, it would be deceiving if we only incremented the query stat counters for the top-level query. Similarly, for non-pushable INSERT .. SELECT and MERGE queries, we perform separate counter increments for the SELECT / source part of the query besides the final INSERT / MERGE query.
parent 37e23f44b4
commit 3d61c4dc71
@@ -57,6 +57,8 @@ The purpose of this document is to provide comprehensive technical documentation

- [Query from any node](#query-from-any-node)
- [Why didn’t we have dedicated Query Nodes and Data Nodes?](#why-didnt-we-have-dedicated-query-nodes-and-data-nodes)
- [Shard visibility](#shard-visibility)
- [Statistic tracking](#statistic-tracking)
  - [Citus stat counters](#citus-stat-counters)

# Citus Concepts

@@ -2702,3 +2704,44 @@ Shards can be revealed via two settings:

- `citus.override_shard_visibility = off` disables shard hiding entirely
- `citus.show_shards_for_app_name_prefixes = 'pgAdmin,psql'` disables shard hiding only for specific application_name values, by prefix

## Statistic tracking

Statistic views defined by Postgres already work well for one Citus node, like `pg_stat_database`, `pg_stat_activity`, `pg_stat_statements`, etc. And for some of them, we even provide wrapper views in Citus to have a global (i.e., cluster-wide) view of the statistics, like `citus_stat_activity`.
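
For example, the Postgres view shows only the local node's activity, while the Citus wrapper shows activity across the whole cluster (a sketch; the exact column set depends on the Citus version):

```sql
-- local node only
SELECT pid, state, query FROM pg_stat_activity;

-- all nodes in the cluster, with cluster-wide identifiers
SELECT global_pid, nodeid, state, query FROM citus_stat_activity;
```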

And besides these, Citus itself also provides some additional statistics views to track Citus-specific activities. Note that the way we collect the statistics for each is quite different:

- `citus_stat_tenants` (needs documentation)
- `citus_stat_statements` (needs documentation)
- `citus_stat_counters`

### Citus stat counters

Citus keeps track of several stat counters and exposes them via the `citus_stat_counters` view. The counters are tracked once `citus.enable_stat_counters` is set to true. Also, `citus_stat_counters_reset()` can be used to reset the counters for a single database if a database oid other than 0 (the default, `InvalidOid`) is provided; otherwise, it resets the counters for the current database.
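
A minimal session showing the intended workflow (the set of counters matches the view definition added by this change):

```sql
-- enable counter collection (can also be set in postgresql.conf)
SET citus.enable_stat_counters TO on;

-- run some distributed queries, then inspect the counters
SELECT name, connection_establishment_succeeded, connection_reused,
       query_execution_single_shard, query_execution_multi_shard
FROM citus_stat_counters;

-- reset the counters for the current database; pass a database oid
-- to reset a specific database instead
SELECT citus_stat_counters_reset();
```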

Details about the implementation and its caveats can be found in the header comment of [stat_counters.c](/src/backend/distributed/stat_counters.c). At a high level:

1. We allocate a shared memory array of length `MaxBackends` so that each backend has its own counter slot, which reduces contention when incrementing the counters at runtime.

2. We also allocate a shared hash whose entries correspond to different databases. When a backend exits, it first aggregates its counters into the relevant entry in the shared hash, and then it resets its own counters because the same counter slot might later be reused by another backend.

   Note that today we use the regular shared hash table API (`ShmemInitHash()`) to do this, but we should consider using `dshash_table()` once using many databases with Citus becomes "practically" possible, because the performance of the regular shared hash table API is expected to degrade when the number of entries in the hash table is large.

3. So, when `citus_stat_counters` is queried, we first aggregate the counters from the shared memory array and then add them to the counters aggregated so far in the relevant shared hash entry for the database.

   This also means that if we didn't save the counters into the shared hash on exit, the counters seen in `citus_stat_counters` could drift backwards in time whenever a backend exits and its slot is reset.

   Note that `citus_stat_counters` might observe the counters of a backend twice, or might miss them entirely, if that backend was concurrently exiting. However, the next call to `citus_stat_counters` will see the correct values, so we can live with that for now. If we ever decide that this is very undesirable, we could enforce blocking between `citus_stat_counters` queries and the saving of the counters into the shared hash entry. However, that would also mean that exiting backends have to wait for any active `citus_stat_counters` queries to finish, so this needs to be carefully considered.

4. Finally, when `citus_stat_counters_reset()` is called, we reset both the shared hash entry and the relevant slots in the shared memory array for the provided database.

   Note that there is a chance that `citus_stat_counters_reset()` might partially fail to reset the counters of a backend slot under some rare circumstances, but this should be very rare and we choose to ignore it for the sake of lock-free counter increments.

5. Also, today neither `citus_stat_counters` nor `citus_stat_counters_reset()` explicitly excludes the backend slots that belong to exited backends during their operations. Instead, they consider any "not unused" backend slot whose relevant `PGPROC` points to a valid database oid, which doesn't guarantee that the backend slot is actively used. In practice, this is not a problem for either of these operations, for the reasons mentioned in the relevant functions. However, if we ever decide that this fact slows down these operations, we can consider explicitly excluding the exited backend slots by checking the `BackendData`'s `activeBackend` field.

6. As of today, we don't persist stat counters on server shutdown. Although it seems quite straightforward to do so, we skipped that for v1. Once we decide to persist the counters, one can check the relevant functions that we have for `citus_stat_statements`, namely `CitusQueryStatsShmemShutdown()` and `CitusQueryStatsShmemStartup()`. And since it has been quite a long time since we wrote those two functions, we should also check `pgstat_write_statsfile()` and `pgstat_read_statsfile()` in Postgres to see whether we're missing anything (it seems we're missing a few things).

7. Note that today we don't evict the entries of the said hash table that point to dropped databases, because the wrapper view filters them out anyway (thanks to the LEFT JOIN) and we don't expect a performance hit when reading from / writing to the hash table because of that, unless users have a lot of databases that are dropped and recreated frequently. If some day we decide that this is a problem, we can let the Citus maintenance daemon do that for us periodically.

The reason why we don't simply use a shared hash table for the counters is that doing a hash lookup for every increment would be more expensive. Plus, using a single counter slot for all the backends connected to the same database could lead to contention because that definitely requires taking a lock. With today's implementation, incrementing a counter doesn't require acquiring any sort of lock.

Also, as of writing this section, it seems quite likely that Postgres will expose its Cumulative Statistics infrastructure starting with Postgres 18, see https://github.com/postgres/postgres/commit/7949d9594582ab49dee221e1db1aa5401ace49d4. Once that happens, we can also consider using the same infrastructure to track Citus stat counters. However, we can only do that once we drop support for Postgres versions older than 18.

### A side note on query stat counters

Initially, we were thinking of tracking the query stat counters at the very end of the planner, namely in the `FinalizePlan()` function. However, that would mean not tracking the execution of prepared statements, because a prepared statement is not planned again once its plan is cached. To give a bit more detail, the query plan for a prepared statement is typically cached once the same prepared statement has been executed five times by Postgres (a hard-coded value). A plan may even be cached after the first execution if it's straightforward enough (e.g., when it doesn't have any parameters).
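
The following sketch illustrates that behavior (assuming a hypothetical distributed table `dist_table`; the five-execution threshold is Postgres's plan-caching heuristic):

```sql
PREPARE count_rows AS SELECT count(*) FROM dist_table WHERE key = $1;

-- The first few executions are planned with custom plans; after five
-- executions Postgres may switch to a cached generic plan, at which
-- point a counter incremented at plan time would stop ticking.
EXECUTE count_rows(1);
EXECUTE count_rows(2);
```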

For this reason, we track the query stat counters at the appropriate places within the CustomScan implementations provided by Citus for the adaptive executor and the non-pushable INSERT .. SELECT / MERGE executors.

@@ -2727,11 +2727,15 @@ CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId)
 	ExprContext *econtext = GetPerTupleExprContext(estate);
 	econtext->ecxt_scantuple = slot;
 	const bool nonPublishableData = false;
+
+	/* we don't track query counters when distributing a table */
+	const bool trackQueryCounters = false;
 	DestReceiver *copyDest =
 		(DestReceiver *) CreateCitusCopyDestReceiver(distributedTableId,
 		                                             columnNameList,
 		                                             partitionColumnIndex,
-		                                             estate, NULL, nonPublishableData);
+		                                             estate, NULL, nonPublishableData,
+		                                             trackQueryCounters);

 	/* initialise state for writing to shards, we'll open connections on demand */
 	copyDest->rStartup(copyDest, 0, sourceTupleDescriptor);

@@ -106,6 +106,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/shard_pruning.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/stat_counters.h"
 #include "distributed/transmit.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"

@@ -499,10 +500,14 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag)

 	/* set up the destination for the COPY */
 	const bool publishableData = true;
+
+	/* we want to track query counters for "COPY (to) distributed-table .." commands */
+	const bool trackQueryCounters = true;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
 	                                                              partitionColumnIndex,
 	                                                              executorState, NULL,
-	                                                              publishableData);
+	                                                              publishableData,
+	                                                              trackQueryCounters);

 	/* if the user specified an explicit append-to_shard option, write to it */
 	uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement);

@@ -1877,11 +1882,15 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
  * of intermediate results that are co-located with the actual table.
  * The names of the intermediate results with be of the form:
  * intermediateResultIdPrefix_<shardid>
+ *
+ * If trackQueryCounters is true, the COPY will increment the query stat
+ * counters as needed at the end of the COPY.
  */
 CitusCopyDestReceiver *
 CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
                             EState *executorState,
-                            char *intermediateResultIdPrefix, bool isPublishable)
+                            char *intermediateResultIdPrefix, bool isPublishable,
+                            bool trackQueryCounters)
 {
 	CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
 		sizeof(CitusCopyDestReceiver));

@@ -1901,6 +1910,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
 	copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
 	copyDest->memoryContext = CurrentMemoryContext;
 	copyDest->isPublishable = isPublishable;
+	copyDest->trackQueryCounters = trackQueryCounters;

 	return copyDest;
 }

@@ -2587,8 +2597,9 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu

 /*
  * CitusCopyDestReceiverShutdown implements the rShutdown interface of
- * CitusCopyDestReceiver. It ends the COPY on all the open connections and closes
- * the relation.
+ * CitusCopyDestReceiver. It ends the COPY on all the open connections, closes
+ * the relation and increments the query stat counters based on the shards
+ * copied into if requested.
  */
 static void
 CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)

@@ -2599,6 +2610,26 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
 	ListCell *connectionStateCell = NULL;
 	Relation distributedRelation = copyDest->distributedRelation;

+	/*
+	 * Increment the query stat counters based on the shards copied into
+	 * if requested.
+	 */
+	if (copyDest->trackQueryCounters)
+	{
+		int copiedShardCount =
+			copyDest->shardStateHash ?
+			hash_get_num_entries(copyDest->shardStateHash) :
+			0;
+		if (copiedShardCount <= 1)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+	}
+
 	List *connectionStateList = ConnectionStateList(connectionStateHash);

 	FinishLocalColocatedIntermediateFiles(copyDest);

@@ -3141,6 +3172,15 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag)

 	SendCopyEnd(copyOutState);

+	if (list_length(shardIntervalList) <= 1)
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}
+
 	table_close(distributedRelation, AccessShareLock);

 	if (completionTag != NULL)

@@ -39,6 +39,7 @@
 #include "distributed/remote_commands.h"
 #include "distributed/run_from_same_connection.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/stat_counters.h"
 #include "distributed/time_constants.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_log_messages.h"

@@ -354,6 +355,18 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
 	if (connection)
 	{
+		/*
+		 * Increment the connection stat counter for the connections that are
+		 * reused only if the connection is in a good state. Here we don't
+		 * bother shutting down the connection or such if it is not in a good
+		 * state but we mostly want to avoid incrementing the connection stat
+		 * counter for a connection that the caller cannot really use.
+		 */
+		if (PQstatus(connection->pgConn) == CONNECTION_OK)
+		{
+			IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
+		}
+
 		return connection;
 	}
 }

@@ -395,6 +408,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		dlist_delete(&connection->connectionNode);
 		pfree(connection);

+		/*
+		 * Here we don't increment the connection stat counter for the optional
+		 * connections that we gave up establishing due to connection throttling
+		 * because the callers who request optional connections know how to
+		 * survive without them.
+		 */
 		return NULL;
 	}
 }

@@ -982,6 +1001,14 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 		{
 			waitCount++;
 		}
+		else if (connectionState->phase == MULTI_CONNECTION_PHASE_ERROR)
+		{
+			/*
+			 * Here we count the connection establishments that failed and
+			 * that we won't wait for anymore.
+			 */
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+		}
 	}

 	/* prepare space for socket events */

@@ -1026,6 +1053,11 @@ FinishConnectionListEstablishment(List *multiConnectionList)

 		if (event->events & WL_POSTMASTER_DEATH)
 		{
+			/*
+			 * Here we don't increment the connection stat counter for the
+			 * optional failed connections because this is not a connection
+			 * failure, but a postmaster death in the local node.
+			 */
 			ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
 		}

@@ -1042,6 +1074,12 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 	 * reset the memory context
 	 */
 	MemoryContextDelete(MemoryContextSwitchTo(oldContext));

+	/*
+	 * Similarly, we don't increment the connection stat counter for the
+	 * failed connections here because this is not a connection failure
+	 * but because a cancellation request was received.
+	 */
 	return;
 }

@@ -1072,6 +1110,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 		                          eventMask, NULL);
 		if (!success)
 		{
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 			ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
 			                errmsg("connection establishment for node %s:%d "
 			                       "failed", connection->hostname,

@@ -1088,7 +1127,15 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 		 */
 		if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
 		{
-			MarkConnectionConnected(connectionState->connection);
+			/*
+			 * Since WaitEventSetFromMultiConnectionStates() only adds the
+			 * connections for which we haven't completed the connection
+			 * establishment yet, here we always have a new connection.
+			 * In other words, at this point, we surely know that we're
+			 * not dealing with a cached connection.
+			 */
+			bool newConnection = true;
+			MarkConnectionConnected(connectionState->connection, newConnection);
 		}
 	}
 }

@@ -1172,6 +1219,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)

 		/* close connection, otherwise we take up resource on the other side */
 		CitusPQFinish(connection);
+
+		IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 	}
 }

@@ -1584,7 +1633,7 @@ RemoteTransactionIdle(MultiConnection *connection)
  * establishment time when necessary.
  */
 void
-MarkConnectionConnected(MultiConnection *connection)
+MarkConnectionConnected(MultiConnection *connection, bool newConnection)
 {
 	connection->connectionState = MULTI_CONNECTION_CONNECTED;

@@ -1592,6 +1641,11 @@ MarkConnectionConnected(MultiConnection *connection)
 	{
 		INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
 	}
+
+	if (newConnection)
+	{
+		IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
+	}
 }

@@ -171,6 +171,7 @@
 #include "distributed/repartition_join_execution.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/stat_counters.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/transaction_identifier.h"
 #include "distributed/transaction_management.h"

@@ -690,7 +691,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution,
                           WorkerSession *session);
 static void ConnectionStateMachine(WorkerSession *session);
 static bool HasUnfinishedTaskForSession(WorkerSession *session);
-static void HandleMultiConnectionSuccess(WorkerSession *session);
+static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection);
 static bool HasAnyConnectionFailure(WorkerPool *workerPool);
 static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
 static bool TransactionModifiedDistributedTable(DistributedExecution *execution);

@@ -2035,6 +2036,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
 		else
 		{
 			connection->connectionState = MULTI_CONNECTION_FAILED;
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 		}

@@ -2810,21 +2812,21 @@ CheckConnectionTimeout(WorkerPool *workerPool)
 			logLevel = ERROR;
 		}

-		ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
-		                   errmsg("could not establish any connections to the node "
-		                          "%s:%d after %u ms", workerPool->nodeName,
-		                          workerPool->nodePort,
-		                          NodeConnectionTimeout)));
-
 		/*
 		 * We hit the connection timeout. In that case, we should not let the
 		 * connection establishment to continue because the execution logic
 		 * pretends that failed sessions are not going to be used anymore.
 		 *
 		 * That's why we mark the connection as timed out to trigger the state
-		 * changes in the executor.
+		 * changes in the executor, if we don't throw an error below.
 		 */
 		MarkEstablishingSessionsTimedOut(workerPool);
+
+		ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
+		                   errmsg("could not establish any connections to the node "
+		                          "%s:%d after %u ms", workerPool->nodeName,
+		                          workerPool->nodePort,
+		                          NodeConnectionTimeout)));
 	}
 	else
 	{

@@ -2852,6 +2854,7 @@ MarkEstablishingSessionsTimedOut(WorkerPool *workerPool)
 			connection->connectionState == MULTI_CONNECTION_INITIAL)
 		{
 			connection->connectionState = MULTI_CONNECTION_TIMED_OUT;
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 		}
 	}
 }

@@ -3009,6 +3012,10 @@ ConnectionStateMachine(WorkerSession *session)
 				 * the state machines might have already progressed and used
 				 * new pools/sessions instead. That's why we terminate the
 				 * connection, clear any state associated with it.
+				 *
+				 * Note that here we don't increment the failed connection
+				 * stat counter because MarkEstablishingSessionsTimedOut()
+				 * already did that.
 				 */
 				connection->connectionState = MULTI_CONNECTION_FAILED;
 				break;

@@ -3019,7 +3026,12 @@ ConnectionStateMachine(WorkerSession *session)
 				ConnStatusType status = PQstatus(connection->pgConn);
 				if (status == CONNECTION_OK)
 				{
-					HandleMultiConnectionSuccess(session);
+					/*
+					 * Connection was already established, possibly a cached
+					 * connection.
+					 */
+					bool newConnection = false;
+					HandleMultiConnectionSuccess(session, newConnection);
 					UpdateConnectionWaitFlags(session,
 					                          WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
 					break;

@@ -3027,6 +3039,7 @@ ConnectionStateMachine(WorkerSession *session)
 				else if (status == CONNECTION_BAD)
 				{
 					connection->connectionState = MULTI_CONNECTION_FAILED;
+					IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 					break;
 				}

@@ -3042,6 +3055,7 @@ ConnectionStateMachine(WorkerSession *session)
 				if (pollMode == PGRES_POLLING_FAILED)
 				{
 					connection->connectionState = MULTI_CONNECTION_FAILED;
+					IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 				}
 				else if (pollMode == PGRES_POLLING_READING)
 				{

@@ -3059,7 +3073,12 @@ ConnectionStateMachine(WorkerSession *session)
 				}
 				else
 				{
-					HandleMultiConnectionSuccess(session);
+					/*
+					 * Connection was not established before (!= CONNECTION_OK)
+					 * but PQconnectPoll() did so now.
+					 */
+					bool newConnection = true;
+					HandleMultiConnectionSuccess(session, newConnection);
 					UpdateConnectionWaitFlags(session,
 					                          WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

@@ -3137,6 +3156,11 @@ ConnectionStateMachine(WorkerSession *session)
 					break;
 				}

+				/*
+				 * Here we don't increment the connection stat counter for failed
+				 * connections because we don't track the connections that we could
+				 * establish but lost later.
+				 */
 				connection->connectionState = MULTI_CONNECTION_FAILED;
 				break;
 			}

@@ -3299,12 +3323,12 @@ HasUnfinishedTaskForSession(WorkerSession *session)
  * connection's state.
  */
 static void
-HandleMultiConnectionSuccess(WorkerSession *session)
+HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection)
 {
 	MultiConnection *connection = session->connection;
 	WorkerPool *workerPool = session->workerPool;

-	MarkConnectionConnected(connection);
+	MarkConnectionConnected(connection, newConnection);

 	ereport(DEBUG4, (errmsg("established connection to %s:%d for "
 	                        "session %ld in %ld microseconds",

@@ -45,6 +45,7 @@
 #include "distributed/multi_server_executor.h"
 #include "distributed/query_stats.h"
 #include "distributed/shard_utils.h"
+#include "distributed/stat_counters.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/worker_log_messages.h"
 #include "distributed/worker_protocol.h"

@@ -206,7 +207,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
 	if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL)
 	{
 		/*
-		 * INSERT..SELECT via coordinator or re-partitioning are special because
+		 * INSERT..SELECT / MERGE via coordinator or re-partitioning are special because
 		 * the SELECT part is planned separately.
 		 */
 		return;

@@ -262,8 +263,19 @@ CitusExecScan(CustomScanState *node)

 	if (!scanState->finishedRemoteScan)
 	{
+		bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan);
+
 		AdaptiveExecutor(scanState);

+		if (isMultiTaskPlan)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+
 		scanState->finishedRemoteScan = true;
 	}

@@ -50,6 +50,7 @@
 #include "distributed/repartition_executor.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
+#include "distributed/stat_counters.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/transaction_management.h"
 #include "distributed/version_compat.h"

@@ -178,6 +179,22 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 		                                            targetRelation,
 		                                            binaryFormat);

+		if (list_length(distSelectTaskList) <= 1)
+		{
+			/*
+			 * Probably we will never get here for a repartitioned
+			 * INSERT..SELECT because when the source is a single shard
+			 * table, we should most probably choose to use
+			 * MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this
+			 * here.
+			 */
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+
 		/*
 		 * At this point select query has been executed on workers and results
 		 * have been fetched in such a way that they are colocated with corresponding

@@ -198,6 +215,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 		                                          taskList, tupleDest,
 		                                          hasReturning);

+		if (list_length(taskList) <= 1)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+
 		executorState->es_processed = rowsInserted;

 		if (SortReturning && hasReturning)

@@ -272,6 +298,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 				SortTupleStore(scanState);
 			}
 		}
+
+		if (list_length(prunedTaskList) <= 1)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
 	}
 	else
 	{

@@ -313,6 +348,12 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 	int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
 	                                                              columnNameList);

+	/*
+	 * We don't track query counters for the COPY commands that are executed to
+	 * prepare intermediate results.
+	 */
+	const bool trackQueryCounters = false;
+
 	/* set up a DestReceiver that copies into the intermediate table */
 	const bool publishableData = true;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,

@@ -320,7 +361,8 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 	                                                              partitionColumnIndex,
 	                                                              executorState,
 	                                                              intermediateResultIdPrefix,
-	                                                              publishableData);
+	                                                              publishableData,
+	                                                              trackQueryCounters);

 	ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

@@ -349,13 +391,20 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
 	int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
 	                                                              columnNameList);

+	/*
+	 * We want to track query counters for the COPY commands that are executed to
+	 * perform the final INSERT for such INSERT..SELECT queries.
+	 */
+	const bool trackQueryCounters = true;
+
 	/* set up a DestReceiver that copies into the distributed table */
 	const bool publishableData = true;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
 	                                                              columnNameList,
 	                                                              partitionColumnIndex,
 	                                                              executorState, NULL,
-	                                                              publishableData);
+	                                                              publishableData,
+	                                                              trackQueryCounters);

 	ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

@@ -26,6 +26,7 @@
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/repartition_executor.h"
+#include "distributed/stat_counters.h"
 #include "distributed/subplan_execution.h"

 static void ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState);

@@ -166,6 +167,21 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
 	                                       distSourceTaskList, partitionColumnIndex,
 	                                       targetRelation, binaryFormat);

+	if (list_length(distSourceTaskList) <= 1)
+	{
+		/*
+		 * Probably we will never get here for a repartitioned MERGE
+		 * because when the source is a single shard table, we should
+		 * most probably choose to use ExecuteSourceAtCoordAndRedistribution(),
+		 * but we still keep this here.
+		 */
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}
+
 	ereport(DEBUG1, (errmsg("Executing final MERGE on workers using "
 	                        "intermediate results")));

@@ -193,6 +209,16 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
 	                                       tupleDest,
 	                                       hasReturning,
 	                                       paramListInfo);
+
+	if (list_length(taskList) <= 1)
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}

 	executorState->es_processed = rowsMerged;
 }

@@ -287,7 +313,11 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)

 	if (prunedTaskList == NIL)
 	{
-		/* No task to execute */
+		/*
+		 * No task to execute, but we still increment STAT_QUERY_EXECUTION_SINGLE_SHARD
+		 * as per our convention.
+		 */
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
 		return;
 	}

@@ -307,6 +337,16 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
 	                                     tupleDest,
 	                                     hasReturning,
 	                                     paramListInfo);
+
+	if (list_length(prunedTaskList) == 1)
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}

 	executorState->es_processed = rowsMerged;
 }

@@ -332,6 +372,12 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 	List *columnNameList =
 		BuildColumnNameListFromTargetList(targetRelationId, sourceTargetList);

+	/*
+	 * We don't track query counters for the COPY commands that are executed to
+	 * prepare intermediate results.
+	 */
+	const bool trackQueryCounters = false;
+
 	/* set up a DestReceiver that copies into the intermediate file */
 	const bool publishableData = false;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,

@@ -339,7 +385,8 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 	                                                              partitionColumnIndex,
 	                                                              executorState,
 	                                                              intermediateResultIdPrefix,
-	                                                              publishableData);
+	                                                              publishableData,
+	                                                              trackQueryCounters);

 	/* We can skip when writing to intermediate files */
 	copyDest->skipCoercions = true;

@@ -105,6 +105,7 @@
 #include "distributed/shardsplit_shared_memory.h"
 #include "distributed/shared_connection_stats.h"
 #include "distributed/shared_library_init.h"
+#include "distributed/stat_counters.h"
 #include "distributed/statistics_collection.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/time_constants.h"

@@ -187,8 +188,10 @@ static void ResizeStackToMaximumDepth(void);
 static void multi_log_hook(ErrorData *edata);
 static bool IsSequenceOverflowError(ErrorData *edata);
 static void RegisterConnectionCleanup(void);
+static void RegisterSaveBackendStatsIntoSavedBackendStatsHash(void);
 static void RegisterExternalClientBackendCounterDecrement(void);
 static void CitusCleanupConnectionsAtExit(int code, Datum arg);
+static void SaveBackendStatsIntoSavedBackendStatsHashAtExit(int code, Datum arg);
 static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
 static void CreateRequiredDirectories(void);
 static void RegisterCitusConfigVariables(void);

@@ -504,6 +507,8 @@ _PG_init(void)
 	InitializeShardSplitSMHandleManagement();

 	InitializeMultiTenantMonitorSMHandleManagement();
+
+	InitializeStatCountersShmem();

 	/* enable modification of pg_catalog tables during pg_upgrade */
 	if (IsBinaryUpgrade)

@@ -615,6 +620,8 @@ citus_shmem_request(void)
 	RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
 	RequestAddinShmemSpace(LogicalClockShmemSize());
 	RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
+	RequestAddinShmemSpace(StatCountersShmemSize());
+	RequestNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME, 1);
 }

@@ -787,6 +794,8 @@ StartupCitusBackend(void)

 	SetBackendDataDatabaseId();
 	RegisterConnectionCleanup();
+
+	RegisterSaveBackendStatsIntoSavedBackendStatsHash();

 	FinishedStartupCitusBackend = true;
 }

@@ -824,6 +833,24 @@ RegisterConnectionCleanup(void)
 }


+/*
+ * RegisterSaveBackendStatsIntoSavedBackendStatsHash registers the function
+ * that saves the backend stats for the exited backends into the saved backend
+ * stats hash.
+ */
+static void
+RegisterSaveBackendStatsIntoSavedBackendStatsHash(void)
+{
+	static bool registeredSaveBackendStats = false;
+	if (registeredSaveBackendStats == false)
+	{
+		before_shmem_exit(SaveBackendStatsIntoSavedBackendStatsHashAtExit, 0);
+
+		registeredSaveBackendStats = true;
+	}
+}
+
+
 /*
  * RegisterExternalClientBackendCounterDecrement is called when the backend terminates.
  * For all client backends, we register a callback that will undo

@@ -864,6 +891,24 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
 }


+/*
+ * SaveBackendStatsIntoSavedBackendStatsHashAtExit is called before_shmem_exit()
+ * of the backend for the purposes of saving the backend stats for the exited
+ * backends into the saved backend stats hash.
+ */
+static void
+SaveBackendStatsIntoSavedBackendStatsHashAtExit(int code, Datum arg)
+{
+	if (code)
+	{
+		/* don't try to save the stats during a crash */
+		return;
+	}
+
+	SaveBackendStatsIntoSavedBackendStatsHash();
+}
+
+
 /*
  * DecrementExternalClientBackendCounterAtExit is called before_shmem_exit() of the
  * backend for the purposes decrementing

@@ -1451,6 +1496,20 @@ RegisterCitusConfigVariables(void)
 		GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
 		NULL, NULL, NULL);

+	DefineCustomBoolVariable(
+		"citus.enable_stat_counters",
+		gettext_noop("Enables the collection of statistic counters for Citus."),
+		gettext_noop("When enabled, Citus maintains a set of statistic "
+		             "counters for the Citus extension. These statistics are "
+		             "available in the citus_stat_counters view and are "
+		             "lost on server shutdown and can be reset by executing "
+		             "the function citus_stat_counters_reset() on demand."),
+		&EnableStatCounters,
+		ENABLE_STAT_COUNTERS_DEFAULT,
+		PGC_SUSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.enable_statistics_collection",
 		gettext_noop("Enables sending basic usage statistics to Citus."),

@@ -48,3 +48,5 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
 #include "udfs/repl_origin_helper/13.1-1.sql"
 #include "udfs/citus_finish_pg_upgrade/13.1-1.sql"
 #include "udfs/citus_is_primary_node/13.1-1.sql"
+#include "udfs/citus_stat_counters/13.1-1.sql"
+#include "udfs/citus_stat_counters_reset/13.1-1.sql"

@@ -41,3 +41,7 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking();
 DROP FUNCTION citus_internal.stop_replication_origin_tracking();
 DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
 #include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
+
+DROP VIEW pg_catalog.citus_stat_counters;
+DROP FUNCTION pg_catalog.citus_stat_counters(oid);
+DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);

@ -0,0 +1,49 @@
-- See the comments for the function in
-- src/backend/distributed/stat_counters.c for more details.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
    database_id oid DEFAULT 0,

    -- must always be the first column or you should accordingly update
    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
    OUT database_id oid,

    -- Following stat counter columns must be in the same order as the
    -- StatType enum defined in src/include/distributed/stat_counters.h
    OUT connection_establishment_succeeded bigint,
    OUT connection_establishment_failed bigint,
    OUT connection_reused bigint,
    OUT query_execution_single_shard bigint,
    OUT query_execution_multi_shard bigint,

    -- must always be the last column or you should accordingly update
    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
    OUT stats_reset timestamp with time zone
)
RETURNS SETOF RECORD
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Only includes databases that have been connected to at least once since the last restart, including ones that were later dropped.';

-- returns the stat counters for all the databases in the local node
CREATE VIEW citus.citus_stat_counters AS
SELECT pg_database.oid,
       pg_database.datname as name,

       -- We always COALESCE the counters to 0 because the LEFT JOIN
       -- will bring in the databases that have never been connected to
       -- since the last restart with NULL counters, but we want to
       -- show them with 0 counters in the view.
       COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
       COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
       COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
       COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
       COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,

       citus_stat_counters.stats_reset
FROM pg_catalog.pg_database
LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
ON (oid = database_id);

ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;

GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
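A short usage sketch for the view and the reset UDF; the counter values are workload-dependent, so none are hard-coded here:

    -- counters for all databases on the local node, zero-filled via the LEFT JOIN above
    SELECT name, connection_reused, query_execution_single_shard, query_execution_multi_shard
    FROM pg_catalog.citus_stat_counters;

    -- reset the counters for the current database only (no argument or 0 means "current")
    SELECT pg_catalog.citus_stat_counters_reset();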
@ -0,0 +1,49 @@
-- See the comments for the function in
-- src/backend/distributed/stat_counters.c for more details.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
    database_id oid DEFAULT 0,

    -- must always be the first column or you should accordingly update
    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
    OUT database_id oid,

    -- Following stat counter columns must be in the same order as the
    -- StatType enum defined in src/include/distributed/stat_counters.h
    OUT connection_establishment_succeeded bigint,
    OUT connection_establishment_failed bigint,
    OUT connection_reused bigint,
    OUT query_execution_single_shard bigint,
    OUT query_execution_multi_shard bigint,

    -- must always be the last column or you should accordingly update
    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
    OUT stats_reset timestamp with time zone
)
RETURNS SETOF RECORD
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Only includes databases that have been connected to at least once since the last restart, including ones that were later dropped.';

-- returns the stat counters for all the databases in the local node
CREATE VIEW citus.citus_stat_counters AS
SELECT pg_database.oid,
       pg_database.datname as name,

       -- We always COALESCE the counters to 0 because the LEFT JOIN
       -- will bring in the databases that have never been connected to
       -- since the last restart with NULL counters, but we want to
       -- show them with 0 counters in the view.
       COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
       COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
       COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
       COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
       COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,

       citus_stat_counters.stats_reset
FROM pg_catalog.pg_database
LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
ON (oid = database_id);

ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;

GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
RETURNS VOID
LANGUAGE C STRICT PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database OID, or for the current database if nothing or 0 is provided.';

-- Rather than using an explicit superuser() check in the function, we use
-- the GRANT system to REVOKE access to it when creating the extension.
-- Administrators can later change who can access it, or leave it available
-- only to the superuser / database cluster owner, if they choose.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;
@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
RETURNS VOID
LANGUAGE C STRICT PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database OID, or for the current database if nothing or 0 is provided.';

-- Rather than using an explicit superuser() check in the function, we use
-- the GRANT system to REVOKE access to it when creating the extension.
-- Administrators can later change who can access it, or leave it available
-- only to the superuser / database cluster owner, if they choose.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;
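Since EXECUTE is revoked from PUBLIC above, an administrator who wants to delegate resets can re-grant it; monitoring_role below is a hypothetical role name, not something this patch creates:

    GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) TO monitoring_role;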
@ -0,0 +1,973 @@
/*-------------------------------------------------------------------------
 *
 * stat_counters.c
 *
 * This file contains functions to track various statistic counters for
 * Citus.
 *
 * We create an array of "BackendStatsSlot"s in shared memory, one for
 * each backend. Each backend increments its own stat counters in its
 * own slot via IncrementStatCounterForMyDb(). And when a backend exits,
 * it saves its stat counters from its slot via
 * SaveBackendStatsIntoSavedBackendStatsHash() into a hash table in
 * shared memory, whose entries are "SavedBackendStatsHashEntry"s and
 * whose key is the database id. In other words, each entry of the hash
 * table is used to aggregate the stat counters for the backends that
 * were connected to that database and exited since the last server
 * restart. Plus, each entry is responsible for keeping track of the
 * reset timestamp for both active and exited backends too.
 *
 * Note that today we don't evict the entries of the said hash table
 * that point to dropped databases because the wrapper view filters
 * them out anyway (thanks to the LEFT JOIN) and we don't expect a
 * performance hit due to that unless users have a lot of databases
 * that are dropped and recreated frequently.
 *
 * The reason why we save the stat counters for exited backends in the
 * shared hash table is that we cannot guarantee that the backend slot
 * that was used by an exited backend will be reused by another backend
 * connected to the same database. For this reason, we need to save the
 * stat counters for exited backends into a shared hash table so that we
 * can reset the counters within the corresponding backend slots as the
 * backends exit.
 *
 * When citus_stat_counters() is called, we first aggregate the stat
 * counters from the backend slots of all the active backends and then
 * we add the aggregated stat counters from the exited backends that
 * are stored in the shared hash table. Also, we don't persist backend
 * stats on server shutdown, but we might want to do that in the future.
 *
 * Similarly, when citus_stat_counters_reset() is called, we reset the
 * stat counters for the active backends and the exited backends that are
 * stored in the shared hash table. It also updates the resetTimestamp
 * in the shared hash table entry appropriately, which citus_stat_counters()
 * then reports as the stats_reset column.
 *
 * Caveats:
 *
 * There is a chance that citus_stat_counters_reset() might race with a
 * backend that is trying to increment one of the counters in its slot
 * and as a result it can effectively fail to reset that counter due to
 * the reasons documented in the IncrementStatCounterForMyDb() function.
 * However, this should be a very rare case and we can live with that
 * for now.
 *
 * Also, citus_stat_counters() might observe the counters for a backend
 * twice, or might miss them entirely if the backend is concurrently
 * exiting, depending on the order in which we call
 * CollectActiveBackendStatsIntoHTAB() and
 * CollectSavedBackendStatsIntoHTAB() in citus_stat_counters(). However,
 * the next call to citus_stat_counters() will see the correct values
 * for the counters, so we can live with that for now.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "common/hashfn.h"
|
||||||
|
#include "port/atomics.h"
|
||||||
|
#include "storage/ipc.h"
|
||||||
|
#include "storage/proc.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
|
#include "pg_version_compat.h"
|
||||||
|
|
||||||
|
#include "distributed/argutils.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/stat_counters.h"
|
||||||
|
#include "distributed/tuplestore.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* saved backend stats - hash table constants
|
||||||
|
*
|
||||||
|
* Configurations used to create the hash table for saved backend stats.
|
||||||
|
* The places where SAVED_BACKEND_STATS_HASH_MAX_DATABASES is used do not
|
||||||
|
* impose a hard limit on the number of databases that can be tracked but
|
||||||
|
* in ShmemInitHash() it's documented that the access efficiency will degrade
|
||||||
|
* if it is exceeded substantially.
|
||||||
|
*
|
||||||
|
* XXX: Consider using dshash_table instead of (shared) HTAB if that becomes
|
||||||
|
* a concern.
|
||||||
|
*/
|
||||||
|
#define SAVED_BACKEND_STATS_HASH_INIT_DATABASES 8
|
||||||
|
#define SAVED_BACKEND_STATS_HASH_MAX_DATABASES 1024
|
||||||
|
|
||||||
|
|
||||||
|
/* fixed size array types to store the stat counters */
|
||||||
|
typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS];
|
||||||
|
typedef uint64 StatCounters[N_CITUS_STAT_COUNTERS];
|
||||||
|
|
||||||
|
/*
|
||||||
|
* saved backend stats - hash entry definition
|
||||||
|
*
|
||||||
|
* This is used to define & access the shared hash table used to aggregate the stat
|
||||||
|
* counters for the backends exited so far since last server restart. It's also
|
||||||
|
* responsible for keeping track of the reset timestamp.
|
||||||
|
*/
|
||||||
|
typedef struct SavedBackendStatsHashEntry
|
||||||
|
{
|
||||||
|
/* hash entry key, must always be the first */
|
||||||
|
Oid databaseId;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Needs to be locked whenever we read / write counters or resetTimestamp
|
||||||
|
* in this struct since we don't use atomic counters for this struct. Plus,
|
||||||
|
* we want to update the stat counters and resetTimestamp atomically.
|
||||||
|
*/
|
||||||
|
slock_t mutex;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* While "counters" only represents the stat counters for exited backends,
|
||||||
|
* the "resetTimestamp" doesn't only represent the reset timestamp for exited
|
||||||
|
* backends' stat counters but also for the active backends.
|
||||||
|
*/
|
||||||
|
StatCounters counters;
|
||||||
|
TimestampTz resetTimestamp;
|
||||||
|
} SavedBackendStatsHashEntry;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Hash entry definition used for the local hash table created by
|
||||||
|
* citus_stat_counters() at the runtime to aggregate the stat counters
|
||||||
|
* across all backends.
|
||||||
|
*/
|
||||||
|
typedef struct DatabaseStatsHashEntry
|
||||||
|
{
|
||||||
|
/* hash entry key, must always be the first */
|
||||||
|
Oid databaseId;
|
||||||
|
|
||||||
|
StatCounters counters;
|
||||||
|
TimestampTz resetTimestamp;
|
||||||
|
} DatabaseStatsHashEntry;
|
||||||
|
|
||||||
|
/* definition of a one per-backend stat counters slot in shared memory */
|
||||||
|
typedef struct BackendStatsSlot
|
||||||
|
{
|
||||||
|
AtomicStatCounters counters;
|
||||||
|
} BackendStatsSlot;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GUC variable
|
||||||
|
*
|
||||||
|
* This only controls whether we track the stat counters or not, via
|
||||||
|
* IncrementStatCounterForMyDb() and
|
||||||
|
* SaveBackendStatsIntoSavedBackendStatsHash(). In other words, even
|
||||||
|
* when the GUC is disabled, we still allocate the shared memory
|
||||||
|
* structures etc. and citus_stat_counters() / citus_stat_counters_reset()
|
||||||
|
* will still work.
|
||||||
|
*/
|
||||||
|
bool EnableStatCounters = ENABLE_STAT_COUNTERS_DEFAULT;
|
||||||
|
|
||||||
|
/* saved backend stats - shared memory variables */
|
||||||
|
static LWLockId *SharedSavedBackendStatsHashLock = NULL;
|
||||||
|
static HTAB *SharedSavedBackendStatsHash = NULL;
|
||||||
|
|
||||||
|
/* per-backend stat counter slots - shared memory array */
|
||||||
|
BackendStatsSlot *SharedBackendStatsSlotArray = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't expect the callsites that check this (via
|
||||||
|
* EnsureStatCountersShmemInitDone()) to be executed before
|
||||||
|
* StatCountersShmemInit() is done. Plus, once StatCountersShmemInit()
|
||||||
|
* is done, we also don't expect shared memory variables to be
|
||||||
|
* initialized improperly. However, we still set this to true only
|
||||||
|
* once StatCountersShmemInit() is done and if all three of the shared
|
||||||
|
* memory variables above are initialized properly. And in the callsites
|
||||||
|
* where these shared memory variables are accessed, we check this
|
||||||
|
* variable first just to be on the safe side.
|
||||||
|
*/
|
||||||
|
static bool StatCountersShmemInitDone = false;
|
||||||
|
|
||||||
|
/* saved shmem_startup_hook */
|
||||||
|
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||||
|
|
||||||
|
|
||||||
|
/* shared memory init & management */
|
||||||
|
static void StatCountersShmemInit(void);
|
||||||
|
static Size SharedBackendStatsSlotArrayShmemSize(void);
|
||||||
|
|
||||||
|
/* helper functions for citus_stat_counters() */
|
||||||
|
static void CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
|
||||||
|
static void CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
|
||||||
|
static DatabaseStatsHashEntry * DatabaseStatsHashEntryFindOrCreate(Oid databaseId,
|
||||||
|
HTAB *databaseStats);
|
||||||
|
static void StoreDatabaseStatsIntoTupStore(HTAB *databaseStats,
|
||||||
|
Tuplestorestate *tupleStore,
|
||||||
|
TupleDesc tupleDescriptor);
|
||||||
|
|
||||||
|
/* helper functions for citus_stat_counters_reset() */
|
||||||
|
static bool ResetActiveBackendStats(Oid databaseId);
|
||||||
|
static void ResetSavedBackendStats(Oid databaseId, bool force);
|
||||||
|
|
||||||
|
/* saved backend stats */
|
||||||
|
static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryCreateIfNotExists(Oid
|
||||||
|
databaseId);
|
||||||
|
|
||||||
|
|
||||||
|
/* sql exports */
|
||||||
|
PG_FUNCTION_INFO_V1(citus_stat_counters);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_stat_counters_reset);
|
||||||
|
|
||||||
|
|
||||||
|
/*
 * EnsureStatCountersShmemInitDone returns true if the shared memory
 * data structures used for keeping track of stat counters have been
 * properly initialized; otherwise, it returns false and emits a warning.
 */
static inline bool
EnsureStatCountersShmemInitDone(void)
{
    if (!StatCountersShmemInitDone)
    {
        ereport(WARNING,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("shared memory for stat counters was not properly initialized")));
        return false;
    }

    return true;
}


/*
 * citus_stat_counters returns the stat counters for the given database id.
 *
 * This only returns rows for the databases which have been connected to
 * by at least one backend since the last server restart (even if no
 * observations have been made for any of the counters or if they were
 * reset) and it considers such a database even if it has been dropped later.
 *
 * When InvalidOid is provided, all such databases are considered; otherwise
 * only the database with the given id is considered.
 *
 * So, as an outcome, when a database id that is different than InvalidOid
 * is provided and no backend has connected to it since the last server
 * restart, or if we never had such a database, then the function returns
 * an empty set.
 *
 * Finally, the stats_reset column is set to NULL if the stat counters for
 * the database were never reset since the last server restart.
 */
Datum
citus_stat_counters(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    /*
     * Function's sql definition allows Postgres to silently
     * ignore NULL, but we still check.
     */
    PG_ENSURE_ARGNOTNULL(0, "database_id");
    Oid databaseId = PG_GETARG_OID(0);

    /* just to be on the safe side */
    if (!EnsureStatCountersShmemInitDone())
    {
        PG_RETURN_VOID();
    }

    TupleDesc tupleDescriptor = NULL;
    Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);

    HASHCTL info;
    uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION);
    memset(&info, 0, sizeof(info));
    info.keysize = sizeof(Oid);
    info.hash = oid_hash;
    info.entrysize = sizeof(DatabaseStatsHashEntry);

    HTAB *databaseStats = hash_create("Citus Database Stats Collect Hash", 8, &info,
                                      hashFlags);

    CollectActiveBackendStatsIntoHTAB(databaseId, databaseStats);
    CollectSavedBackendStatsIntoHTAB(databaseId, databaseStats);

    StoreDatabaseStatsIntoTupStore(databaseStats, tupleStore, tupleDescriptor);

    hash_destroy(databaseStats);

    PG_RETURN_VOID();
}
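As a usage sketch mirroring the regression tests further below, the UDF can also be called directly for a single database by passing its oid:

    SELECT (citus_stat_counters(oid)).connection_establishment_failed
    FROM pg_database WHERE datname = current_database();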
/*
 * citus_stat_counters_reset resets the Citus stat counters for the given
 * database id.
 *
 * If a valid database id is provided, stat counters for that database are
 * reset, even if it was dropped later.
 *
 * Otherwise, if the provided database id is not valid, then the function
 * effectively does nothing.
 */
Datum
citus_stat_counters_reset(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);

    /*
     * Function's sql definition allows Postgres to silently
     * ignore NULL, but we still check.
     */
    PG_ENSURE_ARGNOTNULL(0, "database_id");
    Oid databaseId = PG_GETARG_OID(0);

    /*
     * If the database id is InvalidOid, then we assume that
     * the caller wants to reset the stat counters for the
     * current database.
     */
    if (databaseId == InvalidOid)
    {
        databaseId = MyDatabaseId;
    }

    /* just to be on the safe side */
    if (!EnsureStatCountersShmemInitDone())
    {
        PG_RETURN_VOID();
    }

    bool foundAnyBackendsForDb = ResetActiveBackendStats(databaseId);

    /*
     * Even when we don't have an entry for the given database id in the
     * saved backend stats hash table, we still want to create one for
     * it to save the resetTimestamp if we currently have at least one
     * backend connected to it. By providing foundAnyBackendsForDb, we
     * effectively let the function do that. Since ResetActiveBackendStats()
     * doesn't filter the active backends, foundAnyBackendsForDb being true
     * doesn't always mean that at least one backend is connected to it
     * right now, but it means that we had such a backend at some point in
     * time since the last server restart. If all backends referred to in
     * the shared array have already exited, then we should already have an
     * entry for it in the saved backend stats hash table, so providing
     * a "true" wouldn't do anything in that case. Otherwise, if at least
     * one backend is still connected to it, providing a "true" will
     * effectively create a new entry for it if it doesn't exist yet,
     * which is what we actually want to do.
     *
     * That way, we can save the resetTimestamp for the active backends
     * into the relevant entry of the saved backend stats hash table.
     * Note that we don't do that for the databases that don't have
     * any active backends connected to them because we actually don't
     * reset anything for such databases.
     */
    ResetSavedBackendStats(databaseId, foundAnyBackendsForDb);

    PG_RETURN_VOID();
}
/*
 * InitializeStatCountersShmem saves the previous shmem_startup_hook and sets
 * up a new shmem_startup_hook for initializing the shared memory data structures
 * used for keeping track of stat counters.
 */
void
InitializeStatCountersShmem(void)
{
    prev_shmem_startup_hook = shmem_startup_hook;
    shmem_startup_hook = StatCountersShmemInit;
}


/*
 * StatCountersShmemSize calculates and returns the shared memory size
 * required for the shared memory data structures used for keeping track of
 * stat counters.
 */
Size
StatCountersShmemSize(void)
{
    Size backendStatsSlotArraySize = SharedBackendStatsSlotArrayShmemSize();
    Size savedBackendStatsHashLockSize = MAXALIGN(sizeof(LWLockId));
    Size savedBackendStatsHashSize = hash_estimate_size(
        SAVED_BACKEND_STATS_HASH_MAX_DATABASES, sizeof(SavedBackendStatsHashEntry));

    return add_size(add_size(backendStatsSlotArraySize, savedBackendStatsHashLockSize),
                    savedBackendStatsHashSize);
}
/*
 * IncrementStatCounterForMyDb increments the stat counter for the given statId
 * for this backend.
 */
void
IncrementStatCounterForMyDb(int statId)
{
    if (!EnableStatCounters)
    {
        return;
    }

    /* just to be on the safe side */
    if (!EnsureStatCountersShmemInitDone())
    {
        return;
    }

    int myBackendSlotIdx = getProcNo_compat(MyProc);
    BackendStatsSlot *myBackendStatsSlot =
        &SharedBackendStatsSlotArray[myBackendSlotIdx];

    /*
     * When there cannot be any other writers, incrementing an atomic
     * counter via pg_atomic_read_u64() and pg_atomic_write_u64() is the
     * same as incrementing it via pg_atomic_fetch_add_u64(). Plus, the
     * former is cheaper than the latter because the latter has to do
     * extra work to deal with concurrent writers.
     *
     * In our case, the only concurrent writer could be the backend that
     * is executing citus_stat_counters_reset(). So, there is a chance
     * that we read the counter value, then it gets reset by a concurrent
     * call made to citus_stat_counters_reset(), and then we write the
     * incremented value back, effectively overriding the reset value.
     * But this should be a rare case and we can live with that, for the
     * sake of a lock-free implementation of this function.
     */
    pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters[statId];
    pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1);
}
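For contrast with the lock-free increment above, a one-line sketch of the stricter alternative the comment rules out on cost grounds; it would not lose a concurrent reset, but pays for a full atomic read-modify-write on every increment:

    /* stricter but costlier alternative, shown for comparison only */
    pg_atomic_fetch_add_u64(statPtr, 1);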
/*
 * SaveBackendStatsIntoSavedBackendStatsHash saves the stat counters
 * for this backend into the saved backend stats hash table.
 *
 * So, this is only supposed to be called when a backend exits.
 *
 * Also, we do our best to avoid throwing errors in this function because
 * this function is called when a backend is exiting and throwing errors
 * at that point will cause the backend to crash.
 */
void
SaveBackendStatsIntoSavedBackendStatsHash(void)
{
    if (!EnableStatCounters)
    {
        return;
    }

    /* just to be on the safe side */
    if (!EnsureStatCountersShmemInitDone())
    {
        return;
    }

    Oid databaseId = MyDatabaseId;

    LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

    SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
        (SavedBackendStatsHashEntry *) hash_search(
            SharedSavedBackendStatsHash,
            (void *) &databaseId,
            HASH_FIND,
            NULL);
    if (!dbSavedBackendStatsEntry)
    {
        /* promote the lock to exclusive to insert the new entry for this database */
        LWLockRelease(*SharedSavedBackendStatsHashLock);
        LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);

        dbSavedBackendStatsEntry =
            SavedBackendStatsHashEntryCreateIfNotExists(databaseId);

        LWLockRelease(*SharedSavedBackendStatsHashLock);

        if (!dbSavedBackendStatsEntry)
        {
            /*
             * Couldn't allocate a new hash entry because we're out of
             * (shared) memory. In that case, we just log a warning and
             * return, instead of throwing an error, due to the reasons
             * mentioned in the function's comment.
             */
            ereport(WARNING,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("failed to allocate saved backend stats hash entry")));
            return;
        }

        /* re-acquire the shared lock */
        LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
    }

    int myBackendSlotIdx = getProcNo_compat(MyProc);
    BackendStatsSlot *myBackendStatsSlot =
        &SharedBackendStatsSlotArray[myBackendSlotIdx];

    SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

    for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
    {
        dbSavedBackendStatsEntry->counters[statIdx] +=
            pg_atomic_read_u64(&myBackendStatsSlot->counters[statIdx]);

        /*
         * Given that this function is only called when a backend exits, later on
         * another backend might be assigned to the same slot. So, we reset each
         * stat counter of this slot to 0 after saving it.
         */
        pg_atomic_write_u64(&myBackendStatsSlot->counters[statIdx], 0);
    }

    SpinLockRelease(&dbSavedBackendStatsEntry->mutex);

    LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
 * StatCountersShmemInit initializes the shared memory data structures used
 * for keeping track of stat counters.
 */
static void
StatCountersShmemInit(void)
{
    if (prev_shmem_startup_hook != NULL)
    {
        prev_shmem_startup_hook();
    }

    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

    bool sharedBackendStatsSlotArrayAlreadyInit = false;
    SharedBackendStatsSlotArray = (BackendStatsSlot *)
        ShmemInitStruct("Citus Shared Backend Stats Slot Array",
                        SharedBackendStatsSlotArrayShmemSize(),
                        &sharedBackendStatsSlotArrayAlreadyInit);

    bool sharedSavedBackendStatsHashLockAlreadyInit = false;
    SharedSavedBackendStatsHashLock = ShmemInitStruct(
        SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME,
        sizeof(LWLockId),
        &sharedSavedBackendStatsHashLockAlreadyInit);

    HASHCTL hashInfo = {
        .keysize = sizeof(Oid),
        .entrysize = sizeof(SavedBackendStatsHashEntry),
        .hash = oid_hash,
    };
    SharedSavedBackendStatsHash = ShmemInitHash("Citus Shared Saved Backend Stats Hash",
                                                SAVED_BACKEND_STATS_HASH_INIT_DATABASES,
                                                SAVED_BACKEND_STATS_HASH_MAX_DATABASES,
                                                &hashInfo,
                                                HASH_ELEM | HASH_FUNCTION);

    Assert(sharedBackendStatsSlotArrayAlreadyInit ==
           sharedSavedBackendStatsHashLockAlreadyInit);
    if (!sharedBackendStatsSlotArrayAlreadyInit)
    {
        for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
        {
            BackendStatsSlot *backendStatsSlot =
                &SharedBackendStatsSlotArray[backendSlotIdx];

            for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
            {
                pg_atomic_init_u64(&backendStatsSlot->counters[statIdx], 0);
            }
        }

        *SharedSavedBackendStatsHashLock = &(
            GetNamedLWLockTranche(
                SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME)
            )->lock;
    }

    LWLockRelease(AddinShmemInitLock);

    /*
     * At this point, they should have been set to non-null values already,
     * but we still check them just to be sure.
     */
    if (SharedBackendStatsSlotArray &&
        SharedSavedBackendStatsHashLock &&
        SharedSavedBackendStatsHash)
    {
        StatCountersShmemInitDone = true;
    }
}


/*
 * SharedBackendStatsSlotArrayShmemSize returns the size of the shared
 * backend stats slot array.
 */
static Size
SharedBackendStatsSlotArrayShmemSize(void)
{
    return mul_size(sizeof(BackendStatsSlot), MaxBackends);
}
/*
 * CollectActiveBackendStatsIntoHTAB aggregates the stat counters for the
 * given database id from all the active backends into the databaseStats
 * hash table. The function doesn't actually filter the slots of active
 * backends, but it's just fine to read the stat counters from all of them
 * because exited backends zero out their stat counters when they exit
 * anyway.
 *
 * If the database id is InvalidOid, then all the active backends will be
 * considered regardless of the database they are connected to.
 *
 * Otherwise, if the database id is different than InvalidOid, then only
 * the active backends whose PGPROC->databaseId is the same as the given
 * database id will be considered, if any.
 */
static void
CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
    for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
    {
        PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);

        if (backendProc->pid == 0)
        {
            /* unused slot */
            continue;
        }

        Oid procDatabaseId = backendProc->databaseId;
        if (procDatabaseId == InvalidOid)
        {
            /*
             * Not connected to any database, something like the logical
             * replication launcher, autovacuum launcher or such.
             */
            continue;
        }

        if (databaseId != InvalidOid && databaseId != procDatabaseId)
        {
            /* not a database we are interested in */
            continue;
        }

        DatabaseStatsHashEntry *dbStatsEntry =
            DatabaseStatsHashEntryFindOrCreate(procDatabaseId, databaseStats);

        BackendStatsSlot *backendStatsSlot =
            &SharedBackendStatsSlotArray[backendSlotIdx];

        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
        {
            dbStatsEntry->counters[statIdx] +=
                pg_atomic_read_u64(&backendStatsSlot->counters[statIdx]);
        }
    }
}
/*
 * CollectSavedBackendStatsIntoHTAB fetches the saved stat counters and
 * resetTimestamp for the given database id from the saved backend stats
 * hash table and saves them into the databaseStats hash table.
 *
 * If the database id is InvalidOid, then all the databases that are
 * present in the saved backend stats hash table will be considered.
 *
 * Otherwise, if the database id is different than InvalidOid, then only
 * the entry that belongs to the given database will be considered, if
 * there is such an entry.
 */
static void
CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
    LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

    if (databaseId != InvalidOid)
    {
        SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
            (SavedBackendStatsHashEntry *) hash_search(
                SharedSavedBackendStatsHash,
                (void *) &databaseId,
                HASH_FIND,
                NULL);

        if (dbSavedBackendStatsEntry)
        {
            DatabaseStatsHashEntry *dbStatsEntry =
                DatabaseStatsHashEntryFindOrCreate(databaseId, databaseStats);

            SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

            for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
            {
                dbStatsEntry->counters[statIdx] +=
                    dbSavedBackendStatsEntry->counters[statIdx];
            }

            dbStatsEntry->resetTimestamp =
                dbSavedBackendStatsEntry->resetTimestamp;

            SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
        }
    }
    else
    {
        HASH_SEQ_STATUS hashSeqStatus;
        hash_seq_init(&hashSeqStatus, SharedSavedBackendStatsHash);

        SavedBackendStatsHashEntry *dbSavedBackendStatsEntry = NULL;
        while ((dbSavedBackendStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
        {
            DatabaseStatsHashEntry *dbStatsEntry =
                DatabaseStatsHashEntryFindOrCreate(dbSavedBackendStatsEntry->databaseId,
                                                   databaseStats);

            SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

            for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
            {
                dbStatsEntry->counters[statIdx] +=
                    dbSavedBackendStatsEntry->counters[statIdx];
            }

            dbStatsEntry->resetTimestamp =
                dbSavedBackendStatsEntry->resetTimestamp;

            SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
        }
    }

    LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
 * DatabaseStatsHashEntryFindOrCreate creates a new entry in the databaseStats
 * hash table for the given database id if it doesn't already exist and
 * initializes it, or just returns the existing entry if it does.
 */
static DatabaseStatsHashEntry *
DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB *databaseStats)
{
    bool found = false;
    DatabaseStatsHashEntry *dbStatsEntry = (DatabaseStatsHashEntry *)
        hash_search(databaseStats, &databaseId,
                    HASH_ENTER, &found);

    if (!found)
    {
        MemSet(dbStatsEntry->counters, 0, sizeof(StatCounters));
        dbStatsEntry->resetTimestamp = 0;
    }

    return dbStatsEntry;
}
/*
 * StoreDatabaseStatsIntoTupStore stores the database stats from the
 * databaseStats hash table into the given tuple store.
 */
static void
StoreDatabaseStatsIntoTupStore(HTAB *databaseStats, Tuplestorestate *tupleStore,
                               TupleDesc tupleDescriptor)
{
    HASH_SEQ_STATUS hashSeqStatus;
    hash_seq_init(&hashSeqStatus, databaseStats);

    DatabaseStatsHashEntry *dbStatsEntry = NULL;
    while ((dbStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
    {
        /* +2 for the database_id (first) and the stats_reset (last) column */
        Datum values[N_CITUS_STAT_COUNTERS + 2] = { 0 };
        bool isNulls[N_CITUS_STAT_COUNTERS + 2] = { 0 };

        values[0] = ObjectIdGetDatum(dbStatsEntry->databaseId);

        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
        {
            uint64 statCounter = dbStatsEntry->counters[statIdx];
            values[statIdx + 1] = UInt64GetDatum(statCounter);
        }

        /* set the stats_reset column to NULL if it was never reset */
        if (dbStatsEntry->resetTimestamp == 0)
        {
            isNulls[N_CITUS_STAT_COUNTERS + 1] = true;
        }
        else
        {
            values[N_CITUS_STAT_COUNTERS + 1] =
                TimestampTzGetDatum(dbStatsEntry->resetTimestamp);
        }

        tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
    }
}
/*
 * ResetActiveBackendStats resets the stat counters for the given database
 * id for all the active backends. The function doesn't actually filter the
 * slots of active backends, but it's just fine to reset the stat counters
 * for all of them because doing so just means resetting the stat counters
 * of exited backends once again, which were already reset when they exited.
 *
 * Only active backends whose PGPROC->databaseId is the same as the given
 * database id will be considered, if any.
 *
 * Returns true if any active backend was found.
 */
static bool
ResetActiveBackendStats(Oid databaseId)
{
    bool foundAny = false;

    for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
    {
        PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);

        if (backendProc->pid == 0)
        {
            /* unused slot */
            continue;
        }

        Oid procDatabaseId = backendProc->databaseId;
        if (procDatabaseId == InvalidOid)
        {
            /*
             * not connected to any database, something like the logical
             * replication launcher, autovacuum launcher, etc.
             */
            continue;
        }

        if (databaseId != procDatabaseId)
        {
            /* not a database we are interested in */
            continue;
        }

        foundAny = true;

        BackendStatsSlot *backendStatsSlot =
            &SharedBackendStatsSlotArray[backendSlotIdx];

        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
        {
            pg_atomic_write_u64(&backendStatsSlot->counters[statIdx], 0);
        }
    }

    return foundAny;
}
/*
 * ResetSavedBackendStats resets the saved stat counters for the given
 * database id and sets the resetTimestamp for it to the current timestamp.
 *
 * If force is true, then we first make sure that we have an entry for
 * the given database id in the saved backend stats hash table.
 */
static void
ResetSavedBackendStats(Oid databaseId, bool force)
{
    LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

    SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
        (SavedBackendStatsHashEntry *) hash_search(
            SharedSavedBackendStatsHash,
            (void *) &databaseId,
            HASH_FIND,
            NULL);

    if (!dbSavedBackendStatsEntry && force)
    {
        /* promote the lock to exclusive to insert the new entry for this database */
        LWLockRelease(*SharedSavedBackendStatsHashLock);
        LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);

        dbSavedBackendStatsEntry =
            SavedBackendStatsHashEntryCreateIfNotExists(databaseId);

        LWLockRelease(*SharedSavedBackendStatsHashLock);

        if (!dbSavedBackendStatsEntry)
        {
            /*
             * Couldn't allocate a new hash entry because we're out of
             * (shared) memory.
             */
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("failed to allocate saved backend stats hash entry")));
            return;
        }

        /* re-acquire the shared lock */
        LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
    }

    /*
     * Actually reset the stat counters for the exited backends and set
     * the resetTimestamp to the current timestamp if we already had
     * an entry for it or if we just created it.
     */
    if (dbSavedBackendStatsEntry)
    {
        SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

        memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
        dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp();

        SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
    }

    LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
 * SavedBackendStatsHashEntryCreateIfNotExists creates a new entry in the
 * saved backend stats hash table for the given database id if it doesn't
 * already exist and initializes it.
 *
 * Assumes that the caller has exclusive access to the hash table since it
 * performs a HASH_ENTER_NULL.
 *
 * Returns NULL if the entry didn't exist and couldn't be allocated since
 * we're out of (shared) memory.
 */
static SavedBackendStatsHashEntry *
SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId)
{
    bool found = false;
    SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
        (SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
                                                   (void *) &databaseId,
                                                   HASH_ENTER_NULL,
                                                   &found);

    if (!dbSavedBackendStatsEntry)
    {
        /*
         * As we provided HASH_ENTER_NULL, returning NULL means OOM.
         * In that case, we return and let the caller decide what to do.
         */
        return NULL;
    }

    if (!found)
    {
        memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));

        dbSavedBackendStatsEntry->resetTimestamp = 0;

        SpinLockInit(&dbSavedBackendStatsEntry->mutex);
    }

    return dbSavedBackendStatsEntry;
}
@ -154,6 +154,11 @@ typedef struct CitusCopyDestReceiver
     * when merging into the target tables.
     */
    bool skipCoercions;

    /*
     * Determines whether the COPY command should track query stat counters.
     */
    bool trackQueryCounters;
} CitusCopyDestReceiver;
@ -170,7 +175,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
                                                           int partitionColumnIndex,
                                                           EState *executorState,
                                                           char *intermediateResultPrefix,
                                                           bool isPublishable,
                                                           bool trackQueryCounters);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);
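A hypothetical caller sketch for the extended signature; relationId, columnNameList, partitionColumnIndex and executorState are placeholders standing in for whatever the real callsite already has, and only the trailing arguments reflect the declaration above:

    CitusCopyDestReceiver *copyDest =
        CreateCitusCopyDestReceiver(relationId, columnNameList,
                                    partitionColumnIndex, executorState,
                                    NULL,   /* intermediateResultPrefix */
                                    true,   /* isPublishable */
                                    true);  /* trackQueryCounters */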
@ -334,7 +334,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection);
extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection);

/* waiteventset utilities */
extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
@ -0,0 +1,72 @@
/*-------------------------------------------------------------------------
 *
 * stat_counters.h
 *
 * This file contains the exported functions to track various statistic
 * counters for Citus.
 *
 * -------------------------------------------------------------------------
 */

#ifndef STAT_COUNTERS_H
#define STAT_COUNTERS_H


/* saved backend stats - constants */
#define SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME \
    "citus_stat_counters saved backend stats hash"

/* default value for the GUC variable */
#define ENABLE_STAT_COUNTERS_DEFAULT false


/*
 * Must be in the same order as the output columns defined in the
 * citus_stat_counters() UDF, see
 * src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql
 */
typedef enum
{
    /*
     * These are mainly tracked by connection_management.c and
     * adaptive_executor.c.
     */
    STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
    STAT_CONNECTION_ESTABLISHMENT_FAILED,
    STAT_CONNECTION_REUSED,

    /*
     * These are maintained by the ExecCustomScan methods implemented
     * for the CustomScan nodes provided by Citus, to account for the
     * actual execution of queries and subplans. By maintaining these
     * counters in the ExecCustomScan callbacks, we avoid incrementing
     * them for plain EXPLAIN (i.e., without ANALYZE) queries. And
     * preferring the executor methods rather than the planner methods
     * helps us capture the execution of prepared statements too.
     */
    STAT_QUERY_EXECUTION_SINGLE_SHARD,
    STAT_QUERY_EXECUTION_MULTI_SHARD,

    /* do not use this and ensure it is the last entry */
    N_CITUS_STAT_COUNTERS
} StatType;


/* GUC variable */
extern bool EnableStatCounters;


/* shared memory init */
extern void InitializeStatCountersShmem(void);
extern Size StatCountersShmemSize(void);

/* main entry point for the callers who want to increment the stat counters */
extern void IncrementStatCounterForMyDb(int statId);

/*
 * Exported to define a before_shmem_exit() callback that saves
 * the stat counters for exited backends into the shared memory.
 */
extern void SaveBackendStatsIntoSavedBackendStatsHash(void);

#endif /* STAT_COUNTERS_H */
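To illustrate the intended call pattern, a hypothetical executor callsite; the real increments live in the ExecCustomScan callbacks mentioned above, and isMultiShardQuery is an assumed local flag:

    if (isMultiShardQuery)
    {
        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
    }
    else
    {
        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
    }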
@ -92,6 +92,69 @@ SELECT citus.mitmproxy('conn.allow()');

(1 row)

SET citus.enable_stat_counters TO true;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT * FROM products;
WARNING: could not establish any connections to the node localhost:xxxxx after 900 ms
 product_no | name | price
---------------------------------------------------------------------
(0 rows)

SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
 ?column?
---------------------------------------------------------------------
 t
(1 row)

RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

-- this time with citus.force_max_query_parallelization set to on
SET citus.force_max_query_parallelization TO ON;
SET citus.node_connection_timeout TO 900;
SELECT citus.mitmproxy('conn.connect_delay(1400)');
 mitmproxy
---------------------------------------------------------------------

(1 row)

SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
SELECT * FROM products;
WARNING: could not establish any connections to the node localhost:xxxxx after 900 ms
 product_no | name | price
---------------------------------------------------------------------
(0 rows)

SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
 ?column?
---------------------------------------------------------------------
 t
(1 row)

RESET citus.node_connection_timeout;
SELECT citus.mitmproxy('conn.allow()');
 mitmproxy
---------------------------------------------------------------------

(1 row)

RESET citus.force_max_query_parallelization;
RESET citus.enable_stat_counters;
-- Make sure that we fall back to a working node for reads, even if it's not
-- the first choice in our task assignment policy.
SET citus.node_connection_timeout TO 900;
@ -168,6 +231,48 @@ SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_stat_counters TO true;
|
||||||
|
SET citus.force_max_query_parallelization TO ON;
|
||||||
|
SET citus.node_connection_timeout TO 900;
|
||||||
|
SELECT citus.mitmproxy('conn.connect_delay(1400)');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test insert into a single replicated table
|
||||||
|
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
|
||||||
|
FROM pg_database WHERE datname = current_database() \gset
|
||||||
|
INSERT INTO single_replicatated VALUES (100);
|
||||||
|
ERROR: could not establish any connections to the node localhost:xxxxx after 900 ms
|
||||||
|
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
|
||||||
|
FROM pg_database WHERE datname = current_database();
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test select from a single replicated table
|
||||||
|
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
|
||||||
|
FROM pg_database WHERE datname = current_database() \gset
|
||||||
|
SELECT count(*) FROM single_replicatated;
|
||||||
|
ERROR: could not establish any connections to the node localhost:xxxxx after 900 ms
|
||||||
|
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
|
||||||
|
FROM pg_database WHERE datname = current_database();
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.force_max_query_parallelization;
|
||||||
|
RESET citus.node_connection_timeout;
|
||||||
|
RESET citus.enable_stat_counters;
|
||||||
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
|
mitmproxy
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- one similar test, and this time on modification queries
|
-- one similar test, and this time on modification queries
|
||||||
-- to see that connection establishement failures could
|
-- to see that connection establishement failures could
|
||||||
-- fail the transaction (but not mark any placements as INVALID)
|
-- fail the transaction (but not mark any placements as INVALID)
|
||||||
|
|
|
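For readers skimming the expected outputs above: every one of these checks follows the same snapshot/delta pattern, sketched below (`products` is the distributed table the test itself uses; any distributed table would do):

```sql
-- 1) Snapshot the counter into a psql variable:
SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
FROM pg_database WHERE datname = current_database() \gset
-- 2) Run a workload that is expected to fail connection establishment:
SELECT * FROM products;
-- 3) Assert that the counter moved:
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
FROM pg_database WHERE datname = current_database();
```

Comparing the delta with `> 0` rather than an exact count presumably keeps these assertions robust when something else (e.g., the maintenance daemon) also bumps the counters; the node_conninfo_reload test further below does use an exact `= 2` check for a fully controlled workload.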
@@ -1480,8 +1480,11 @@ SELECT * FROM multi_extension.print_extension_changes();
 | function citus_internal.update_placement_metadata(bigint,integer,integer) void
 | function citus_internal.update_relation_colocation(oid,integer) void
 | function citus_is_primary_node() boolean
+| function citus_stat_counters(oid) SETOF record
+| function citus_stat_counters_reset(oid) void
 | function citus_unmark_object_distributed(oid,oid,integer,boolean) void
-(27 rows)
+| view citus_stat_counters
+(30 rows)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
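The three objects added above can be exercised as follows. This is a sketch based on this PR's description, so treat the selected counter columns as assumptions and consult the view definition for the full list:

```sql
-- Query the view (it exposes the counters collected while
-- citus.enable_stat_counters is on):
SELECT * FROM citus_stat_counters;
-- Or fetch one database's counters through the function, by oid;
-- the counters named here are the ones listed in this PR's description:
SELECT (citus_stat_counters(oid)).connection_establishment_succeeded,
       (citus_stat_counters(oid)).connection_reused,
       (citus_stat_counters(oid)).query_execution_multi_shard
FROM pg_database WHERE datname = current_database();
```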
@@ -48,6 +48,43 @@ show citus.node_conninfo;
 -- Should give a connection error because of bad sslmode
 select count(*) from test where a = 0;
 ERROR:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+SET citus.enable_stat_counters TO true;
+select count(*) from test;
+WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+ERROR:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+RESET citus.enable_stat_counters;
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+-- Test a function that tries to establish parallel node connections.
+SET citus.enable_stat_counters TO true;
+-- we don't care about the result, hence make it always return true
+SELECT COUNT(*) > -1 FROM get_global_active_transactions();
+WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+RESET citus.enable_stat_counters;
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed = 2
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
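Between runs like the ones above, the counters can be zeroed with the new reset function. Per the description at the top of this PR, passing nothing or 0 targets the current database:

```sql
-- Reset stat counters for the current database:
SELECT citus_stat_counters_reset();
-- Equivalent: 0 also means the current database:
SELECT citus_stat_counters_reset(0);
-- Or reset a specific database's counters by oid:
SELECT citus_stat_counters_reset(oid)
FROM pg_database WHERE datname = current_database();
```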
@@ -179,6 +179,8 @@ ORDER BY 1;
 function citus_shards_on_worker()
 function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode)
 function citus_stat_activity()
+function citus_stat_counters(oid)
+function citus_stat_counters_reset(oid)
 function citus_stat_statements()
 function citus_stat_statements_reset()
 function citus_stat_tenants(boolean)
@@ -384,11 +386,12 @@ ORDER BY 1;
 view citus_shards
 view citus_shards_on_worker
 view citus_stat_activity
+view citus_stat_counters
 view citus_stat_statements
 view citus_stat_tenants
 view citus_stat_tenants_local
 view pg_dist_shard_placement
 view time_partitions
-(358 rows)
+(361 rows)
 
 DROP TABLE extension_basic_types;
@@ -33,6 +33,8 @@ test: failure_savepoints
 test: failure_multi_row_insert
 test: failure_mx_metadata_sync
 test: failure_mx_metadata_sync_multi_trans
+# Do not parallelize with others because this measures stat counters
+# for failed connections for a few queries.
 test: failure_connection_establishment
 test: failure_create_database
 
@@ -44,6 +44,12 @@ test: comment_on_role
 test: single_shard_table_udfs
 test: schema_based_sharding
 test: citus_schema_distribute_undistribute
+# Don't parallelize stat_counters with others because we don't want statistics
+# to be updated by other tests concurrently except Citus Maintenance Daemon.
+#
+# Also, this needs to be the first test that calls citus_stat_counters()
+# because it checks the value of the stats_reset column before calling the function.
+test: stat_counters
 
 test: multi_test_catalog_views
 test: multi_table_ddl
@@ -290,6 +296,9 @@ test: multi_colocation_utils
 
 # ----------
 # node_conninfo_reload tests that node_conninfo changes take effect
+#
+# Do not parallelize with others because this measures stat counters
+# for failed connections for a few queries.
 # ----------
 test: node_conninfo_reload
 
@@ -492,6 +492,7 @@ push(@pgOptions, "citus.stat_statements_track = 'all'");
 push(@pgOptions, "citus.enable_change_data_capture=on");
 push(@pgOptions, "citus.stat_tenants_limit = 2");
 push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
+push(@pgOptions, "citus.enable_stat_counters=on");
 push(@pgOptions, "citus.superuser = 'postgres'");
 
 # Some tests look at shards in pg_class, make sure we can usually see them:
@@ -1199,4 +1200,3 @@ else {
   die "Failed in ". ($endTime - $startTime)." seconds. \n";
-
 }
 
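The one-line pg_regress_multi.pl change above enables counter collection for the entire regression cluster. Outside the test harness, the same GUC can be toggled per session; counters are only collected while it is on:

```sql
SET citus.enable_stat_counters TO on;
-- ... run some distributed queries ...
RESET citus.enable_stat_counters;
```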
@@ -55,6 +55,43 @@ ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
 RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
+SET citus.enable_stat_counters TO true;
+
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SELECT * FROM products;
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+
+-- this time, set citus.force_max_query_parallelization to on
+SET citus.force_max_query_parallelization TO ON;
+
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SELECT * FROM products;
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+
+RESET citus.force_max_query_parallelization;
+
+RESET citus.enable_stat_counters;
+
 -- Make sure that we fall back to a working node for reads, even if it's not
 -- the first choice in our task assignment policy.
 SET citus.node_connection_timeout TO 900;
@@ -87,6 +124,33 @@ RESET citus.force_max_query_parallelization;
 RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');
 
+SET citus.enable_stat_counters TO true;
+SET citus.force_max_query_parallelization TO ON;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+
+-- test insert into a single replicated table
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+INSERT INTO single_replicatated VALUES (100);
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+-- test select from a single replicated table
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SELECT count(*) FROM single_replicatated;
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
+RESET citus.enable_stat_counters;
+SELECT citus.mitmproxy('conn.allow()');
 -- one similar test, and this time on modification queries
 -- to see that connection establishment failures could
 -- fail the transaction (but not mark any placements as INVALID)
@@ -22,6 +22,30 @@ show citus.node_conninfo;
 -- Should give a connection error because of bad sslmode
 select count(*) from test where a = 0;
 
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SET citus.enable_stat_counters TO true;
+select count(*) from test;
+RESET citus.enable_stat_counters;
+
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+-- Test a function that tries to establish parallel node connections.
+SET citus.enable_stat_counters TO true;
+-- we don't care about the result, hence make it always return true
+SELECT COUNT(*) > -1 FROM get_global_active_transactions();
+RESET citus.enable_stat_counters;
+
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed = 2
+FROM pg_database WHERE datname = current_database();
+
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
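A note on the exact `= 2` assertion above: the expected output shows exactly two connection-failure WARNINGs for get_global_active_transactions(), presumably one attempt per worker in the two-worker regression cluster (an inference from the output, not something the test states). If exact counts cannot be relied on, a looser variant of the same check would be:

```sql
SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed >= 2
FROM pg_database WHERE datname = current_database();
```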