mirror of https://github.com/citusdata/citus.git

refactor: separate failed connections counter

Soon this won't be a simple counter but an array or similar, so that we can report failed connection establishments on a per-target-server basis.

Branch: stat-counters-conn-failure-target-node
parent 432b69eb9d
commit 76320723ef
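At its core, the commit splits the flat counter array into a struct that keeps the "simple" enum-indexed counters apart from the failed-connection counter. As a reading aid, here is the before/after layout, condensed from the stat_counters.h hunks further down (surrounding context and comments elided):

    /* before: one flat array indexed by StatType */
    typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS];

    /* after: the failed-connection counter gets its own field, so it can
     * later become a per-target-node array without disturbing the rest */
    typedef struct AtomicStatCounters
    {
        pg_atomic_uint64 simpleCounters[N_SIMPLE_CITUS_STAT_COUNTERS];
        pg_atomic_uint64 connEstabFailedCounter;
    } AtomicStatCounters;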
@@ -2622,11 +2622,11 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
         0;
     if (copiedShardCount <= 1)
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
     }
     else
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
     }
 }
 

@@ -3174,11 +3174,11 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag)
 
     if (list_length(shardIntervalList) <= 1)
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
     }
     else
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
     }
 
     table_close(distributedRelation, AccessShareLock);
@@ -364,7 +364,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
          */
         if (PQstatus(connection->pgConn) == CONNECTION_OK)
         {
-            IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
+            IncrementSimpleStatCounterForMyDb(STAT_CONNECTION_REUSED);
         }
 
         return connection;

@@ -1007,7 +1007,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
              * Here we count the connections establishments that failed and that
              * we won't wait anymore.
              */
-            IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+            IncrementConnEstabFailedStatCounterForMyDb();
         }
     }
 

@@ -1110,7 +1110,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
                                      eventMask, NULL);
         if (!success)
         {
-            IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+            IncrementConnEstabFailedStatCounterForMyDb();
             ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
                             errmsg("connection establishment for node %s:%d "
                                    "failed", connection->hostname,

@@ -1220,7 +1220,7 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
         /* close connection, otherwise we take up resource on the other side */
         CitusPQFinish(connection);
 
-        IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+        IncrementConnEstabFailedStatCounterForMyDb();
     }
 }
 

@@ -1644,7 +1644,7 @@ MarkConnectionConnected(MultiConnection *connection, bool newConnection)
 
     if (newConnection)
     {
-        IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
+        IncrementSimpleStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
     }
 }
 
@@ -2036,7 +2036,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
             else
             {
                 connection->connectionState = MULTI_CONNECTION_FAILED;
-                IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+                IncrementConnEstabFailedStatCounterForMyDb();
             }
 
 

@@ -2854,7 +2854,7 @@ MarkEstablishingSessionsTimedOut(WorkerPool *workerPool)
             connection->connectionState == MULTI_CONNECTION_INITIAL)
         {
             connection->connectionState = MULTI_CONNECTION_TIMED_OUT;
-            IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+            IncrementConnEstabFailedStatCounterForMyDb();
         }
     }
 }

@@ -3039,7 +3039,7 @@ ConnectionStateMachine(WorkerSession *session)
                 else if (status == CONNECTION_BAD)
                 {
                     connection->connectionState = MULTI_CONNECTION_FAILED;
-                    IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+                    IncrementConnEstabFailedStatCounterForMyDb();
                     break;
                 }
 

@@ -3055,7 +3055,7 @@ ConnectionStateMachine(WorkerSession *session)
                 if (pollMode == PGRES_POLLING_FAILED)
                 {
                     connection->connectionState = MULTI_CONNECTION_FAILED;
-                    IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+                    IncrementConnEstabFailedStatCounterForMyDb();
                 }
                 else if (pollMode == PGRES_POLLING_READING)
                 {
@@ -269,11 +269,11 @@ CitusExecScan(CustomScanState *node)
 
         if (isMultiTaskPlan)
         {
-            IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+            IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
         }
         else
         {
-            IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+            IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
         }
 
         scanState->finishedRemoteScan = true;
@@ -189,11 +189,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
              * MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this
              * here.
              */
-            IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+            IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
         }
         else
         {
-            IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+            IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
         }
 
         /*

@@ -218,11 +218,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 
         if (list_length(taskList) <= 1)
         {
-            IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+            IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
         }
         else
         {
-            IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+            IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
         }
 
         executorState->es_processed = rowsInserted;

@@ -302,11 +302,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 
             if (list_length(prunedTaskList) <= 1)
             {
-                IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+                IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
             }
             else
             {
-                IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+                IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
             }
         }
         else
@@ -176,11 +176,11 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
          * most probably choose to use ExecuteSourceAtCoordAndRedistribution(),
          * but we still keep this here.
          */
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
     }
     else
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
     }
 
     ereport(DEBUG1, (errmsg("Executing final MERGE on workers using "

@@ -213,11 +213,11 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
 
     if (list_length(taskList) <= 1)
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
     }
     else
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
     }
 
     executorState->es_processed = rowsMerged;

@@ -318,7 +318,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
          * No task to execute, but we still increment STAT_QUERY_EXECUTION_SINGLE_SHARD
          * as per our convention.
          */
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
         return;
     }
 

@@ -341,11 +341,11 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
 
     if (list_length(prunedTaskList) == 1)
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
     }
     else
     {
-        IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+        IncrementSimpleStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
     }
 
     executorState->es_processed = rowsMerged;
@@ -3,3 +3,4 @@
 
 #include "udfs/citus_prepare_pg_upgrade/14.0-1.sql"
 #include "udfs/citus_finish_pg_upgrade/14.0-1.sql"
+#include "udfs/citus_stat_counters/14.0-1.sql"

@@ -3,3 +3,7 @@
 
 #include "../udfs/citus_prepare_pg_upgrade/13.0-1.sql"
 #include "../udfs/citus_finish_pg_upgrade/13.2-1.sql"
+
+DROP VIEW pg_catalog.citus_stat_counters;
+DROP FUNCTION pg_catalog.citus_stat_counters(oid);
+#include "../udfs/citus_stat_counters/13.1-1.sql"
@@ -0,0 +1,53 @@
+DROP VIEW pg_catalog.citus_stat_counters;
+DROP FUNCTION pg_catalog.citus_stat_counters(oid);
+
+-- See the comments for the function in
+-- src/backend/distributed/stats/stat_counters.c for more details.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
+    database_id oid DEFAULT 0,
+
+    -- must always be the first column or you should accordingly update
+    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
+    OUT database_id oid,
+
+    -- Following stat counter columns must be in the same order as the
+    -- StatType enum defined in src/include/distributed/stats/stat_counters.h
+    OUT connection_establishment_succeeded bigint,
+    OUT connection_reused bigint,
+    OUT query_execution_single_shard bigint,
+    OUT query_execution_multi_shard bigint,
+
+    OUT connection_establishment_failed bigint,
+
+    -- must always be the last column or you should accordingly update
+    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
+    OUT stats_reset timestamp with time zone
+)
+RETURNS SETOF RECORD
+LANGUAGE C STRICT VOLATILE PARALLEL SAFE
+AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
+COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.';
+
+-- returns the stat counters for all the databases in local node
+CREATE OR REPLACE VIEW citus.citus_stat_counters AS
+SELECT pg_database.oid,
+       pg_database.datname as name,
+
+       -- We always COALESCE the counters to 0 because the LEFT JOIN
+       -- will bring the databases that have never been connected to
+       -- since the last restart with NULL counters, but we want to
+       -- show them with 0 counters in the view.
+       COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
+       COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
+       COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
+       COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
+       COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,
+
+       citus_stat_counters.stats_reset
+FROM pg_catalog.pg_database
+LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
+ON (oid = database_id);
+
+ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
+
+GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
@@ -1,3 +1,6 @@
+DROP VIEW pg_catalog.citus_stat_counters;
+DROP FUNCTION pg_catalog.citus_stat_counters(oid);
+
 -- See the comments for the function in
 -- src/backend/distributed/stats/stat_counters.c for more details.
 CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(

@@ -10,11 +13,12 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
     -- Following stat counter columns must be in the same order as the
     -- StatType enum defined in src/include/distributed/stats/stat_counters.h
     OUT connection_establishment_succeeded bigint,
-    OUT connection_establishment_failed bigint,
     OUT connection_reused bigint,
     OUT query_execution_single_shard bigint,
     OUT query_execution_multi_shard bigint,
 
+    OUT connection_establishment_failed bigint,
+
     -- must always be the last column or you should accordingly update
     -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
     OUT stats_reset timestamp with time zone

@@ -25,7 +29,7 @@ AS 'MODULE_PATHNAME', $$citus_stat_counters$$;
 COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.';
 
 -- returns the stat counters for all the databases in local node
-CREATE VIEW citus.citus_stat_counters AS
+CREATE OR REPLACE VIEW citus.citus_stat_counters AS
 SELECT pg_database.oid,
        pg_database.datname as name,
@@ -7,7 +7,7 @@
  *
  * We create an array of "BackendStatsSlot"s in shared memory, one for
  * each backend. Each backend increments its own stat counters in its
- * own slot via IncrementStatCounterForMyDb(). And when a backend exits,
+ * own slot via IncrementSimpleStatCounterForMyDb(). And when a backend exits,
  * it saves its stat counters from its slot via
  * SaveBackendStatsIntoSavedBackendStatsHash() into a hash table in
  * shared memory, whose entries are "SavedBackendStatsHashEntry"s and

@@ -48,7 +48,7 @@
  * There is chance that citus_stat_counters_reset() might race with a
  * backend that is trying to increment one of the counters in its slot
  * and as a result it can effectively fail to reset that counter due to
- * the reasons documented in IncrementStatCounterForMyDb() function.
+ * the reasons documented in IncrementSimpleStatCounterForMyDb() function.
  * However, this should be a very rare case and we can live with that
  * for now.
 *
@@ -97,8 +97,17 @@
 
 
 /* fixed size array types to store the stat counters */
-typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS];
-typedef uint64 StatCounters[N_CITUS_STAT_COUNTERS];
+typedef struct AtomicStatCounters
+{
+    pg_atomic_uint64 simpleCounters[N_SIMPLE_CITUS_STAT_COUNTERS];
+    pg_atomic_uint64 connEstabFailedCounter;
+} AtomicStatCounters;
+
+typedef struct StatCounters
+{
+    uint64 simpleCounters[N_SIMPLE_CITUS_STAT_COUNTERS];
+    uint64 connEstabFailedCounter;
+} StatCounters;
 
 /*
  * saved backend stats - hash entry definition
@@ -153,7 +162,7 @@ typedef struct BackendStatsSlot
  * GUC variable
  *
  * This only controls whether we track the stat counters or not, via
- * IncrementStatCounterForMyDb() and
+ * IncrementSimpleStatCounterForMyDb() and
  * SaveBackendStatsIntoSavedBackendStatsHash(). In other words, even
  * when the GUC is disabled, we still allocate the shared memory
  * structures etc. and citus_stat_counters() / citus_stat_counters_reset()
@@ -394,11 +403,11 @@ StatCountersShmemSize(void)
 
 
 /*
- * IncrementStatCounterForMyDb increments the stat counter for the given statId
+ * IncrementSimpleStatCounterForMyDb increments the stat counter for the given statId
  * for this backend.
  */
 void
-IncrementStatCounterForMyDb(int statId)
+IncrementSimpleStatCounterForMyDb(int statId)
 {
     if (!EnableStatCounters)
     {
@@ -430,7 +439,45 @@ IncrementStatCounterForMyDb(int statId)
      * But this should be a rare case and we can live with that, for the
      * sake of lock-free implementation of this function.
      */
-    pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters[statId];
+    pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters.simpleCounters[statId];
+    pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1);
+}
+
+
+void
+IncrementConnEstabFailedStatCounterForMyDb(void)
+{
+    if (!EnableStatCounters)
+    {
+        return;
+    }
+
+    /* just to be on the safe side */
+    if (!EnsureStatCountersShmemInitDone())
+    {
+        return;
+    }
+
+    int myBackendSlotIdx = getProcNo_compat(MyProc);
+    BackendStatsSlot *myBackendStatsSlot =
+        &SharedBackendStatsSlotArray[myBackendSlotIdx];
+
+    /*
+     * When there cannot be any other writers, incrementing an atomic
+     * counter via pg_atomic_read_u64() and pg_atomic_write_u64() is
+     * same as incrementing it via pg_atomic_fetch_add_u64(). Plus, the
+     * former is cheaper than the latter because the latter has to do
+     * extra work to deal with concurrent writers.
+     *
+     * In our case, the only concurrent writer could be the backend that
+     * is executing citus_stat_counters_reset(). So, there is chance that
+     * we read the counter value, then it gets reset by a concurrent call
+     * made to citus_stat_counters_reset() and then we write the
+     * incremented value back, by effectively overriding the reset value.
+     * But this should be a rare case and we can live with that, for the
+     * sake of lock-free implementation of this function.
+     */
+    pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters.connEstabFailedCounter;
     pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1);
 }
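The comment in the new function describes a deliberately lock-free increment: a plain read followed by a plain write instead of an atomic read-modify-write, which is safe because each backend is the only regular writer to its own slot. A self-contained C11 analogue of that trade-off, using stdatomic in place of PostgreSQL's pg_atomic API (all names here are illustrative, not from the Citus tree):

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    /* one counter slot, normally written only by its owning backend */
    static _Atomic uint64_t counter;

    /*
     * Single-writer increment: a load + store pair, like the
     * pg_atomic_read_u64()/pg_atomic_write_u64() pair in the diff.
     * Cheaper than atomic_fetch_add() because no read-modify-write
     * cycle is needed, at the cost of possibly overwriting a
     * concurrent reset -- the rare race the comment above accepts.
     */
    static void increment_single_writer(void)
    {
        uint64_t v = atomic_load_explicit(&counter, memory_order_relaxed);
        atomic_store_explicit(&counter, v + 1, memory_order_relaxed);
    }

    int main(void)
    {
        for (int i = 0; i < 5; i++)
            increment_single_writer();
        printf("counter = %llu\n",
               (unsigned long long) atomic_load_explicit(&counter,
                                                         memory_order_relaxed));
        return 0;
    }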
@@ -504,19 +551,24 @@ SaveBackendStatsIntoSavedBackendStatsHash(void)
 
     SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
 
-    for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+    for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
     {
-        dbSavedBackendStatsEntry->counters[statIdx] +=
-            pg_atomic_read_u64(&myBackendStatsSlot->counters[statIdx]);
+        dbSavedBackendStatsEntry->counters.simpleCounters[statIdx] +=
+            pg_atomic_read_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx]);
 
         /*
         * Given that this function is only called when a backend exits, later on
         * another backend might be assigned to the same slot. So, we reset each
         * stat counter of this slot to 0 after saving it.
         */
-        pg_atomic_write_u64(&myBackendStatsSlot->counters[statIdx], 0);
+        pg_atomic_write_u64(&myBackendStatsSlot->counters.simpleCounters[statIdx], 0);
     }
 
+    dbSavedBackendStatsEntry->counters.connEstabFailedCounter +=
+        pg_atomic_read_u64(&myBackendStatsSlot->counters.connEstabFailedCounter);
+
+    pg_atomic_write_u64(&myBackendStatsSlot->counters.connEstabFailedCounter, 0);
+
     SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
 
     LWLockRelease(*SharedSavedBackendStatsHashLock);
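The save-and-zero sequence above runs under the entry's spinlock when a backend exits, so the slot can be handed to the next backend in a clean state. A rough pthreads analogue of the same flush pattern (a sketch with made-up names, not Citus code):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    static _Atomic uint64_t slotCounter;    /* live per-backend slot */
    static uint64_t savedTotal;             /* per-database saved sum */
    static pthread_mutex_t savedMutex = PTHREAD_MUTEX_INITIALIZER;

    /* on exit: fold the slot into the saved totals, then zero the slot
     * so a later backend reusing it starts from scratch */
    static void FlushSlotOnExit(void)
    {
        pthread_mutex_lock(&savedMutex);
        savedTotal += atomic_load(&slotCounter);
        atomic_store(&slotCounter, 0);
        pthread_mutex_unlock(&savedMutex);
    }

    int main(void)
    {
        atomic_fetch_add(&slotCounter, 3);
        FlushSlotOnExit();
        printf("saved = %llu\n", (unsigned long long) savedTotal);
        return 0;
    }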
@@ -570,10 +622,13 @@ StatCountersShmemInit(void)
             BackendStatsSlot *backendStatsSlot =
                 &SharedBackendStatsSlotArray[backendSlotIdx];
 
-            for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+            for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
             {
-                pg_atomic_init_u64(&backendStatsSlot->counters[statIdx], 0);
+                pg_atomic_init_u64(&backendStatsSlot->counters.simpleCounters[statIdx],
+                                   0);
             }
+
+            pg_atomic_init_u64(&backendStatsSlot->counters.connEstabFailedCounter, 0);
         }
 
         *SharedSavedBackendStatsHashLock = &(
@@ -657,11 +712,14 @@ CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
         BackendStatsSlot *backendStatsSlot =
             &SharedBackendStatsSlotArray[backendSlotIdx];
 
-        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+        for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
         {
-            dbStatsEntry->counters[statIdx] +=
-                pg_atomic_read_u64(&backendStatsSlot->counters[statIdx]);
+            dbStatsEntry->counters.simpleCounters[statIdx] +=
+                pg_atomic_read_u64(&backendStatsSlot->counters.simpleCounters[statIdx]);
         }
+
+        dbStatsEntry->counters.connEstabFailedCounter +=
+            pg_atomic_read_u64(&backendStatsSlot->counters.connEstabFailedCounter);
     }
 }
@@ -699,12 +757,15 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
 
         SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
 
-        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+        for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
         {
-            dbStatsEntry->counters[statIdx] +=
-                dbSavedBackendStatsEntry->counters[statIdx];
+            dbStatsEntry->counters.simpleCounters[statIdx] +=
+                dbSavedBackendStatsEntry->counters.simpleCounters[statIdx];
         }
 
+        dbStatsEntry->counters.connEstabFailedCounter +=
+            dbSavedBackendStatsEntry->counters.connEstabFailedCounter;
+
         dbStatsEntry->resetTimestamp =
             dbSavedBackendStatsEntry->resetTimestamp;

@@ -725,12 +786,15 @@ CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
 
         SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
 
-        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+        for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
         {
-            dbStatsEntry->counters[statIdx] +=
-                dbSavedBackendStatsEntry->counters[statIdx];
+            dbStatsEntry->counters.simpleCounters[statIdx] +=
+                dbSavedBackendStatsEntry->counters.simpleCounters[statIdx];
         }
 
+        dbStatsEntry->counters.connEstabFailedCounter +=
+            dbSavedBackendStatsEntry->counters.connEstabFailedCounter;
+
         dbStatsEntry->resetTimestamp =
             dbSavedBackendStatsEntry->resetTimestamp;
@@ -757,7 +821,8 @@ DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB *databaseStats)
 
     if (!found)
     {
-        MemSet(dbStatsEntry->counters, 0, sizeof(StatCounters));
+        MemSet(dbStatsEntry->counters.simpleCounters, 0, sizeof(dbStatsEntry->counters.simpleCounters));
+        dbStatsEntry->counters.connEstabFailedCounter = 0;
         dbStatsEntry->resetTimestamp = 0;
     }
 
@@ -779,29 +844,39 @@ StoreDatabaseStatsIntoTupStore(HTAB *databaseStats, Tuplestorestate *tupleStore,
     DatabaseStatsHashEntry *dbStatsEntry = NULL;
     while ((dbStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
     {
-        /* +2 for database_id (first) and the stats_reset (last) column */
-        Datum values[N_CITUS_STAT_COUNTERS + 2] = { 0 };
-        bool isNulls[N_CITUS_STAT_COUNTERS + 2] = { 0 };
+        /*
+         * +3 for:
+         *   database_id (first)
+         *   conn_estab_failed_count (after the simple counters)
+         *   the stats_reset (last)
+         */
+        Datum values[N_SIMPLE_CITUS_STAT_COUNTERS + 3] = { 0 };
+        bool isNulls[N_SIMPLE_CITUS_STAT_COUNTERS + 3] = { 0 };
 
-        values[0] = ObjectIdGetDatum(dbStatsEntry->databaseId);
+        int writeIdx = 0;
+        values[writeIdx++] = ObjectIdGetDatum(dbStatsEntry->databaseId);
 
-        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+        for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
         {
-            uint64 statCounter = dbStatsEntry->counters[statIdx];
-            values[statIdx + 1] = UInt64GetDatum(statCounter);
+            uint64 statCounter = dbStatsEntry->counters.simpleCounters[statIdx];
+            values[writeIdx++] = UInt64GetDatum(statCounter);
         }
 
+        values[writeIdx++] = UInt64GetDatum(
+            dbStatsEntry->counters.connEstabFailedCounter);
+
         /* set stats_reset column to NULL if it was never reset */
         if (dbStatsEntry->resetTimestamp == 0)
         {
-            isNulls[N_CITUS_STAT_COUNTERS + 1] = true;
+            isNulls[writeIdx++] = true;
         }
         else
         {
-            values[N_CITUS_STAT_COUNTERS + 1] =
-                TimestampTzGetDatum(dbStatsEntry->resetTimestamp);
+            values[writeIdx++] = TimestampTzGetDatum(dbStatsEntry->resetTimestamp);
         }
 
+        Assert(writeIdx == lengthof(values));
+
         tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
     }
 }
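The switch from hard-coded column offsets to a running writeIdx means a column can be slotted in mid-tuple without touching the arithmetic for every column after it, and the final Assert catches any mismatch between the fills and the array length. A minimal standalone illustration of that pattern (the values are made up; lengthof() is defined locally to match PostgreSQL's macro):

    #include <assert.h>
    #include <stdint.h>
    #include <stdio.h>

    #define lengthof(array) (sizeof(array) / sizeof((array)[0]))

    int main(void)
    {
        uint64_t values[4] = { 0 };
        int writeIdx = 0;

        values[writeIdx++] = 16384;   /* database_id */
        values[writeIdx++] = 7;       /* a simple counter */
        values[writeIdx++] = 2;       /* conn_estab_failed_count */
        values[writeIdx++] = 0;       /* stats_reset placeholder */

        /* fails loudly if a column was added but never filled */
        assert(writeIdx == lengthof(values));

        printf("filled %d columns\n", writeIdx);
        return 0;
    }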
@@ -855,10 +930,12 @@ ResetActiveBackendStats(Oid databaseId)
         BackendStatsSlot *backendStatsSlot =
             &SharedBackendStatsSlotArray[backendSlotIdx];
 
-        for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
+        for (int statIdx = 0; statIdx < N_SIMPLE_CITUS_STAT_COUNTERS; statIdx++)
         {
-            pg_atomic_write_u64(&backendStatsSlot->counters[statIdx], 0);
+            pg_atomic_write_u64(&backendStatsSlot->counters.simpleCounters[statIdx], 0);
         }
+
+        pg_atomic_write_u64(&backendStatsSlot->counters.connEstabFailedCounter, 0);
     }
 
     return foundAny;
@@ -920,7 +997,9 @@ ResetSavedBackendStats(Oid databaseId, bool force)
     {
         SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);
 
-        memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
+        memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0,
+               sizeof(dbSavedBackendStatsEntry->counters.simpleCounters));
+        dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0;
         dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp();
 
         SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
@@ -962,7 +1041,10 @@ SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId)
 
     if (!found)
     {
-        memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
+        memset(dbSavedBackendStatsEntry->counters.simpleCounters, 0,
+               sizeof(dbSavedBackendStatsEntry->counters.simpleCounters));
+
+        dbSavedBackendStatsEntry->counters.connEstabFailedCounter = 0;
+
         dbSavedBackendStatsEntry->resetTimestamp = 0;
@@ -31,7 +31,6 @@ typedef enum
      * adaptive_executor.c.
      */
     STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
-    STAT_CONNECTION_ESTABLISHMENT_FAILED,
     STAT_CONNECTION_REUSED,
 
     /*
@@ -48,8 +47,8 @@
     STAT_QUERY_EXECUTION_MULTI_SHARD,
 
     /* do not use this and ensure it is the last entry */
-    N_CITUS_STAT_COUNTERS
-} StatType;
+    N_SIMPLE_CITUS_STAT_COUNTERS
+} SimpleStatType;
 
 
 /* GUC variable */
@@ -60,8 +59,9 @@ extern bool EnableStatCounters;
 extern void InitializeStatCountersShmem(void);
 extern Size StatCountersShmemSize(void);
 
-/* main entry point for the callers who want to increment the stat counters */
-extern void IncrementStatCounterForMyDb(int statId);
+/* main entry point for the callers who want to increment the simple stat counters */
+extern void IncrementSimpleStatCounterForMyDb(int statId);
+extern void IncrementConnEstabFailedStatCounterForMyDb(void);
 
 /*
  * Exported to define a before_shmem_exit() callback that saves
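Putting the header hunks together, the stat-type enum after this commit would read roughly as follows (reconstructed from the diff above; the intermediate member comments are elided):

    typedef enum
    {
        STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
        STAT_CONNECTION_REUSED,

        STAT_QUERY_EXECUTION_SINGLE_SHARD,
        STAT_QUERY_EXECUTION_MULTI_SHARD,

        /* do not use this and ensure it is the last entry */
        N_SIMPLE_CITUS_STAT_COUNTERS
    } SimpleStatType;

With STAT_CONNECTION_ESTABLISHMENT_FAILED removed from the enum, the failed-connection counter is now reachable only through IncrementConnEstabFailedStatCounterForMyDb(), which is what leaves room for the per-target-node representation the commit message announces.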