pull/7917/head
Onur Tirtir 2025-04-09 15:59:57 +03:00
parent a5ba21d223
commit 75fe86a0a5
21 changed files with 2794 additions and 11 deletions

View File

@ -57,6 +57,8 @@ The purpose of this document is to provide comprehensive technical documentation
- [Query from any node](#query-from-any-node)
- [Why didn't we have dedicated Query Nodes and Data Nodes?](#why-didnt-we-have-dedicated-query-nodes-and-data-nodes)
- [Shard visibility](#shard-visibility)
- [Statistic tracking](#statistic-tracking)
- [Citus stat counters](#citus-stat-counters)
# Citus Concepts
@ -2702,3 +2704,38 @@ Shards can be revealed via two settings:
- `citus.override_shard_visibility = off` disables shard hiding entirely
- `citus.show_shards_for_app_name_prefixes = 'pgAdmin,psql'` disables shard hiding only for specific application_name values, by prefix
## Statistic tracking
Statistic views defined by Postgres, such as `pg_stat_database`, `pg_stat_activity` and `pg_stat_statements`, already work well on a single Citus node. For some of them, Citus even provides wrapper views, like `citus_stat_activity`, that offer a global (i.e., cluster-wide) view of the statistics.
Besides these, Citus itself also provides some additional statistics views to track Citus-specific activity. Note that the way we collect statistics for each of them is quite different.
- `citus_stat_tenants` (needs documentation)
- `citus_stat_statements` (needs documentation)
- `citus_stat_counters`
### Citus stat counters
Citus keeps track of several stat counters and exposes them via the `citus_stat_counters` view. The counters are only tracked while `citus.enable_stat_counters` is set to true. Also, `citus_stat_counters_reset()` can be used to reset the counters for a single database by passing its oid; if an invalid oid (0, i.e., `InvalidOid`) is passed, the function effectively does nothing.
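For illustration, a minimal usage sketch might look like the following (in a superuser session, since the GUC is `SUSET` and the reset function is revoked from `PUBLIC` by default):

```sql
-- enable the counters (they are only tracked while this GUC is on)
SET citus.enable_stat_counters TO on;

-- inspect the counters for the current database
SELECT name, connection_establishment_succeeded, connection_reused,
       query_execution_single_shard, query_execution_multi_shard, stats_reset
FROM citus_stat_counters
WHERE name = current_database();

-- reset the counters for the current database only
SELECT citus_stat_counters_reset(
           (SELECT oid FROM pg_database WHERE datname = current_database()));
```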
Details about the implementation and its caveats can be found in the header comment of [stat_counters.c](/src/backend/distributed/stat_counters.c). However, at a high level:
1. We allocate a shared memory array of length `MaxBackends` so that each backend has its own counter slot, which reduces contention when incrementing the counters at runtime.
2. We also allocate a shared hash table whose entries correspond to different databases. When a backend exits, it first aggregates its counters into the relevant entry in the shared hash and then resets its own counters, because the same counter slot might later be reused by another backend (a behavioral sketch of this follows the list).
3. So, when `citus_stat_counters` is queried, we first aggregate the counters from the shared memory array and then add them to the counters aggregated so far in the relevant shared hash entry for the database.
This means that if we weren't aggregating the counters into the shared hash on exit, the counters seen in `citus_stat_counters` could drift backwards in time. Note that `citus_stat_counters` might observe the counters for a backend twice, or miss them entirely, if the backend is concurrently exiting. However, the next call to `citus_stat_counters` will see the correct values for the counters, so we can live with that for now.
4. Finally, when `citus_stat_counters_reset()` is called, we reset the shared hash entry for the relevant database and also reset the relevant slots in the shared memory array for that database. Note that there is a chance that `citus_stat_counters_reset()` might partially fail to reset the counters of a backend slot under some rare circumstances, but we choose to ignore that for the sake of lock-free counter increments.
5. As of today, we don't persist the stat counters on server shutdown. Although it seems quite straightforward to do so, we skipped that in v1. Once we decide to persist the counters, one can check the relevant functions that we have for `citus_stat_statements`, namely `CitusQueryStatsShmemShutdown()` and `CitusQueryStatsShmemStartup()`. And since it has been quite a while since we wrote those two functions, we should also check `pgstat_write_statsfile()` and `pgstat_read_statsfile()` in Postgres to see whether we're missing anything (it seems we are missing a few things).
6. Note that today we don't evict the entries of the said hash table that point to dropped databases because the wrapper view filters them out anyway (thanks to the LEFT JOIN), and we don't expect a performance hit when reading from or writing to the hash table because of that, unless users have a lot of databases that are dropped and recreated frequently. If some day we think this is a problem, we can let the Citus maintenance daemon do that eviction for us periodically.
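The following is a behavioral sketch of steps 2 and 3 above, illustrating that counters incremented by a backend remain visible after it exits because they get folded into the shared hash entry for its database; `dist_table` is a hypothetical distributed table and, as in the earlier sketch, a superuser session is assumed:

```sql
-- session 1
SET citus.enable_stat_counters TO on;
SELECT count(*) FROM dist_table; -- increments query_execution_multi_shard in this backend's slot
-- disconnect; on exit, the backend aggregates its slot into the shared hash entry for this database

-- session 2 (a different backend, possibly reusing the same slot)
SELECT query_execution_multi_shard
FROM citus_stat_counters
WHERE name = current_database(); -- still reflects the increment made in session 1
```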
The reason we don't just use a shared hash table for the counters is that doing a hash lookup for each increment could be more expensive. Plus, using a single counter slot for all the backends connected to the same database could lead to contention because that would definitely require taking a lock. In today's implementation, however, incrementing a counter doesn't require acquiring any locks.
Also, as of writing this section, it seems quite likely that Postgres will expose its Cumulative Statistics infrastructure starting with Postgres 18, see https://github.com/postgres/postgres/commit/7949d9594582ab49dee221e1db1aa5401ace49d4.
Once that happens, we can also consider using the same infrastructure to track the Citus stat counters. However, we can only do so once we drop support for Postgres versions older than 18.
### A side note on query stat counters
Initially, we were thinking of tracking the query stat counters at the very end of the planner, namely via the `FinalizePlan()` function. However, that would mean not tracking the execution of prepared statements because a prepared statement is not planned again once its plan is cached. To give a bit more detail, the query plan for a prepared statement is typically cached once the same prepared statement has been executed five times by Postgres (a hard-coded value). A plan may even be cached after the first execution if it's straightforward enough (e.g., when it doesn't have any parameters).
For this reason, we track the query stat counters at appropriate places within the CustomScan implementations provided by Citus for the adaptive executor and the non-pushable INSERT .. SELECT / MERGE executors.
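To make the plan-caching point concrete, here is a sketch (with a hypothetical distributed table `dist_table`): after a few executions Postgres may switch the prepared statement to a cached generic plan and stop re-planning it, yet the CustomScan execution path still runs on every execution, so the counters keep being incremented:

```sql
PREPARE count_rows AS SELECT count(*) FROM dist_table;

-- the first few executions are typically planned each time; afterwards a
-- cached (generic) plan may be reused and the planner is not invoked again
EXECUTE count_rows;
EXECUTE count_rows;
EXECUTE count_rows;
EXECUTE count_rows;
EXECUTE count_rows;
EXECUTE count_rows; -- may run with the cached plan; still counted in the executor
```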

View File

@ -39,6 +39,7 @@
#include "distributed/remote_commands.h"
#include "distributed/run_from_same_connection.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/stat_counters.h"
#include "distributed/time_constants.h"
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
@ -354,6 +355,18 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection)
{
/*
* Increment the connection stat counter for the connections that are
* reused only if the connection is in a good state. Here we don't
* bother shutting down the connection or such if it is not in a good
* state but we mostly want to avoid incrementing the connection stat
* counter for a connection that the caller cannot really use.
*/
if (PQstatus(connection->pgConn) == CONNECTION_OK)
{
IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
}
return connection;
}
}
@ -395,6 +408,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
dlist_delete(&connection->connectionNode);
pfree(connection);
/*
* Here we don't increment the connection stat counter for the optional
* connections that we gave up establishing due to connection throttling
* because the callers who request optional connections know how to
* survive without them.
*/
return NULL;
}
}
@ -1026,6 +1045,11 @@ FinishConnectionListEstablishment(List *multiConnectionList)
if (event->events & WL_POSTMASTER_DEATH)
{
/*
* Here we don't increment the connection stat counter for the
* optional failed connections because this is not a connection
* failure, but a postmaster death in the local node.
*/
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
@ -1042,6 +1066,12 @@ FinishConnectionListEstablishment(List *multiConnectionList)
* reset the memory context
*/
MemoryContextDelete(MemoryContextSwitchTo(oldContext));
/*
* Similarly, we don't increment the connection stat counter for the
* failed connections here because this is not a connection failure
* but a cancellation request being handled.
*/
return;
}
@ -1072,6 +1102,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
eventMask, NULL);
if (!success)
{
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d "
"failed", connection->hostname,
@ -1088,7 +1119,15 @@ FinishConnectionListEstablishment(List *multiConnectionList)
*/
if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
{
MarkConnectionConnected(connectionState->connection);
/*
* Since WaitEventSetFromMultiConnectionStates() only adds the
* connections whose establishment hasn't been completed yet,
* here we always have a new connection.
* In other words, at this point, we surely know that we're
* not dealing with a cached connection.
*/
bool newConnection = true;
MarkConnectionConnected(connectionState->connection, newConnection);
}
}
}
@ -1172,6 +1211,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
/* close connection, otherwise we take up resource on the other side */
CitusPQFinish(connection);
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
}
}
@ -1584,7 +1625,7 @@ RemoteTransactionIdle(MultiConnection *connection)
* establishment time when necessary.
*/
void
MarkConnectionConnected(MultiConnection *connection)
MarkConnectionConnected(MultiConnection *connection, bool newConnection)
{
connection->connectionState = MULTI_CONNECTION_CONNECTED;
@ -1592,6 +1633,11 @@ MarkConnectionConnected(MultiConnection *connection)
{
INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
}
if (newConnection)
{
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
}
}

View File

@ -171,6 +171,7 @@
#include "distributed/repartition_join_execution.h"
#include "distributed/resource_lock.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/stat_counters.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_identifier.h"
#include "distributed/transaction_management.h"
@ -690,7 +691,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session);
static void ConnectionStateMachine(WorkerSession *session);
static bool HasUnfinishedTaskForSession(WorkerSession *session);
static void HandleMultiConnectionSuccess(WorkerSession *session);
static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection);
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
@ -2035,6 +2036,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
else
{
connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
}
@ -3011,6 +3013,7 @@ ConnectionStateMachine(WorkerSession *session)
* connection, clear any state associated with it.
*/
connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
break;
}
@ -3019,7 +3022,12 @@ ConnectionStateMachine(WorkerSession *session)
ConnStatusType status = PQstatus(connection->pgConn);
if (status == CONNECTION_OK)
{
HandleMultiConnectionSuccess(session);
/*
* Connection was already established, possibly a cached
* connection.
*/
bool newConnection = false;
HandleMultiConnectionSuccess(session, newConnection);
UpdateConnectionWaitFlags(session,
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
break;
@ -3027,6 +3035,7 @@ ConnectionStateMachine(WorkerSession *session)
else if (status == CONNECTION_BAD)
{
connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
break;
}
@ -3042,6 +3051,7 @@ ConnectionStateMachine(WorkerSession *session)
if (pollMode == PGRES_POLLING_FAILED)
{
connection->connectionState = MULTI_CONNECTION_FAILED;
IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
}
else if (pollMode == PGRES_POLLING_READING)
{
@ -3059,7 +3069,12 @@ ConnectionStateMachine(WorkerSession *session)
}
else
{
HandleMultiConnectionSuccess(session);
/*
* Connection was not established before (!= CONNECTION_OK)
* but PQconnectPoll() did so now.
*/
bool newConnection = true;
HandleMultiConnectionSuccess(session, newConnection);
UpdateConnectionWaitFlags(session,
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
@ -3137,6 +3152,11 @@ ConnectionStateMachine(WorkerSession *session)
break;
}
/*
* Here we don't increment the connection stat counter for failed
* connections because we don't track the connections that we could
* establish but lost later.
*/
connection->connectionState = MULTI_CONNECTION_FAILED;
break;
}
@ -3299,12 +3319,12 @@ HasUnfinishedTaskForSession(WorkerSession *session)
* connection's state.
*/
static void
HandleMultiConnectionSuccess(WorkerSession *session)
HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection)
{
MultiConnection *connection = session->connection;
WorkerPool *workerPool = session->workerPool;
MarkConnectionConnected(connection);
MarkConnectionConnected(connection, newConnection);
ereport(DEBUG4, (errmsg("established connection to %s:%d for "
"session %ld in %ld microseconds",

View File

@ -45,6 +45,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/query_stats.h"
#include "distributed/shard_utils.h"
#include "distributed/stat_counters.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_protocol.h"
@ -206,7 +207,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL)
{
/*
* INSERT..SELECT via coordinator or re-partitioning are special because
* INSERT..SELECT / MERGE via coordinator or re-partitioning are special because
* the SELECT part is planned separately.
*/
return;
@ -220,6 +221,18 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusBeginModifyScan(node, estate, eflags);
}
/*
* For INSERT..SELECT / MERGE via coordinator or re-partitioning, we
* increment the stat counters in the respective ExecCustomScan functions.
*/
if (IsMultiTaskPlan(distributedPlan))
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
/*
* If there is force_delegation functions' distribution argument set,

View File

@ -50,6 +50,7 @@
#include "distributed/repartition_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/stat_counters.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
@ -115,6 +116,16 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
GetDistributedPlan((CustomScan *) selectPlan->planTree);
Job *distSelectJob = distSelectPlan->workerJob;
List *distSelectTaskList = distSelectJob->taskList;
if (list_length(distSelectTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
bool randomAccess = true;
bool interTransactions = false;
bool binaryFormat =
@ -189,6 +200,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
redistributedResults,
binaryFormat);
if (list_length(taskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
@ -251,6 +271,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
}
}
if (list_length(prunedTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
if (prunedTaskList != NIL)
{
bool randomAccess = true;
@ -280,6 +309,29 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan,
executorState);
/*
* At this point, we already incremented the query counters for the SELECT
* query indirectly via ExecutePlanIntoRelation() (if needed), so now we
* need to increment the counters for the INSERT query as well.
*/
if (IsCitusTable(targetRelationId))
{
if (HasDistributionKey(targetRelationId))
{
/*
* We assume it's a multi-shard insert if the table has a
* distribution column. Although this may not be true, e.g.,
* when all the data we read from source goes to the same
* shard of the target table, we cannot know that in advance.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
}
}
scanState->finishedRemoteScan = true;

View File

@ -26,6 +26,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/repartition_executor.h"
#include "distributed/stat_counters.h"
#include "distributed/subplan_execution.h"
static void ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState);
@ -125,6 +126,16 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
GetDistributedPlan((CustomScan *) sourcePlan->planTree);
Job *distSourceJob = distSourcePlan->workerJob;
List *distSourceTaskList = distSourceJob->taskList;
if (list_length(distSourceTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
bool binaryFormat =
CanUseBinaryCopyFormatForTargetList(sourceQuery->targetList);
@ -181,6 +192,15 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
redistributedResults,
binaryFormat);
if (list_length(taskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ParamListInfo paramListInfo = executorState->es_param_list_info;
@ -285,6 +305,15 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList);
}
if (list_length(prunedTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
if (prunedTaskList == NIL)
{
/* No task to execute */
@ -346,6 +375,29 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
ExecutePlanIntoDestReceiver(sourcePlan, paramListInfo, (DestReceiver *) copyDest);
/*
* At this point, we already incremented the query counters for the SELECT
* query indirectly via ExecutePlanIntoDestReceiver() (if needed), so now we
* need to increment the counters for the MERGE query as well.
*/
if (IsCitusTable(targetRelationId))
{
if (HasDistributionKey(targetRelationId))
{
/*
* We assume it's a multi-shard MERGE if the target table has a
* distribution column. Although this may not be true, e.g.,
* when all the data we read from source goes to the same
* shard of the target table, we cannot know that in advance.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
}
executorState->es_processed = copyDest->tuplesSent;
XactModificationLevel = XACT_MODIFICATION_DATA;

View File

@ -105,6 +105,7 @@
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/shared_library_init.h"
#include "distributed/stat_counters.h"
#include "distributed/statistics_collection.h"
#include "distributed/subplan_execution.h"
#include "distributed/time_constants.h"
@ -187,8 +188,10 @@ static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata);
static bool IsSequenceOverflowError(ErrorData *edata);
static void RegisterConnectionCleanup(void);
static void RegisterSaveBackendStatsIntoSavedBackendStatsHash(void);
static void RegisterExternalClientBackendCounterDecrement(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static void SaveBackendStatsIntoSavedBackendStatsHashAtExit(int code, Datum arg);
static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
@ -504,6 +507,8 @@ _PG_init(void)
InitializeShardSplitSMHandleManagement();
InitializeMultiTenantMonitorSMHandleManagement();
InitializeStatCountersShmem();
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
@ -615,6 +620,8 @@ citus_shmem_request(void)
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestAddinShmemSpace(LogicalClockShmemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
RequestAddinShmemSpace(StatCountersShmemSize());
RequestNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME, 1);
}
@ -787,6 +794,8 @@ StartupCitusBackend(void)
SetBackendDataDatabaseId();
RegisterConnectionCleanup();
RegisterSaveBackendStatsIntoSavedBackendStatsHash();
FinishedStartupCitusBackend = true;
}
@ -824,6 +833,24 @@ RegisterConnectionCleanup(void)
}
/*
* RegisterSaveBackendStatsIntoSavedBackendStatsHash registers the function
* that saves the backend stats for the exited backends into the saved backend
* stats hash.
*/
static void
RegisterSaveBackendStatsIntoSavedBackendStatsHash(void)
{
static bool registeredSaveBackendStats = false;
if (registeredSaveBackendStats == false)
{
before_shmem_exit(SaveBackendStatsIntoSavedBackendStatsHashAtExit, 0);
registeredSaveBackendStats = true;
}
}
/*
* RegisterExternalClientBackendCounterDecrement is called when the backend terminates.
* For all client backends, we register a callback that will undo
@ -864,6 +891,24 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
}
/*
* SaveBackendStatsIntoSavedBackendStatsHashAtExit is called before_shmem_exit()
* of the backend for the purposes of saving the backend stats for the exited
* backends into the saved backend stats hash.
*/
static void
SaveBackendStatsIntoSavedBackendStatsHashAtExit(int code, Datum arg)
{
if (code)
{
/* don't try to save the stats during a crash */
return;
}
SaveBackendStatsIntoSavedBackendStatsHash();
}
/*
* DecrementExternalClientBackendCounterAtExit is called before_shmem_exit() of the
* backend for the purposes decrementing
@ -1451,6 +1496,20 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_stat_counters",
gettext_noop("Enables the collection of statistic counters for Citus."),
gettext_noop("When enabled, Citus maintains a set of statistic "
"counters for the Citus extension. These statistics are "
"available in the citus_stat_counters view and are "
"lost on server shutdown and can be reset by executing "
"the function citus_stat_counters_reset() on demand."),
&EnableStatCounters,
ENABLE_STAT_COUNTERS_DEFAULT,
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_statistics_collection",
gettext_noop("Enables sending basic usage statistics to Citus."),

View File

@ -48,3 +48,5 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/repl_origin_helper/13.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/13.1-1.sql"
#include "udfs/citus_is_primary_node/13.1-1.sql"
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"

View File

@ -41,3 +41,7 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking();
DROP FUNCTION citus_internal.stop_replication_origin_tracking();
DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
DROP VIEW pg_catalog.citus_stat_counters;
DROP FUNCTION pg_catalog.citus_stat_counters(oid);
DROP FUNCTION pg_catalog.citus_stat_counters_reset(oid);

View File

@ -0,0 +1,49 @@
-- See the comments for the function in
-- src/backend/distributed/stat_counters.c for more details.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
database_id oid DEFAULT 0,
-- must always be the first column or you should accordingly update
-- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
OUT database_id oid,
-- Following stat counter columns must be in the same order as the
-- StatType enum defined in src/include/distributed/stat_counters.h
OUT connection_establishment_succeeded bigint,
OUT connection_establishment_failed bigint,
OUT connection_reused bigint,
OUT query_execution_single_shard bigint,
OUT query_execution_multi_shard bigint,
-- must always be the last column or you should accordingly update
-- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
OUT stats_reset timestamp with time zone
)
RETURNS SETOF RECORD
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.';
-- returns the stat counters for all the databases in local node
CREATE VIEW citus.citus_stat_counters AS
SELECT pg_database.oid,
pg_database.datname as name,
-- We always COALESCE the counters to 0 because the LEFT JOIN
-- will bring the databases that have never been connected to
-- since the last restart with NULL counters, but we want to
-- show them with 0 counters in the view.
COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,
citus_stat_counters.stats_reset
FROM pg_catalog.pg_database
LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
ON (oid = database_id);
ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;

View File

@ -0,0 +1,49 @@
-- See the comments for the function in
-- src/backend/distributed/stat_counters.c for more details.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
database_id oid DEFAULT 0,
-- must always be the first column or you should accordingly update
-- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
OUT database_id oid,
-- Following stat counter columns must be in the same order as the
-- StatType enum defined in src/include/distributed/stat_counters.h
OUT connection_establishment_succeeded bigint,
OUT connection_establishment_failed bigint,
OUT connection_reused bigint,
OUT query_execution_single_shard bigint,
OUT query_execution_multi_shard bigint,
-- must always be the last column or you should accordingly update
-- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stat_counters.c
OUT stats_reset timestamp with time zone
)
RETURNS SETOF RECORD
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.';
-- returns the stat counters for all the databases in local node
CREATE VIEW citus.citus_stat_counters AS
SELECT pg_database.oid,
pg_database.datname as name,
-- We always COALESCE the counters to 0 because the LEFT JOIN
-- will bring the databases that have never been connected to
-- since the last restart with NULL counters, but we want to
-- show them with 0 counters in the view.
COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,
citus_stat_counters.stats_reset
FROM pg_catalog.pg_database
LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
ON (oid = database_id);
ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid)
RETURNS VOID
LANGUAGE C STRICT PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database OID.';
-- Rather than using explicit superuser() check in the function, we use
-- the GRANT system to REVOKE access to it when creating the extension.
-- Administrators can later change who can access it, or leave them as
-- only available to superuser / database cluster owner, if they choose.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid)
RETURNS VOID
LANGUAGE C STRICT PARALLEL SAFE
AS 'MODULE_PATHNAME', $$citus_stat_counters_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database OID.';
-- Rather than using explicit superuser() check in the function, we use
-- the GRANT system to REVOKE access to it when creating the extension.
-- Administrators can later change who can access it, or leave them as
-- only available to superuser / database cluster owner, if they choose.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;

View File

@ -0,0 +1,948 @@
/*-------------------------------------------------------------------------
*
* stat_counters.c
*
* This file contains functions to track various statistic counters for
* Citus.
*
* We create an array of "BackendStatsSlot"s in shared memory, one for
* each backend. Each backend increments its own stat counters in its
* own slot via IncrementStatCounterForMyDb(). And when a backend exits,
* it saves its stat counters from its slot via
* SaveBackendStatsIntoSavedBackendStatsHash() into a hash table in
* shared memory, whose entries are "SavedBackendStatsHashEntry"s and
* the key is the database id. In other words, each entry of the hash
* table is used to aggregate the stat counters for backends that were
* connected to that database and exited since the last server restart.
* Plus, each entry is responsible for keeping track of the reset
* timestamp for both active and exited backends too.
* Note that today we don't evict the entries of the said hash table
* that point to dropped databases because the wrapper view anyway
* filters them out (thanks to LEFT JOIN) and we don't expect a
* performance hit due to that unless users have a lot of databases
* that are dropped and recreated frequently.
*
* We save the stat counters for exited backends in the shared hash
* table because we cannot guarantee that the backend slot used by an
* exited backend will be reused by another backend connected to the
* same database. Saving them into the shared hash table lets us reset
* the counters within the corresponding backend slots as the backends
* exit, without losing their contributions.
*
* When citus_stat_counters() is called, we first aggregate the stat
* counters from the backend slots of all the active backends and then
* we add the aggregated stat counters from the exited backends that
* are stored in the shared hash table. Also, we don't persist backend
* stats on server shutdown, but we might want to do that in the future.
*
* Similarly, when citus_stat_counters_reset() is called, we reset the
* stat counters for the active backends and the exited backends that are
* stored in the shared hash table. Then, it also updates the
* resetTimestamp in the shared hash table entry appropriately. So,
* similarly, when citus_stat_counters() is called, we just report
* resetTimestamp as stats_reset column.
*
* Caveats:
*
* There is a chance that citus_stat_counters_reset() might race with a
* backend that is trying to increment one of the counters in its slot
* and as a result it can effectively fail to reset that counter due to
* the reasons documented in IncrementStatCounterForMyDb() function.
* However, this should be a very rare case and we can live with that
* for now.
*
* Also, citus_stat_counters() might observe the counters for a backend
* twice or perhaps miss them entirely if it's concurrently exiting, depending on
* the order we call CollectActiveBackendStatsIntoHTAB() and
* CollectSavedBackendStatsIntoHTAB() in citus_stat_counters(). However,
* the next call to citus_stat_counters() will see the correct values
* for the counters, so we can live with that for now.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "common/hashfn.h"
#include "port/atomics.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "utils/hsearch.h"
#include "pg_version_compat.h"
#include "distributed/argutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/stat_counters.h"
#include "distributed/tuplestore.h"
/*
* saved backend stats - hash table constants
*
* Configurations used to create the hash table for saved backend stats.
* The places where SAVED_BACKEND_STATS_HASH_MAX_DATABASES is used do not
* impose a hard limit on the number of databases that can be tracked but
* in ShmemInitHash() it's documented that the access efficiency will degrade
* if it is exceeded substantially.
*
* XXX: Consider using dshash_table instead of (shared) HTAB if that becomes
* a concern.
*/
#define SAVED_BACKEND_STATS_HASH_INIT_DATABASES 8
#define SAVED_BACKEND_STATS_HASH_MAX_DATABASES 1024
/* fixed size array types to store the stat counters */
typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS];
typedef uint64 StatCounters[N_CITUS_STAT_COUNTERS];
/*
* saved backend stats - hash entry definition
*
* This is used to define & access the shared hash table used to aggregate the stat
* counters for the backends that have exited since the last server restart. It's also
* responsible for keeping track of the reset timestamp.
*/
typedef struct SavedBackendStatsHashEntry
{
/* hash entry key, must always be the first */
Oid databaseId;
/*
* Needs to be locked whenever we read / write counters or resetTimestamp
* in this struct since we don't use atomic counters for this struct. Plus,
* we want to update the stat counters and resetTimestamp atomically.
*/
slock_t mutex;
/*
* While "counters" only represents the stat counters for exited backends,
* the "resetTimestamp" doesn't only represent the reset timestamp for exited
* backends' stat counters but also for the active backends.
*/
StatCounters counters;
TimestampTz resetTimestamp;
} SavedBackendStatsHashEntry;
/*
* Hash entry definition used for the local hash table created by
* citus_stat_counters() at the runtime to aggregate the stat counters
* across all backends.
*/
typedef struct DatabaseStatsHashEntry
{
/* hash entry key, must always be the first */
Oid databaseId;
StatCounters counters;
TimestampTz resetTimestamp;
} DatabaseStatsHashEntry;
/* definition of a one per-backend stat counters slot in shared memory */
typedef struct BackendStatsSlot
{
AtomicStatCounters counters;
} BackendStatsSlot;
/*
* GUC variable
*
* This only controls whether we track the stat counters or not, via
* IncrementStatCounterForMyDb() and
* SaveBackendStatsIntoSavedBackendStatsHash(). In other words, even
* when the GUC is disabled, we still allocate the shared memory
* structures etc. and citus_stat_counters() / citus_stat_counters_reset()
* will still work.
*/
bool EnableStatCounters = ENABLE_STAT_COUNTERS_DEFAULT;
/* saved backend stats - shared memory variables */
static LWLockId *SharedSavedBackendStatsHashLock = NULL;
static HTAB *SharedSavedBackendStatsHash = NULL;
/* per-backend stat counter slots - shared memory array */
BackendStatsSlot *SharedBackendStatsSlotArray = NULL;
/*
* We don't expect the callsites that check this (via
* EnsureStatCountersShmemInitDone()) to be executed before
* StatCountersShmemInit() is done. Plus, once StatCountersShmemInit()
* is done, we also don't expect shared memory variables to be
* initialized improperly. However, we still set this to true only
* once StatCountersShmemInit() is done and if all three of the shared
* memory variables above are initialized properly. And in the callsites
* where these shared memory variables are accessed, we check this
* variable first just to be on the safe side.
*/
static bool StatCountersShmemInitDone = false;
/* saved shmem_startup_hook */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* shared memory init & management */
static void StatCountersShmemInit(void);
static Size SharedBackendStatsSlotArrayShmemSize(void);
/* helper functions for citus_stat_counters() */
static void CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
static void CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
static void StoreDatabaseStatsIntoTupStore(HTAB *databaseStats,
Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
/* helper functions for citus_stat_counters_reset() */
static bool ResetActiveBackendStats(Oid databaseId);
static void ResetSavedBackendStats(Oid databaseId, bool force);
/* saved backend stats */
static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryAllocIfNotExists(Oid
databaseId);
/* sql exports */
PG_FUNCTION_INFO_V1(citus_stat_counters);
PG_FUNCTION_INFO_V1(citus_stat_counters_reset);
/*
* EnsureStatCountersShmemInitDone returns true if the shared memory
* data structures used for keeping track of stat counters have been
* properly initialized, otherwise, returns false and emits a warning.
*/
static inline bool
EnsureStatCountersShmemInitDone(void)
{
if (!StatCountersShmemInitDone)
{
ereport(WARNING,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("shared memory for stat counters was not properly initialized")));
return false;
}
return true;
}
/*
* citus_stat_counters returns stats counters for the given database id.
*
* This only returns rows for the databases which have been connected to
* by at least one backend since the last server restart (even if no
* observations have been made for any of the counters or if they were
* reset) and it considers such a database even if it has been dropped later.
*
* When InvalidOid is provided, all such databases are considered; otherwise
* only the database with the given id is considered.
*
* So, as an outcome, when a database id that is different than InvalidOid
* is provided and no backend has connected to it since the last server
* restart, or, if we didn't ever have such a database, then the function
* returns an empty set.
*
* Finally, stats_reset column is set to NULL if the stat counters for the
* database were never reset since the last server restart.
*/
Datum
citus_stat_counters(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/*
* Function's sql definition allows Postgres to silently
* ignore NULL, but we still check.
*/
PG_ENSURE_ARGNOTNULL(0, "database_id");
Oid databaseId = PG_GETARG_OID(0);
/* just to be on the safe side */
if (!EnsureStatCountersShmemInitDone())
{
PG_RETURN_VOID();
}
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
HASHCTL info;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.hash = oid_hash;
info.entrysize = sizeof(DatabaseStatsHashEntry);
HTAB *databaseStats = hash_create("Citus Database Stats Collect Hash", 8, &info,
hashFlags);
CollectActiveBackendStatsIntoHTAB(databaseId, databaseStats);
CollectSavedBackendStatsIntoHTAB(databaseId, databaseStats);
StoreDatabaseStatsIntoTupStore(databaseStats, tupleStore, tupleDescriptor);
hash_destroy(databaseStats);
PG_RETURN_VOID();
}
/*
* citus_stat_counters_reset resets Citus stat counters for given database
* id.
*
* If a valid database id is provided, stat counters for that database are
* reset, even if it was dropped later.
*
* Otherwise, if the provided database id is not valid, then the function
* effectively does nothing.
*/
Datum
citus_stat_counters_reset(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/*
* Function's sql definition allows Postgres to silently
* ignore NULL, but we still check.
*/
PG_ENSURE_ARGNOTNULL(0, "database_id");
Oid databaseId = PG_GETARG_OID(0);
/* just to be on the safe side */
if (!EnsureStatCountersShmemInitDone())
{
PG_RETURN_VOID();
}
bool foundAnyBackendsForDb = ResetActiveBackendStats(databaseId);
/*
* Even when we don't have an entry for the given database id in the
* saved backend stats hash table, we still want to create one for
* it to save the resetTimestamp if we currently have at least one
* backend connected to it. By providing foundAnyBackendsForDb, we
* effectively let the function do that. Since ResetActiveBackendStats()
* doesn't filter the active backends, foundAnyBackendsForDb being true
* does not always mean that at least one backend is connected to it right
* now, but it means that we had such a backend at some point in time
* since the last server restart. If all backends referred to in the
* shared array have already exited, then we should already have an
* entry for it in the saved backend stats hash table, so providing
* a "true" wouldn't do anything in that case. Otherwise, if at least
* one backend is still connected to it, providing a "true" will
* effectively create a new entry for it if it doesn't exist yet,
* which is what we actually want to do.
*
* That way, we can save the resetTimestamp for the active backends
* into the relevant entry of the saved backend stats hash table.
* Note that we don't do that for the databases that don't have
* any active backends connected to them because we actually don't
* reset anything for such databases.
*/
ResetSavedBackendStats(databaseId, foundAnyBackendsForDb);
PG_RETURN_VOID();
}
/*
* InitializeStatCountersShmem saves the previous shmem_startup_hook and sets
* up a new shmem_startup_hook for initializing the shared memory data structures
* used for keeping track of stat counters.
*/
void
InitializeStatCountersShmem(void)
{
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = StatCountersShmemInit;
}
/*
* StatCountersShmemSize calculates and returns shared memory size
* required for the shared memory data structures used for keeping track of
* stat counters.
*/
Size
StatCountersShmemSize(void)
{
Size backendStatsSlotArraySize = SharedBackendStatsSlotArrayShmemSize();
Size savedBackendStatsHashLockSize = MAXALIGN(sizeof(LWLockId));
Size savedBackendStatsHashSize = hash_estimate_size(
SAVED_BACKEND_STATS_HASH_MAX_DATABASES, sizeof(SavedBackendStatsHashEntry));
return add_size(add_size(backendStatsSlotArraySize, savedBackendStatsHashLockSize),
savedBackendStatsHashSize);
}
/*
* IncrementStatCounterForMyDb increments the stat counter for the given statId
* for this backend.
*/
void
IncrementStatCounterForMyDb(int statId)
{
if (!EnableStatCounters)
{
return;
}
/* just to be on the safe side */
if (!EnsureStatCountersShmemInitDone())
{
return;
}
int myBackendSlotIdx = getProcNo_compat(MyProc);
BackendStatsSlot *myBackendStatsSlot =
&SharedBackendStatsSlotArray[myBackendSlotIdx];
/*
* When there cannot be any other writers, incrementing an atomic
* counter via pg_atomic_read_u64() and pg_atomic_write_u64() is
* the same as incrementing it via pg_atomic_fetch_add_u64(). Plus, the
* former is cheaper than the latter because the latter has to do
* extra work to deal with concurrent writers.
*
* In our case, the only concurrent writer could be the backend that
* is executing citus_stat_counters_reset(). So, there is a chance that
* we read the counter value, then it gets reset by a concurrent call
* made to citus_stat_counters_reset() and then we write the
* incremented value back, by effectively overriding the reset value.
* But this should be a rare case and we can live with that, for the
* sake of lock-free implementation of this function.
*/
pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters[statId];
pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1);
}
/*
* SaveBackendStatsIntoSavedBackendStatsHash saves the stat counters
* for this backend into the saved backend stats hash table.
*
* So, this is only supposed to be called when a backend exits.
*
* Also, we do our best to avoid throwing errors in this function because
* this function is called when a backend is exiting and throwing errors
* at that point will cause the backend to crash.
*/
void
SaveBackendStatsIntoSavedBackendStatsHash(void)
{
if (!EnableStatCounters)
{
return;
}
/* just to be on the safe side */
if (!EnsureStatCountersShmemInitDone())
{
return;
}
Oid databaseId = MyDatabaseId;
LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
(SavedBackendStatsHashEntry *) hash_search(
SharedSavedBackendStatsHash,
(void *) &databaseId,
HASH_FIND,
NULL);
if (!dbSavedBackendStatsEntry)
{
/* promote the lock to exclusive to insert the new entry for this database */
LWLockRelease(*SharedSavedBackendStatsHashLock);
LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);
dbSavedBackendStatsEntry =
SavedBackendStatsHashEntryAllocIfNotExists(databaseId);
LWLockRelease(*SharedSavedBackendStatsHashLock);
if (!dbSavedBackendStatsEntry)
{
/*
* Couldn't allocate a new hash entry because we're out of
* (shared) memory. In that case, we just log a warning and
* return, instead of throwing an error due to the reasons
* mentioned in the function's comment.
*/
ereport(WARNING,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("failed to allocate saved backend stats hash entry")));
return;
}
/* re-acquire the shared lock */
LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
}
int myBackendSlotIdx = getProcNo_compat(MyProc);
BackendStatsSlot *myBackendStatsSlot =
&SharedBackendStatsSlotArray[myBackendSlotIdx];
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
dbSavedBackendStatsEntry->counters[statIdx] +=
pg_atomic_read_u64(&myBackendStatsSlot->counters[statIdx]);
/*
* Given that this function is only called when a backend exits, later on
* another backend might be assigned to the same slot. So, we reset each
* stat counter of this slot to 0 after saving it.
*/
pg_atomic_write_u64(&myBackendStatsSlot->counters[statIdx], 0);
}
SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
* StatCountersShmemInit initializes the shared memory data structures used
* for keeping track of stat counters.
*/
static void
StatCountersShmemInit(void)
{
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
bool sharedBackendStatsSlotArrayAlreadyInit = false;
SharedBackendStatsSlotArray = (BackendStatsSlot *)
ShmemInitStruct("Citus Shared Backend Stats Slot Array",
SharedBackendStatsSlotArrayShmemSize(),
&sharedBackendStatsSlotArrayAlreadyInit);
bool sharedSavedBackendStatsHashLockAlreadyInit = false;
SharedSavedBackendStatsHashLock = ShmemInitStruct(
SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME,
sizeof(LWLockId),
&
sharedSavedBackendStatsHashLockAlreadyInit);
HASHCTL hashInfo = {
.keysize = sizeof(Oid),
.entrysize = sizeof(SavedBackendStatsHashEntry),
.hash = oid_hash,
};
SharedSavedBackendStatsHash = ShmemInitHash("Citus Shared Saved Backend Stats Hash",
SAVED_BACKEND_STATS_HASH_INIT_DATABASES,
SAVED_BACKEND_STATS_HASH_MAX_DATABASES,
&hashInfo,
HASH_ELEM | HASH_FUNCTION);
Assert(sharedBackendStatsSlotArrayAlreadyInit ==
sharedSavedBackendStatsHashLockAlreadyInit);
if (!sharedBackendStatsSlotArrayAlreadyInit)
{
for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
{
BackendStatsSlot *backendStatsSlot =
&SharedBackendStatsSlotArray[backendSlotIdx];
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
pg_atomic_init_u64(&backendStatsSlot->counters[statIdx], 0);
}
}
*SharedSavedBackendStatsHashLock = &(
GetNamedLWLockTranche(
SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME)
)->lock;
}
LWLockRelease(AddinShmemInitLock);
/*
* At this point, they should have been set to non-null values already,
* but we still check them just to be sure.
*/
if (SharedBackendStatsSlotArray &&
SharedSavedBackendStatsHashLock &&
SharedSavedBackendStatsHash)
{
StatCountersShmemInitDone = true;
}
}
/*
* SharedBackendStatsSlotArrayShmemSize returns the size of the shared
* backend stats slot array.
*/
static Size
SharedBackendStatsSlotArrayShmemSize(void)
{
return mul_size(sizeof(BackendStatsSlot), MaxBackends);
}
/*
* CollectActiveBackendStatsIntoHTAB aggregates the stat counters for the
* given database id from all the active backends into the databaseStats
* hash table. The function doesn't actually filter the slots of active
* backends but it's just fine to read the stat counters from all because
* exited backends anyway zero out their stat counters when they exit.
*
* If the database id is InvalidOid, then all the active backends will be
* considered regardless of the database they are connected to.
*
* Otherwise, if the database id is different than InvalidOid, then only
* the active backends whose PGPROC->databaseId is the same as the given
* database id will be considered, if any.
*/
static void
CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
{
PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);
if (backendProc->pid == 0)
{
/* unused slot */
continue;
}
Oid procDatabaseId = backendProc->databaseId;
if (procDatabaseId == InvalidOid)
{
/*
* Not connected to any database, something like logical replication
* launcher, autovacuum launcher or such.
*/
continue;
}
if (databaseId != InvalidOid && databaseId != procDatabaseId)
{
/* not a database we are interested in */
continue;
}
bool found = false;
DatabaseStatsHashEntry *dbStatsEntry = (DatabaseStatsHashEntry *)
hash_search(databaseStats, &procDatabaseId,
HASH_ENTER, &found);
if (!found)
{
MemSet(dbStatsEntry->counters, 0, sizeof(StatCounters));
dbStatsEntry->resetTimestamp = 0;
}
BackendStatsSlot *backendStatsSlot =
&SharedBackendStatsSlotArray[backendSlotIdx];
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
dbStatsEntry->counters[statIdx] +=
pg_atomic_read_u64(&backendStatsSlot->counters[statIdx]);
}
}
}
/*
* CollectSavedBackendStatsIntoHTAB fetches the saved stat counters and
* resetTimestamp for the given database id from the saved backend stats
* hash table and saves them into the databaseStats hash table.
*
* If the database id is InvalidOid, then all the databases present
* in the saved backend stats hash table will be considered.
*
* Otherwise, if the database id is different than InvalidOid, then only
* the entry that belongs to the given database will be considered, if there
* is such an entry.
*/
static void
CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
if (databaseId != InvalidOid)
{
SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
(SavedBackendStatsHashEntry *) hash_search(
SharedSavedBackendStatsHash,
(void *) &databaseId,
HASH_FIND,
NULL);
if (dbSavedBackendStatsEntry)
{
DatabaseStatsHashEntry *dbStatsEntry = (DatabaseStatsHashEntry *)
hash_search(databaseStats, &databaseId,
HASH_ENTER, NULL);
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
dbStatsEntry->counters[statIdx] +=
dbSavedBackendStatsEntry->counters[statIdx];
}
dbStatsEntry->resetTimestamp =
dbSavedBackendStatsEntry->resetTimestamp;
SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
}
}
else
{
HASH_SEQ_STATUS hashSeqStatus;
hash_seq_init(&hashSeqStatus, SharedSavedBackendStatsHash);
SavedBackendStatsHashEntry *dbSavedBackendStatsEntry = NULL;
while ((dbSavedBackendStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
{
DatabaseStatsHashEntry *dbStatsEntry = (DatabaseStatsHashEntry *)
hash_search(databaseStats,
&dbSavedBackendStatsEntry
->databaseId, HASH_ENTER,
NULL);
SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
dbStatsEntry->counters[statIdx] +=
dbSavedBackendStatsEntry->counters[statIdx];
}
dbStatsEntry->resetTimestamp =
dbSavedBackendStatsEntry->resetTimestamp;
SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
}
}
LWLockRelease(*SharedSavedBackendStatsHashLock);
}
/*
* StoreDatabaseStatsIntoTupStore stores the database stats from the
* databaseStats hash table into given tuple store.
*/
static void
StoreDatabaseStatsIntoTupStore(HTAB *databaseStats, Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor)
{
HASH_SEQ_STATUS hashSeqStatus;
hash_seq_init(&hashSeqStatus, databaseStats);
DatabaseStatsHashEntry *dbStatsEntry = NULL;
while ((dbStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
{
/* +2 for database_id (first) and the stats_reset (last) column */
Datum values[N_CITUS_STAT_COUNTERS + 2] = { 0 };
bool isNulls[N_CITUS_STAT_COUNTERS + 2] = { 0 };
values[0] = ObjectIdGetDatum(dbStatsEntry->databaseId);
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
uint64 statCounter = dbStatsEntry->counters[statIdx];
values[statIdx + 1] = UInt64GetDatum(statCounter);
}
/* set stats_reset column to NULL if it was never reset */
if (dbStatsEntry->resetTimestamp == 0)
{
isNulls[N_CITUS_STAT_COUNTERS + 1] = true;
}
else
{
values[N_CITUS_STAT_COUNTERS + 1] =
TimestampTzGetDatum(dbStatsEntry->resetTimestamp);
}
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
}
/*
* ResetActiveBackendStats resets the stat counters for the given database
* id for all the active backends. The function doesn't actually filter the
* slots of active backends but it's just fine to reset the stat counters
* for all because doing so just means resetting the stat counters for
* exited backends once again, which were already reset when they exited.
*
* Only active backends whose PGPROC->databaseId is the same as the given
* database id will be considered, if any.
*
* Returns true if any active backend was found.
*/
static bool
ResetActiveBackendStats(Oid databaseId)
{
bool foundAny = false;
for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
{
PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);
if (backendProc->pid == 0)
{
/* unused slot */
continue;
}
Oid procDatabaseId = backendProc->databaseId;
if (procDatabaseId == InvalidOid)
{
/*
* not connected to any database, something like logical replication
* launcher, autovacuum launcher, etc.
*/
continue;
}
if (databaseId != procDatabaseId)
{
/* not a database we are interested in */
continue;
}
foundAny = true;
BackendStatsSlot *backendStatsSlot =
&SharedBackendStatsSlotArray[backendSlotIdx];
for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
{
pg_atomic_write_u64(&backendStatsSlot->counters[statIdx], 0);
}
}
return foundAny;
}

/*
 * ResetSavedBackendStats resets the saved stat counters for the given
 * database id and sets the resetTimestamp for it to the current timestamp.
 *
 * If force is true, then we first make sure that we have an entry for
 * the given database id in the saved backend stats hash table.
 */
static void
ResetSavedBackendStats(Oid databaseId, bool force)
{
	LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
												   (void *) &databaseId,
												   HASH_FIND, NULL);
	if (!dbSavedBackendStatsEntry && force)
	{
		/* promote the lock to exclusive to insert the new entry for this database */
		LWLockRelease(*SharedSavedBackendStatsHashLock);
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);
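
		/*
		 * We fully released the lock before re-acquiring it in exclusive
		 * mode, so another backend might have created the entry for this
		 * database in the meantime; in that case,
		 * SavedBackendStatsHashEntryAllocIfNotExists() simply returns the
		 * existing entry.
		 */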
		dbSavedBackendStatsEntry =
			SavedBackendStatsHashEntryAllocIfNotExists(databaseId);

		LWLockRelease(*SharedSavedBackendStatsHashLock);

		if (!dbSavedBackendStatsEntry)
		{
			/*
			 * Couldn't allocate a new hash entry because we're out of
			 * (shared) memory.
			 */
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("failed to allocate saved backend stats hash entry")));
			return;
		}

		/* re-acquire the shared lock */
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
	}

	/*
	 * Actually reset the stat counters for the exited backends and set
	 * the resetTimestamp to the current timestamp if we already had
	 * an entry for it or if we just created it.
	 */
	if (dbSavedBackendStatsEntry)
	{
		SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

		memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
		dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp();

		SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
	}

	LWLockRelease(*SharedSavedBackendStatsHashLock);
}

/*
 * SavedBackendStatsHashEntryAllocIfNotExists allocates a new entry in the
 * saved backend stats hash table for the given database id if it doesn't
 * already exist.
 *
 * Assumes that the caller has exclusive access to the hash table since it
 * performs HASH_ENTER_NULL.
 *
 * Returns NULL if the entry didn't exist and couldn't be allocated since
 * we're out of (shared) memory.
 */
static SavedBackendStatsHashEntry *
SavedBackendStatsHashEntryAllocIfNotExists(Oid databaseId)
{
	bool found = false;
	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
												   (void *) &databaseId,
												   HASH_ENTER_NULL, &found);
	if (!dbSavedBackendStatsEntry)
	{
		/*
		 * As we provided HASH_ENTER_NULL, returning NULL means OOM.
		 * In that case, we return and let the caller decide what to do.
		 */
		return NULL;
	}

	if (!found)
	{
		memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
		dbSavedBackendStatsEntry->resetTimestamp = 0;

		SpinLockInit(&dbSavedBackendStatsEntry->mutex);
	}

	return dbSavedBackendStatsEntry;
}

View File

@ -334,7 +334,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection);
extern void MarkConnectionConnected(MultiConnection *connection);
extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection);
/* waiteventset utilities */
extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,

View File

@ -0,0 +1,58 @@
/*-------------------------------------------------------------------------
 *
 * stat_counters.h
 *
 * This file contains the exported functions to track various statistic
 * counters for Citus.
 *
 * -------------------------------------------------------------------------
 */

#ifndef STAT_COUNTERS_H
#define STAT_COUNTERS_H

/* saved backend stats - constants */
#define SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME \
	"citus_stat_counters saved backend stats hash"

/* default value for the GUC variable */
#define ENABLE_STAT_COUNTERS_DEFAULT false

/*
 * Must be in the same order as the output columns defined in citus_stat_counters() UDF,
 * see src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql
 */
typedef enum
{
	STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
	STAT_CONNECTION_ESTABLISHMENT_FAILED,
	STAT_CONNECTION_REUSED,
	STAT_QUERY_EXECUTION_SINGLE_SHARD,
	STAT_QUERY_EXECUTION_MULTI_SHARD,

	/* do not use this and ensure it is the last entry */
	N_CITUS_STAT_COUNTERS
} StatType;

/* GUC variable */
extern bool EnableStatCounters;

/* shared memory init */
extern void InitializeStatCountersShmem(void);
extern Size StatCountersShmemSize(void);

/* main entry point for the callers who want to increment the stat counters */
extern void IncrementStatCounterForMyDb(int statId);

/*
 * Exported to define a before_shmem_exit() callback that saves
 * the stat counters for exited backends into the shared memory.
 */
extern void SaveBackendStatsIntoSavedBackendStatsHash(void);

#endif /* STAT_COUNTERS_H */
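
For illustration only (this snippet is not part of the change), here is a minimal sketch of how a call site could bump one of these counters. The include path and the idea of checking the `EnableStatCounters` GUC at the call site are assumptions; the actual call sites live elsewhere in this diff.

#include "distributed/stat_counters.h"

/* hypothetical helper, not part of the change */
static void
CountSingleShardQueryExecution(void)
{
	/* assume the GUC is checked at the call site */
	if (!EnableStatCounters)
	{
		return;
	}

	IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}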

View File

@ -1480,8 +1480,11 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal.update_placement_metadata(bigint,integer,integer) void
| function citus_internal.update_relation_colocation(oid,integer) void
| function citus_is_primary_node() boolean
| function citus_stat_counters(oid) SETOF record
| function citus_stat_counters_reset(oid) void
| function citus_unmark_object_distributed(oid,oid,integer,boolean) void
(27 rows)
| view citus_stat_counters
(30 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -0,0 +1,793 @@
-- Setup another Citus cluster before setting up the tests for "regression" cluster
\c postgres - - :worker_1_port
CREATE DATABASE stat_counters_test_db;
\c stat_counters_test_db - - -
CREATE EXTENSION citus;
\c postgres - - :worker_2_port
CREATE DATABASE stat_counters_test_db;
\c stat_counters_test_db - - -
CREATE EXTENSION citus;
\c postgres - - :master_port
CREATE DATABASE stat_counters_test_db;
\c stat_counters_test_db - - -
CREATE EXTENSION citus;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- back to the "regression" database on coordinator that we usually use during tests
\c regression - - -
CREATE SCHEMA stat_counters;
SET search_path TO stat_counters;
SET citus.next_shard_id to 1970000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO NOTICE;
-- make sure it's disabled first
SET citus.enable_stat_counters TO false;
-- verify that the UDFs don't do anything when NULL input is provided
SELECT citus_stat_counters(null);
citus_stat_counters
---------------------------------------------------------------------
(0 rows)
SELECT citus_stat_counters_reset(null);
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
?column?
---------------------------------------------------------------------
t
(1 row)
-- Verify that providing an oid that doesn't correspond to any database
-- returns an empty set. We know that "SELECT MAX(oid)+1 FROM pg_database"
-- is definitely not a valid database oid.
SELECT COUNT(*) = 0 FROM (SELECT citus_stat_counters((MAX(oid)::integer+1)::oid) FROM pg_database) q;
?column?
---------------------------------------------------------------------
t
(1 row)
-- This is the first test in multi_1_schedule that calls citus_stat_counters_reset(), so no
-- one could have reset the stats before us. So, here we can test that the stats_reset column
-- is NULL for the databases that citus_stat_counters_reset() was certainly not called for.
SELECT stats_reset IS NULL FROM citus_stat_counters WHERE name IN ('template0', 'template1');
?column?
---------------------------------------------------------------------
t
t
(2 rows)
-- Even more, calling citus_stat_counters_reset() for a database that no one has connected to
-- so far is simply a no-op.
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = 'template0';
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT stats_reset IS NULL FROM citus_stat_counters WHERE name = 'template0';
?column?
---------------------------------------------------------------------
t
(1 row)
-- but this is not true otherwise
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT stats_reset IS NOT NULL FROM citus_stat_counters WHERE name = current_database();
?column?
---------------------------------------------------------------------
t
(1 row)
-- multi_1_schedule runs this test on its own line, so there cannot be any other backends
-- -except the Citus maintenance daemon- that could update the stat counters other than us.
-- We also know that the Citus maintenance daemon cannot update query related stats.
--
-- So, no one could have incremented query related stats so far.
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
?column? | ?column?
---------------------------------------------------------------------
t | t
t | t
t | t
t | t
t | t
(5 rows)
-- Even further, for the databases that don't have Citus extension installed,
-- we should get 0 for other stats too.
--
-- For the databases that have Citus extension installed, we might or might not
-- get 0 for connection related stats, depending on whether the Citus maintenance
-- daemon has done any work so far, so we don't check them.
SELECT connection_establishment_succeeded = 0,
connection_establishment_failed = 0,
connection_reused = 0
FROM (
SELECT * FROM citus_stat_counters WHERE name NOT IN ('regression', 'stat_counters_test_db')
) q;
?column? | ?column? | ?column?
---------------------------------------------------------------------
t | t | t
t | t | t
t | t | t
(3 rows)
CREATE TABLE dist_table (a int, b int);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- no single shard queries yet, so it's set to 0
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column?
---------------------------------------------------------------------
t
(1 row)
-- normally this should increment query_execution_single_shard counter, but the GUC is disabled
SELECT * FROM dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column?
---------------------------------------------------------------------
t
(1 row)
SET citus.enable_stat_counters TO true;
-- increment query_execution_single_shard counter
SELECT * FROM dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT query_execution_single_shard = 1 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column?
---------------------------------------------------------------------
t
(1 row)
-- increment query_execution_single_shard counter
SELECT * FROM dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT query_execution_single_shard = 1 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column?
---------------------------------------------------------------------
t
(1 row)
-- verify that we can reset the stats for a specific database
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column?
---------------------------------------------------------------------
t
(1 row)
-- increment counters a bit
SELECT * FROM dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
-- Close the current connection and open a new one to make sure that
-- backends save their stats before exiting.
\c - - - -
-- make sure that the GUC is disabled
SET citus.enable_stat_counters TO false;
-- these will be ineffective because the GUC is disabled
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
-- Verify that we can observe the counters incremented before the GUC was
-- disabled, even when the GUC is disabled.
SELECT query_execution_single_shard = 3, query_execution_multi_shard = 2
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column? | ?column?
---------------------------------------------------------------------
t | t
(1 row)
SET citus.enable_stat_counters TO true;
-- increment the counters a bit more
SELECT * FROM stat_counters.dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
RESET citus.force_max_query_parallelization;
-- (*1) For the last two queries, we forced opening as many connections as
-- possible. So, we should expect connection_establishment_succeeded to be
-- incremented by some value close to 32 shards * 2 queries = 64. However,
-- it might not be that high if the shard queries complete very quickly. So,
-- heuristically, we check that it's at least 50 to avoid making the test
-- flaky.
SELECT query_execution_single_shard = 4, query_execution_multi_shard = 5, connection_establishment_succeeded >= 50
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column? | ?column? | ?column?
---------------------------------------------------------------------
t | t | t
(1 row)
-- We can even see the counter values for the "regression" database from
-- other databases that have Citus installed.
\c stat_counters_test_db - - -
-- make sure that the GUC is disabled
SET citus.enable_stat_counters TO false;
SELECT query_execution_single_shard = 4, query_execution_multi_shard = 5
FROM (SELECT (pg_catalog.citus_stat_counters(oid)).* FROM pg_database WHERE datname = 'regression') q;
?column? | ?column?
---------------------------------------------------------------------
t | t
(1 row)
-- enable it before exiting to make sure we save (all-zero) stats into the shared hash when exiting
SET citus.enable_stat_counters TO true;
-- repeat some of the tests from a worker node
\c regression - - :worker_1_port
-- make sure that the GUC is disabled
SET citus.enable_stat_counters TO false;
SET client_min_messages TO NOTICE;
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
-- no one could have incremented query related stats so far
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
?column? | ?column?
---------------------------------------------------------------------
t | t
t | t
t | t
t | t
t | t
(5 rows)
SET citus.enable_stat_counters TO true;
SELECT * FROM stat_counters.dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM stat_counters.dist_table WHERE a = 1;
a | b
---------------------------------------------------------------------
(0 rows)
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM stat_counters.dist_table;
a | b
---------------------------------------------------------------------
(0 rows)
RESET citus.force_max_query_parallelization;
-- As in (*1), we don't directly compare connection_establishment_succeeded
-- with 3 * 32 = 96 but with something smaller.
SELECT query_execution_single_shard = 2, query_execution_multi_shard = 3, connection_establishment_succeeded >= 80
FROM citus_stat_counters WHERE name = current_database();
?column? | ?column? | ?column?
---------------------------------------------------------------------
t | t | t
(1 row)
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
?column? | ?column?
---------------------------------------------------------------------
t | t
t | t
t | t
t | t
t | t
(5 rows)
SELECT stats_reset into saved_stats_reset_t1 FROM citus_stat_counters WHERE name = current_database();
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT stats_reset into saved_stats_reset_t2 FROM citus_stat_counters WHERE name = current_database();
-- check that the latter is greater than the former
SELECT t1.stats_reset < t2.stats_reset FROM saved_stats_reset_t1 t1, saved_stats_reset_t2 t2;
?column?
---------------------------------------------------------------------
t
(1 row)
DROP TABLE saved_stats_reset_t1, saved_stats_reset_t2;
\c regression postgres - :master_port
CREATE USER stat_counters_test_user;
GRANT ALL PRIVILEGES ON DATABASE regression TO stat_counters_test_user;
GRANT ALL PRIVILEGES ON SCHEMA stat_counters TO stat_counters_test_user;
ALTER USER stat_counters_test_user SET citus.enable_stat_counters TO true;
SET search_path TO stat_counters;
SET citus.next_shard_id to 1980000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE dist_table_1 (a int, b int);
SELECT create_distributed_table('dist_table_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE uncolocated_dist_table (a int, b int);
SELECT create_distributed_table('uncolocated_dist_table', 'a', colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE single_shard (a int, b int);
SELECT create_distributed_table('single_shard', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE single_shard_1 (a int, b int);
SELECT create_distributed_table('single_shard_1', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE uncolocated_single_shard (a int, b int);
SELECT create_distributed_table('uncolocated_single_shard', null, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_table (a int, b int);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE local_table (a int, b int);
INSERT INTO local_table (a, b) VALUES (1, 1), (2, 2), (3, 3);
CREATE TABLE citus_local (a int, b int);
INSERT INTO citus_local (a, b) VALUES (1, 1), (2, 2), (3, 3);
SELECT citus_add_local_table_to_metadata('citus_local');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
GRANT ALL ON ALL TABLES IN SCHEMA stat_counters TO stat_counters_test_user;
\c stat_counters_test_db postgres - :master_port
-- reset from another database as superuser
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = 'regression';
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column? | ?column?
---------------------------------------------------------------------
t | t
(1 row)
-- make sure that we can update and read the stats from a non-superuser
\c regression stat_counters_test_user - -
SET search_path TO stat_counters;
CREATE PROCEDURE exec_query_and_check_query_counters(
input_sql text,
query_execution_single_shard_diff_expected bigint,
query_execution_multi_shard_diff_expected bigint
)
LANGUAGE PLPGSQL AS $$
DECLARE
old_query_execution_single_shard bigint;
old_query_execution_multi_shard bigint;
new_query_execution_single_shard bigint;
new_query_execution_multi_shard bigint;
BEGIN
SELECT query_execution_single_shard, query_execution_multi_shard
INTO old_query_execution_single_shard, old_query_execution_multi_shard
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
COMMIT;
EXECUTE input_sql;
SELECT query_execution_single_shard, query_execution_multi_shard
INTO new_query_execution_single_shard, new_query_execution_multi_shard
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
IF (new_query_execution_single_shard - old_query_execution_single_shard != query_execution_single_shard_diff_expected) THEN
RAISE EXCEPTION 'query_execution_single_shard counter is not incremented as expected, expected % but got %',
query_execution_single_shard_diff_expected,
new_query_execution_single_shard - old_query_execution_single_shard;
END IF;
IF (new_query_execution_multi_shard - old_query_execution_multi_shard != query_execution_multi_shard_diff_expected) THEN
RAISE EXCEPTION 'query_execution_multi_shard counter is not incremented as expected, expected % but got %',
query_execution_multi_shard_diff_expected,
new_query_execution_multi_shard - old_query_execution_multi_shard;
END IF;
END;
$$;
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a WHERE dist_table.a = 1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a
$$,
0, 1
);
SET citus.enable_repartition_joins TO true;
-- A repartition join only increments query_execution_multi_shard once, although
-- this doesn't feel quite ideal.
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.b WHERE dist_table.a = 1
$$,
0, 1
);
RESET citus.enable_repartition_joins;
-- Subplans and the top level query plans increment the counters separately.
-- We first create an intermediate result for dist_table, which increments
-- query_execution_multi_shard by 1. Then we join the intermediate result with
-- ref_table, which increments query_execution_single_shard by 1 because it
-- is a single shard query.
CALL exec_query_and_check_query_counters($$
SELECT * FROM ref_table LEFT JOIN dist_table ON dist_table.a = ref_table.a
$$,
1, 1
);
-- OFFSET 0 forces creating an intermediate result for dist_table, which increments
-- query_execution_multi_shard by 1. Then we query the intermediate result
-- with a single shard query, which increments query_execution_single_shard by 1.
CALL exec_query_and_check_query_counters($$
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 1
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
$$,
1, 0
);
-- shard pruning is considered too
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a >= 1 AND a = 1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
UPDATE dist_table
SET b = 1
FROM dist_table_1
JOIN ref_table ON dist_table_1.a = ref_table.a
WHERE dist_table_1.a = 1 AND dist_table.a = dist_table_1.a
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table
USING dist_table_1
WHERE dist_table.a = dist_table_1.a
$$,
0, 1
);
-- single-shard inserts
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-1, -1), (-2, -2), (-3, -3)
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-4, -4)
$$,
1, 0
);
PREPARE p1 (bigint) AS SELECT * FROM dist_table WHERE a = $1;
CALL exec_query_and_check_query_counters($$
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
$$,
10, 0
);
CALL exec_query_and_check_query_counters($$
WITH deleted_rows AS (
-- multi-shard
DELETE FROM uncolocated_dist_table
RETURNING *
),
dummy_cte AS (
SELECT count(*) FROM -- single-shard (cross join between intermediate results)
(SELECT * FROM dist_table_1 LIMIT 1) q1, -- multi-shard
(SELECT b, count(*) AS a_count FROM dist_table_1 GROUP BY b) q2 -- multi-shard
)
-- multi-shard
UPDATE dist_table
SET b = 1
FROM dist_table_1
JOIN ref_table ON dist_table_1.a = ref_table.a
JOIN deleted_rows ON dist_table_1.a = deleted_rows.a
CROSS JOIN dummy_cte
WHERE dist_table.a = dist_table_1.a;
$$,
1, 4
);
-- The SELECT query is multi-shard and the same is also true for the final insert,
-- but only if it doesn't prune to zero shards, which happens when the source
-- table is empty. So here, both query_execution_multi_shard and
-- query_execution_single_shard are incremented by 1.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM uncolocated_dist_table
$$,
1, 1
);
insert into uncolocated_dist_table (a, b) values (1, 1), (2, 2), (3, 3);
-- However, the same insert increments query_execution_multi_shard by 2
-- when the source table is not empty.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM uncolocated_dist_table
$$,
0, 2
);
CALL exec_query_and_check_query_counters($$
INSERT INTO single_shard SELECT * FROM single_shard_1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
INSERT INTO single_shard SELECT * FROM uncolocated_single_shard
$$,
2, 0
);
CALL exec_query_and_check_query_counters($$
WITH big_cte AS (
WITH first_cte AS (
-- multi-shard
SELECT b, sum(a) AS a_sum
FROM uncolocated_dist_table
GROUP BY b
),
dummy_cte AS (
SELECT count(*) FROM -- single-shard (cross join between intermediate results)
(SELECT * FROM dist_table_1 ORDER BY a LIMIT 1) q1, -- multi-shard
(SELECT b, count(*) AS a_count FROM dist_table_1 GROUP BY b) q2 -- multi-shard
)
-- multi-shard
SELECT dist_table.a, dist_table.b
FROM dist_table
JOIN dist_table_1 ON dist_table.a = dist_table_1.a
JOIN first_cte ON dist_table_1.a = first_cte.a_sum
CROSS JOIN dummy_cte
WHERE dist_table.a = dist_table_1.a
),
another_cte AS (
-- single-shard
SELECT * FROM ref_table ORDER BY a LIMIT 64
)
-- final insert: multi-shard
INSERT INTO dist_table (a, b)
-- source: multi-shard
SELECT uncolocated_dist_table.a, uncolocated_dist_table.b FROM uncolocated_dist_table
LEFT JOIN big_cte ON uncolocated_dist_table.a = big_cte.a
LEFT JOIN another_cte ON uncolocated_dist_table.a = another_cte.a
$$,
2, 6
);
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) SELECT * FROM local_table
$$,
0, 1
);
CALL exec_query_and_check_query_counters($$
INSERT INTO local_table (a, b) SELECT * FROM dist_table
$$,
0, 1
);
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) SELECT * FROM citus_local
$$,
1, 1
);
CALL exec_query_and_check_query_counters($$
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
$$,
1, 1
);
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
?column?
---------------------------------------------------------------------
t
(1 row)
-- verify that we cannot execute citus_stat_counters_reset() as a non-superuser
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
ERROR: permission denied for function citus_stat_counters_reset
\c - postgres - -
ALTER USER stat_counters_test_user SUPERUSER;
\c - stat_counters_test_user - -
-- verify that another superuser can execute citus_stat_counters_reset()
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
citus_stat_counters_reset
---------------------------------------------------------------------
(1 row)
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
?column? | ?column?
---------------------------------------------------------------------
t | t
(1 row)
\c regression postgres - :master_port
-- drop the test cluster
\c regression - - :worker_1_port
DROP DATABASE stat_counters_test_db WITH (FORCE);
\c regression - - :worker_2_port
DROP DATABASE stat_counters_test_db WITH (FORCE);
\c regression - - :master_port
-- save its oid before dropping
SELECT oid AS stat_counters_test_db_oid FROM pg_database WHERE datname = 'stat_counters_test_db' \gset
DROP DATABASE stat_counters_test_db WITH (FORCE);
-- even if the database is dropped, citus_stat_counters() still returns a row for it
SELECT COUNT(*) = 1 FROM citus_stat_counters() WHERE database_id = :'stat_counters_test_db_oid';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT COUNT(*) = 1 FROM (SELECT citus_stat_counters(:'stat_counters_test_db_oid')) q;
?column?
---------------------------------------------------------------------
t
(1 row)
-- However, citus_stat_counters just ignores dropped databases
SELECT COUNT(*) = 0 FROM citus_stat_counters WHERE name = 'stat_counters_test_db';
?column?
---------------------------------------------------------------------
t
(1 row)
-- clean up for the current database
REVOKE ALL ON DATABASE regression FROM stat_counters_test_user;
REVOKE ALL ON SCHEMA stat_counters FROM stat_counters_test_user;
REVOKE ALL ON ALL TABLES IN SCHEMA stat_counters FROM stat_counters_test_user;
SET client_min_messages TO WARNING;
DROP SCHEMA stat_counters CASCADE;
DROP USER stat_counters_test_user;

View File

@ -179,6 +179,8 @@ ORDER BY 1;
function citus_shards_on_worker()
function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode)
function citus_stat_activity()
function citus_stat_counters(oid)
function citus_stat_counters_reset(oid)
function citus_stat_statements()
function citus_stat_statements_reset()
function citus_stat_tenants(boolean)
@ -384,11 +386,12 @@ ORDER BY 1;
view citus_shards
view citus_shards_on_worker
view citus_stat_activity
view citus_stat_counters
view citus_stat_statements
view citus_stat_tenants
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(358 rows)
(361 rows)
DROP TABLE extension_basic_types;

View File

@ -44,6 +44,12 @@ test: comment_on_role
test: single_shard_table_udfs
test: schema_based_sharding
test: citus_schema_distribute_undistribute
# Don't parallelize stat_counters with others because we don't want statistics
# to be updated by other tests.
#
# Also, this needs to be the first test that queries the stats_reset column on
# citus_stat_counters since it checks the value of that column.
test: stat_counters
test: multi_test_catalog_views
test: multi_table_ddl

View File

@ -0,0 +1,567 @@
-- Setup another Citus cluster before setting up the tests for "regression" cluster
\c postgres - - :worker_1_port
CREATE DATABASE stat_counters_test_db;
\c stat_counters_test_db - - -
CREATE EXTENSION citus;
\c postgres - - :worker_2_port
CREATE DATABASE stat_counters_test_db;
\c stat_counters_test_db - - -
CREATE EXTENSION citus;
\c postgres - - :master_port
CREATE DATABASE stat_counters_test_db;
\c stat_counters_test_db - - -
CREATE EXTENSION citus;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- back to the "regression" database on coordinator that we usually use during tests
\c regression - - -
CREATE SCHEMA stat_counters;
SET search_path TO stat_counters;
SET citus.next_shard_id to 1970000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SET client_min_messages TO NOTICE;
-- make sure it's disabled first
SET citus.enable_stat_counters TO false;
-- verify that the UDFs don't do anything when NULL input is provided
SELECT citus_stat_counters(null);
SELECT citus_stat_counters_reset(null);
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
-- Verify that providing an oid that doesn't correspond to any database
-- returns an empty set. We know that "SELECT MAX(oid)+1 FROM pg_database"
-- is definitely not a valid database oid.
SELECT COUNT(*) = 0 FROM (SELECT citus_stat_counters((MAX(oid)::integer+1)::oid) FROM pg_database) q;
-- This is the first test in multi_1_schedule that calls citus_stat_counters_reset(), so no
-- one could have reset the stats before us. So, here we can test that the stats_reset column
-- is NULL for the databases that citus_stat_counters_reset() was certainly not called for.
SELECT stats_reset IS NULL FROM citus_stat_counters WHERE name IN ('template0', 'template1');
-- Even more, calling citus_stat_counters_reset() for a database that no one has connected to
-- so far is simply a no-op.
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = 'template0';
SELECT stats_reset IS NULL FROM citus_stat_counters WHERE name = 'template0';
-- but this is not true otherwise
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
SELECT stats_reset IS NOT NULL FROM citus_stat_counters WHERE name = current_database();
-- multi_1_schedule runs this test on its own line, so there cannot be any other backends
-- -except the Citus maintenance daemon- that could update the stat counters other than us.
-- We also know that the Citus maintenance daemon cannot update query related stats.
--
-- So, no one could have incremented query related stats so far.
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
-- Even further, for the databases that don't have Citus extension installed,
-- we should get 0 for other stats too.
--
-- For the databases that have Citus extension installed, we might or might not
-- get 0 for connection related stats, depending on whether the Citus maintenance
-- daemon has done any work so far, so we don't check them.
SELECT connection_establishment_succeeded = 0,
connection_establishment_failed = 0,
connection_reused = 0
FROM (
SELECT * FROM citus_stat_counters WHERE name NOT IN ('regression', 'stat_counters_test_db')
) q;
CREATE TABLE dist_table (a int, b int);
SELECT create_distributed_table('dist_table', 'a');
-- no single shard queries yet, so it's set to 0
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
-- normally this should increment query_execution_single_shard counter, but the GUC is disabled
SELECT * FROM dist_table WHERE a = 1;
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
SET citus.enable_stat_counters TO true;
-- increment query_execution_single_shard counter
SELECT * FROM dist_table WHERE a = 1;
SELECT query_execution_single_shard = 1 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
-- increment query_execution_single_shard counter
SELECT * FROM dist_table WHERE a = 1;
SELECT query_execution_single_shard = 1 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
-- verify that we can reset the stats for a specific database
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
SELECT query_execution_single_shard = 0 FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
-- increment counters a bit
SELECT * FROM dist_table WHERE a = 1;
SELECT * FROM dist_table WHERE a = 1;
SELECT * FROM dist_table WHERE a = 1;
SELECT * FROM dist_table;
SELECT * FROM dist_table;
-- Close the current connection and open a new one to make sure that
-- backends save their stats before exiting.
\c - - - -
-- make sure that the GUC is disabled
SET citus.enable_stat_counters TO false;
-- these will be ineffective because the GUC is disabled
SELECT * FROM stat_counters.dist_table;
SELECT * FROM stat_counters.dist_table;
-- Verify that we can observe the counters incremented before the GUC was
-- disabled, even when the GUC is disabled.
SELECT query_execution_single_shard = 3, query_execution_multi_shard = 2
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
SET citus.enable_stat_counters TO true;
-- increment the counters a bit more
SELECT * FROM stat_counters.dist_table WHERE a = 1;
SELECT * FROM stat_counters.dist_table;
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM stat_counters.dist_table;
SELECT * FROM stat_counters.dist_table;
RESET citus.force_max_query_parallelization;
-- (*1) For the last two queries, we forced opening as many connections as
-- possible. So, we should expect connection_establishment_succeeded to be
-- incremented by some value close to 32 shards * 2 queries = 64. However,
-- it might not be that high if the shard queries complete very quickly. So,
-- heuristically, we check that it's at least 50 to avoid making the test
-- flaky.
SELECT query_execution_single_shard = 4, query_execution_multi_shard = 5, connection_establishment_succeeded >= 50
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
-- We can even see the counter values for the "regression" database from
-- other databases that have Citus installed.
\c stat_counters_test_db - - -
-- make sure that the GUC is disabled
SET citus.enable_stat_counters TO false;
SELECT query_execution_single_shard = 4, query_execution_multi_shard = 5
FROM (SELECT (pg_catalog.citus_stat_counters(oid)).* FROM pg_database WHERE datname = 'regression') q;
-- enable it before exiting to make sure we save (all-zero) stats into the shared hash when exiting
SET citus.enable_stat_counters TO true;
-- repeat some of the tests from a worker node
\c regression - - :worker_1_port
-- make sure that the GUC is disabled
SET citus.enable_stat_counters TO false;
SET client_min_messages TO NOTICE;
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
-- no one could have incremented query related stats so far
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
SET citus.enable_stat_counters TO true;
SELECT * FROM stat_counters.dist_table WHERE a = 1;
SELECT * FROM stat_counters.dist_table WHERE a = 1;
SET citus.force_max_query_parallelization TO ON;
SELECT * FROM stat_counters.dist_table;
SELECT * FROM stat_counters.dist_table;
SELECT * FROM stat_counters.dist_table;
RESET citus.force_max_query_parallelization;
-- As in (*1), we don't directly compare connection_establishment_succeeded
-- with 3 * 32 = 96 but with something smaller.
SELECT query_execution_single_shard = 2, query_execution_multi_shard = 3, connection_establishment_succeeded >= 80
FROM citus_stat_counters WHERE name = current_database();
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0 FROM citus_stat_counters;
SELECT stats_reset into saved_stats_reset_t1 FROM citus_stat_counters WHERE name = current_database();
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
SELECT stats_reset into saved_stats_reset_t2 FROM citus_stat_counters WHERE name = current_database();
-- check that the latter is greater than the former
SELECT t1.stats_reset < t2.stats_reset FROM saved_stats_reset_t1 t1, saved_stats_reset_t2 t2;
DROP TABLE saved_stats_reset_t1, saved_stats_reset_t2;
\c regression postgres - :master_port
CREATE USER stat_counters_test_user;
GRANT ALL PRIVILEGES ON DATABASE regression TO stat_counters_test_user;
GRANT ALL PRIVILEGES ON SCHEMA stat_counters TO stat_counters_test_user;
ALTER USER stat_counters_test_user SET citus.enable_stat_counters TO true;
SET search_path TO stat_counters;
SET citus.next_shard_id to 1980000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE dist_table_1 (a int, b int);
SELECT create_distributed_table('dist_table_1', 'a');
CREATE TABLE uncolocated_dist_table (a int, b int);
SELECT create_distributed_table('uncolocated_dist_table', 'a', colocate_with => 'none');
CREATE TABLE single_shard (a int, b int);
SELECT create_distributed_table('single_shard', null);
CREATE TABLE single_shard_1 (a int, b int);
SELECT create_distributed_table('single_shard_1', null);
CREATE TABLE uncolocated_single_shard (a int, b int);
SELECT create_distributed_table('uncolocated_single_shard', null, colocate_with => 'none');
CREATE TABLE ref_table (a int, b int);
SELECT create_reference_table('ref_table');
CREATE TABLE local_table (a int, b int);
INSERT INTO local_table (a, b) VALUES (1, 1), (2, 2), (3, 3);
CREATE TABLE citus_local (a int, b int);
INSERT INTO citus_local (a, b) VALUES (1, 1), (2, 2), (3, 3);
SELECT citus_add_local_table_to_metadata('citus_local');
GRANT ALL ON ALL TABLES IN SCHEMA stat_counters TO stat_counters_test_user;
\c stat_counters_test_db postgres - :master_port
-- reset from another database as superuser
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = 'regression';
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
-- make sure that we can update and read the stats from a non-superuser
\c regression stat_counters_test_user - -
SET search_path TO stat_counters;
CREATE PROCEDURE exec_query_and_check_query_counters(
input_sql text,
query_execution_single_shard_diff_expected bigint,
query_execution_multi_shard_diff_expected bigint
)
LANGUAGE PLPGSQL AS $$
DECLARE
old_query_execution_single_shard bigint;
old_query_execution_multi_shard bigint;
new_query_execution_single_shard bigint;
new_query_execution_multi_shard bigint;
BEGIN
SELECT query_execution_single_shard, query_execution_multi_shard
INTO old_query_execution_single_shard, old_query_execution_multi_shard
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
COMMIT;
EXECUTE input_sql;
SELECT query_execution_single_shard, query_execution_multi_shard
INTO new_query_execution_single_shard, new_query_execution_multi_shard
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
IF (new_query_execution_single_shard - old_query_execution_single_shard != query_execution_single_shard_diff_expected) THEN
RAISE EXCEPTION 'query_execution_single_shard counter is not incremented as expected, expected % but got %',
query_execution_single_shard_diff_expected,
new_query_execution_single_shard - old_query_execution_single_shard;
END IF;
IF (new_query_execution_multi_shard - old_query_execution_multi_shard != query_execution_multi_shard_diff_expected) THEN
RAISE EXCEPTION 'query_execution_multi_shard counter is not incremented as expected, expected % but got %',
query_execution_multi_shard_diff_expected,
new_query_execution_multi_shard - old_query_execution_multi_shard;
END IF;
END;
$$;
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a WHERE dist_table.a = 1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a
$$,
0, 1
);
SET citus.enable_repartition_joins TO true;
-- A repartition join only increments query_execution_multi_shard once, although
-- this doesn't feel quite ideal.
CALL exec_query_and_check_query_counters($$
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.b WHERE dist_table.a = 1
$$,
0, 1
);
RESET citus.enable_repartition_joins;
-- Subplans and the top level query plans increment the counters separately.
-- We first create an intermediate result for dist_table, which increments
-- query_execution_multi_shard by 1. Then we join the intermediate result with
-- ref_table, which increments query_execution_single_shard by 1 because it
-- is a single shard query.
CALL exec_query_and_check_query_counters($$
SELECT * FROM ref_table LEFT JOIN dist_table ON dist_table.a = ref_table.a
$$,
1, 1
);
-- OFFSET 0 forces creating an intermediate result for dist_table, which increments
-- query_execution_multi_shard by 1. Then we query the intermediate result
-- with a single shard query, which increments query_execution_single_shard by 1.
CALL exec_query_and_check_query_counters($$
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 1
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
$$,
1, 0
);
-- shard pruning is considered too
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a >= 1 AND a = 1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
UPDATE dist_table
SET b = 1
FROM dist_table_1
JOIN ref_table ON dist_table_1.a = ref_table.a
WHERE dist_table_1.a = 1 AND dist_table.a = dist_table_1.a
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table
USING dist_table_1
WHERE dist_table.a = dist_table_1.a
$$,
0, 1
);
-- single-shard inserts
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-1, -1), (-2, -2), (-3, -3)
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-4, -4)
$$,
1, 0
);
PREPARE p1 (bigint) AS SELECT * FROM dist_table WHERE a = $1;
CALL exec_query_and_check_query_counters($$
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
EXECUTE p1(1);
$$,
10, 0
);
CALL exec_query_and_check_query_counters($$
WITH deleted_rows AS (
-- multi-shard
DELETE FROM uncolocated_dist_table
RETURNING *
),
dummy_cte AS (
SELECT count(*) FROM -- single-shard (cross join between intermediate results)
(SELECT * FROM dist_table_1 LIMIT 1) q1, -- multi-shard
(SELECT b, count(*) AS a_count FROM dist_table_1 GROUP BY b) q2 -- multi-shard
)
-- multi-shard
UPDATE dist_table
SET b = 1
FROM dist_table_1
JOIN ref_table ON dist_table_1.a = ref_table.a
JOIN deleted_rows ON dist_table_1.a = deleted_rows.a
CROSS JOIN dummy_cte
WHERE dist_table.a = dist_table_1.a;
$$,
1, 4
);
-- The SELECT query is multi-shard and the same is also true for the final insert,
-- but only if it doesn't prune to zero shards, which happens when the source
-- table is empty. So here, both query_execution_multi_shard and
-- query_execution_single_shard are incremented by 1.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM uncolocated_dist_table
$$,
1, 1
);
insert into uncolocated_dist_table (a, b) values (1, 1), (2, 2), (3, 3);
-- However, the same insert increments query_execution_multi_shard by 2
-- when the source table is not empty.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM uncolocated_dist_table
$$,
0, 2
);
CALL exec_query_and_check_query_counters($$
INSERT INTO single_shard SELECT * FROM single_shard_1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
INSERT INTO single_shard SELECT * FROM uncolocated_single_shard
$$,
2, 0
);
CALL exec_query_and_check_query_counters($$
WITH big_cte AS (
WITH first_cte AS (
-- multi-shard
SELECT b, sum(a) AS a_sum
FROM uncolocated_dist_table
GROUP BY b
),
dummy_cte AS (
SELECT count(*) FROM -- single-shard (cross join between intermediate results)
(SELECT * FROM dist_table_1 ORDER BY a LIMIT 1) q1, -- multi-shard
(SELECT b, count(*) AS a_count FROM dist_table_1 GROUP BY b) q2 -- multi-shard
)
-- multi-shard
SELECT dist_table.a, dist_table.b
FROM dist_table
JOIN dist_table_1 ON dist_table.a = dist_table_1.a
JOIN first_cte ON dist_table_1.a = first_cte.a_sum
CROSS JOIN dummy_cte
WHERE dist_table.a = dist_table_1.a
),
another_cte AS (
-- single-shard
SELECT * FROM ref_table ORDER BY a LIMIT 64
)
-- final insert: multi-shard
INSERT INTO dist_table (a, b)
-- source: multi-shard
SELECT uncolocated_dist_table.a, uncolocated_dist_table.b FROM uncolocated_dist_table
LEFT JOIN big_cte ON uncolocated_dist_table.a = big_cte.a
LEFT JOIN another_cte ON uncolocated_dist_table.a = another_cte.a
$$,
2, 6
);
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) SELECT * FROM local_table
$$,
0, 1
);
CALL exec_query_and_check_query_counters($$
INSERT INTO local_table (a, b) SELECT * FROM dist_table
$$,
0, 1
);
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) SELECT * FROM citus_local
$$,
1, 1
);
CALL exec_query_and_check_query_counters($$
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
$$,
1, 1
);
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
-- verify that we cannot execute citus_stat_counters_reset() as a non-superuser
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
\c - postgres - -
ALTER USER stat_counters_test_user SUPERUSER;
\c - stat_counters_test_user - -
-- verify that another superuser can execute citus_stat_counters_reset()
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
SELECT query_execution_single_shard = 0, query_execution_multi_shard = 0
FROM (SELECT (citus_stat_counters(oid)).* FROM pg_database WHERE datname = current_database()) q;
\c regression postgres - :master_port
-- drop the test cluster
\c regression - - :worker_1_port
DROP DATABASE stat_counters_test_db WITH (FORCE);
\c regression - - :worker_2_port
DROP DATABASE stat_counters_test_db WITH (FORCE);
\c regression - - :master_port
-- save its oid before dropping
SELECT oid AS stat_counters_test_db_oid FROM pg_database WHERE datname = 'stat_counters_test_db' \gset
DROP DATABASE stat_counters_test_db WITH (FORCE);
-- even if the database is dropped, citus_stat_counters() still returns a row for it
SELECT COUNT(*) = 1 FROM citus_stat_counters() WHERE database_id = :'stat_counters_test_db_oid';
SELECT COUNT(*) = 1 FROM (SELECT citus_stat_counters(:'stat_counters_test_db_oid')) q;
-- However, citus_stat_counters just ignores dropped databases
SELECT COUNT(*) = 0 FROM citus_stat_counters WHERE name = 'stat_counters_test_db';
-- clean up for the current database
REVOKE ALL ON DATABASE regression FROM stat_counters_test_user;
REVOKE ALL ON SCHEMA stat_counters FROM stat_counters_test_user;
REVOKE ALL ON ALL TABLES IN SCHEMA stat_counters FROM stat_counters_test_user;
SET client_min_messages TO WARNING;
DROP SCHEMA stat_counters CASCADE;
DROP USER stat_counters_test_user;