mirror of https://github.com/citusdata/citus.git
Merge d2cf3f05cd into c569f8321f
commit a2deab185a
@@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
 DATA_built = $(generated_sql_files)
 
 # directories with source files
-SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
+SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit stats test transaction utils worker clock
 
 # enterprise modules
 SUBDIRS += replication

@@ -2735,11 +2735,15 @@ CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId)
 	ExprContext *econtext = GetPerTupleExprContext(estate);
 	econtext->ecxt_scantuple = slot;
 	const bool nonPublishableData = false;
+
+	/* we don't track query counters when distributing a table */
+	const bool trackQueryCounters = false;
 	DestReceiver *copyDest =
 		(DestReceiver *) CreateCitusCopyDestReceiver(distributedTableId,
 													 columnNameList,
 													 partitionColumnIndex,
-													 estate, NULL, nonPublishableData);
+													 estate, NULL, nonPublishableData,
+													 trackQueryCounters);
 
 	/* initialise state for writing to shards, we'll open connections on demand */
 	copyDest->rStartup(copyDest, 0, sourceTupleDescriptor);

@@ -110,6 +110,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/shard_pruning.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/stats/stat_counters.h"
 #include "distributed/transmit.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"

@@ -497,10 +498,14 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag)
 
 	/* set up the destination for the COPY */
 	const bool publishableData = true;
+
+	/* we want to track query counters for "COPY (to) distributed-table .." commands */
+	const bool trackQueryCounters = true;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
 																  partitionColumnIndex,
 																  executorState, NULL,
-																  publishableData);
+																  publishableData,
+																  trackQueryCounters);
 
 	/* if the user specified an explicit append-to_shard option, write to it */
 	uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement);

@@ -1875,11 +1880,15 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
  * of intermediate results that are co-located with the actual table.
  * The names of the intermediate results will be of the form:
  * intermediateResultIdPrefix_<shardid>
+ *
+ * If trackQueryCounters is true, the COPY will increment the query stat
+ * counters as needed at the end of the COPY.
  */
 CitusCopyDestReceiver *
 CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
 							EState *executorState,
-							char *intermediateResultIdPrefix, bool isPublishable)
+							char *intermediateResultIdPrefix, bool isPublishable,
+							bool trackQueryCounters)
 {
 	CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
 		sizeof(CitusCopyDestReceiver));

@@ -1899,6 +1908,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
 	copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
 	copyDest->memoryContext = CurrentMemoryContext;
 	copyDest->isPublishable = isPublishable;
+	copyDest->trackQueryCounters = trackQueryCounters;
 
 	return copyDest;
 }

@@ -2589,8 +2599,9 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
 
 /*
  * CitusCopyDestReceiverShutdown implements the rShutdown interface of
- * CitusCopyDestReceiver. It ends the COPY on all the open connections and closes
- * the relation.
+ * CitusCopyDestReceiver. It ends the COPY on all the open connections, closes
+ * the relation and increments the query stat counters based on the shards
+ * copied into, if requested.
  */
 static void
 CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)

@@ -2601,6 +2612,26 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
 	ListCell *connectionStateCell = NULL;
 	Relation distributedRelation = copyDest->distributedRelation;
 
+	/*
+	 * Increment the query stat counters based on the shards copied into,
+	 * if requested.
+	 */
+	if (copyDest->trackQueryCounters)
+	{
+		int copiedShardCount =
+			copyDest->shardStateHash ?
+			hash_get_num_entries(copyDest->shardStateHash) :
+			0;
+		if (copiedShardCount <= 1)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+	}
+
 	List *connectionStateList = ConnectionStateList(connectionStateHash);
 
 	FinishLocalColocatedIntermediateFiles(copyDest);

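The counter names and the increment helper used throughout these hunks come from the new distributed/stats/stat_counters.h header, which this excerpt never shows directly. The following is a minimal sketch of the surface the call sites assume; the enum tag name (StatType) and the member ordering are guesses of this sketch, while every identifier in it does appear somewhere in this diff:

/* sketch of the assumed distributed/stats/stat_counters.h surface */
#define ENABLE_STAT_COUNTERS_DEFAULT false

typedef enum
{
	/* connection counters used by the connection-management hunks */
	STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
	STAT_CONNECTION_ESTABLISHMENT_FAILED,
	STAT_CONNECTION_REUSED,

	/* query counters used by the COPY and executor hunks */
	STAT_QUERY_EXECUTION_SINGLE_SHARD,
	STAT_QUERY_EXECUTION_MULTI_SHARD,

	/* must stay last; sizes the per-backend counter arrays */
	N_CITUS_STAT_COUNTERS
} StatType;

extern bool EnableStatCounters;

extern void InitializeStatCountersShmem(void);
extern Size StatCountersShmemSize(void);

extern void IncrementStatCounterForMyDb(StatType statType);
extern void SaveBackendStatsIntoSavedBackendStatsHash(void);

Note the convention the shutdown hunk above establishes for COPY: touching zero or one shard counts as a single-shard execution, anything more as multi-shard.
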
@@ -3076,6 +3107,15 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag)
 
 	SendCopyEnd(copyOutState);
 
+	if (list_length(shardIntervalList) <= 1)
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}
+
 	table_close(distributedRelation, AccessShareLock);
 
 	if (completionTag != NULL)

@@ -39,6 +39,7 @@
 #include "distributed/remote_commands.h"
 #include "distributed/run_from_same_connection.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/stats/stat_counters.h"
 #include "distributed/time_constants.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_log_messages.h"

@@ -354,6 +355,18 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 		MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
 		if (connection)
 		{
+			/*
+			 * Increment the connection stat counter for reused connections
+			 * only if the connection is in a good state. We don't bother
+			 * shutting down the connection here if it is not; we merely
+			 * want to avoid counting a reuse for a connection that the
+			 * caller cannot really use.
+			 */
+			if (PQstatus(connection->pgConn) == CONNECTION_OK)
+			{
+				IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
+			}
+
 			return connection;
 		}
 	}

@@ -395,6 +408,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 			dlist_delete(&connection->connectionNode);
 			pfree(connection);
 
+			/*
+			 * Here we don't increment the connection stat counter for the optional
+			 * connections that we gave up establishing due to connection throttling,
+			 * because the callers who request optional connections know how to
+			 * survive without them.
+			 */
 			return NULL;
 		}
 	}

@@ -981,6 +1000,14 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 		{
 			waitCount++;
 		}
+		else if (connectionState->phase == MULTI_CONNECTION_PHASE_ERROR)
+		{
+			/*
+			 * Here we count the connection establishments that failed and
+			 * that we won't wait on anymore.
+			 */
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
+		}
 	}
 
 	/* prepare space for socket events */

@@ -1025,6 +1052,11 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 
 		if (event->events & WL_POSTMASTER_DEATH)
 		{
+			/*
+			 * Here we don't increment the failed connection stat counter
+			 * because this is not a connection failure but a postmaster
+			 * death in the local node.
+			 */
 			ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
 		}

@@ -1041,6 +1073,12 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 		 * reset the memory context
 		 */
 		MemoryContextDelete(MemoryContextSwitchTo(oldContext));
+
+		/*
+		 * Similarly, we don't increment the connection stat counter for
+		 * the failed connections here because this is not a connection
+		 * failure but a received cancellation request.
+		 */
 		return;
 	}

@@ -1071,6 +1109,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 								 eventMask, NULL);
 		if (!success)
 		{
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 			ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
 							errmsg("connection establishment for node %s:%d "
 								   "failed", connection->hostname,

@@ -1087,7 +1126,15 @@ FinishConnectionListEstablishment(List *multiConnectionList)
 			 */
 			if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTED)
 			{
-				MarkConnectionConnected(connectionState->connection);
+				/*
+				 * Since WaitEventSetFromMultiConnectionStates() only adds the
+				 * connections for which we haven't completed the connection
+				 * establishment yet, here we always have a new connection.
+				 * In other words, at this point, we surely know that we're
+				 * not dealing with a cached connection.
+				 */
+				bool newConnection = true;
+				MarkConnectionConnected(connectionState->connection, newConnection);
 			}
 		}
 	}

@@ -1171,6 +1218,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
 
 		/* close connection, otherwise we take up resource on the other side */
 		CitusPQFinish(connection);
+
+		IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 	}
 }

@@ -1583,7 +1632,7 @@ RemoteTransactionIdle(MultiConnection *connection)
  * establishment time when necessary.
  */
 void
-MarkConnectionConnected(MultiConnection *connection)
+MarkConnectionConnected(MultiConnection *connection, bool newConnection)
 {
 	connection->connectionState = MULTI_CONNECTION_CONNECTED;
 

@@ -1591,6 +1640,11 @@ MarkConnectionConnected(MultiConnection *connection)
 	{
 		INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd);
 	}
+
+	if (newConnection)
+	{
+		IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
+	}
 }

@@ -171,6 +171,7 @@
 #include "distributed/repartition_join_execution.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/stats/stat_counters.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/transaction_identifier.h"
 #include "distributed/transaction_management.h"

@@ -692,7 +693,7 @@ static bool SendNextQuery(TaskPlacementExecution *placementExecution,
 						  WorkerSession *session);
 static void ConnectionStateMachine(WorkerSession *session);
 static bool HasUnfinishedTaskForSession(WorkerSession *session);
-static void HandleMultiConnectionSuccess(WorkerSession *session);
+static void HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection);
 static bool HasAnyConnectionFailure(WorkerPool *workerPool);
 static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
 static bool TransactionModifiedDistributedTable(DistributedExecution *execution);

@@ -2029,6 +2030,7 @@ ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
 		else
 		{
 			connection->connectionState = MULTI_CONNECTION_FAILED;
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 		}

@@ -2813,21 +2815,21 @@ CheckConnectionTimeout(WorkerPool *workerPool)
 			logLevel = ERROR;
 		}
 
-		ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
-						   errmsg("could not establish any connections to the node "
-								  "%s:%d after %u ms", workerPool->nodeName,
-								  workerPool->nodePort,
-								  NodeConnectionTimeout)));
-
 		/*
 		 * We hit the connection timeout. In that case, we should not let the
 		 * connection establishment continue because the execution logic
 		 * pretends that failed sessions are not going to be used anymore.
 		 *
 		 * That's why we mark the connection as timed out to trigger the state
-		 * changes in the executor.
+		 * changes in the executor, if we don't throw an error below.
 		 */
 		MarkEstablishingSessionsTimedOut(workerPool);
+
+		ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
+						   errmsg("could not establish any connections to the node "
+								  "%s:%d after %u ms", workerPool->nodeName,
+								  workerPool->nodePort,
+								  NodeConnectionTimeout)));
 	}
 	else
 	{

@@ -2855,6 +2857,7 @@ MarkEstablishingSessionsTimedOut(WorkerPool *workerPool)
 			connection->connectionState == MULTI_CONNECTION_INITIAL)
 		{
 			connection->connectionState = MULTI_CONNECTION_TIMED_OUT;
+			IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 		}
 	}
 }

@@ -3012,6 +3015,10 @@ ConnectionStateMachine(WorkerSession *session)
 				 * the state machines might have already progressed and used
 				 * new pools/sessions instead. That's why we terminate the
 				 * connection, clear any state associated with it.
+				 *
+				 * Note that here we don't increment the failed connection
+				 * stat counter because MarkEstablishingSessionsTimedOut()
+				 * already did that.
 				 */
 				connection->connectionState = MULTI_CONNECTION_FAILED;
 				break;

@@ -3022,7 +3029,12 @@ ConnectionStateMachine(WorkerSession *session)
 			ConnStatusType status = PQstatus(connection->pgConn);
 			if (status == CONNECTION_OK)
 			{
-				HandleMultiConnectionSuccess(session);
+				/*
+				 * Connection was already established, possibly a cached
+				 * connection.
+				 */
+				bool newConnection = false;
+				HandleMultiConnectionSuccess(session, newConnection);
 				UpdateConnectionWaitFlags(session,
 										  WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
 				break;

@@ -3030,6 +3042,7 @@ ConnectionStateMachine(WorkerSession *session)
 			else if (status == CONNECTION_BAD)
 			{
 				connection->connectionState = MULTI_CONNECTION_FAILED;
+				IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 				break;
 			}

@@ -3045,6 +3058,7 @@ ConnectionStateMachine(WorkerSession *session)
 			if (pollMode == PGRES_POLLING_FAILED)
 			{
 				connection->connectionState = MULTI_CONNECTION_FAILED;
+				IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_FAILED);
 			}
 			else if (pollMode == PGRES_POLLING_READING)
 			{

@@ -3062,7 +3076,12 @@ ConnectionStateMachine(WorkerSession *session)
 			}
 			else
 			{
-				HandleMultiConnectionSuccess(session);
+				/*
+				 * Connection was not established before (!= CONNECTION_OK)
+				 * but PQconnectPoll() did so now.
+				 */
+				bool newConnection = true;
+				HandleMultiConnectionSuccess(session, newConnection);
 				UpdateConnectionWaitFlags(session,
 										  WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

@@ -3140,6 +3159,11 @@ ConnectionStateMachine(WorkerSession *session)
 				break;
 			}
 
+			/*
+			 * Here we don't increment the connection stat counter for failed
+			 * connections because we don't track the connections that we could
+			 * establish but lost later.
+			 */
 			connection->connectionState = MULTI_CONNECTION_FAILED;
 			break;
 		}

@@ -3302,12 +3326,12 @@ HasUnfinishedTaskForSession(WorkerSession *session)
 * connection's state.
 */
 static void
-HandleMultiConnectionSuccess(WorkerSession *session)
+HandleMultiConnectionSuccess(WorkerSession *session, bool newConnection)
 {
 	MultiConnection *connection = session->connection;
 	WorkerPool *workerPool = session->workerPool;
 
-	MarkConnectionConnected(connection);
+	MarkConnectionConnected(connection, newConnection);
 
 	ereport(DEBUG4, (errmsg("established connection to %s:%d for "
 							"session %ld in %ld microseconds",

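Both call sites of MarkConnectionConnected() shown above now pass an explicit newConnection flag, so the exported declaration (presumably in distributed/connection_management.h, which this excerpt doesn't include) has to change in lockstep. A one-line sketch of that assumed declaration:

/* assumed matching header change, not shown in this diff */
extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection);

The division of labor is then clean: STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED fires only for connections established during the current call, while cached connections are counted as STAT_CONNECTION_REUSED at lookup time in StartNodeUserDatabaseConnection().
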
@@ -43,8 +43,9 @@
 #include "distributed/multi_executor.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_server_executor.h"
-#include "distributed/query_stats.h"
 #include "distributed/shard_utils.h"
+#include "distributed/stats/query_stats.h"
+#include "distributed/stats/stat_counters.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/worker_log_messages.h"
 #include "distributed/worker_protocol.h"

@@ -206,7 +207,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
 	if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL)
 	{
 		/*
-		 * INSERT..SELECT via coordinator or re-partitioning are special because
+		 * INSERT..SELECT / MERGE via coordinator or re-partitioning are special because
 		 * the SELECT part is planned separately.
 		 */
 		return;

@@ -262,8 +263,19 @@ CitusExecScan(CustomScanState *node)
 
 	if (!scanState->finishedRemoteScan)
 	{
+		bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan);
+
 		AdaptiveExecutor(scanState);
+
+		if (isMultiTaskPlan)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+
 		scanState->finishedRemoteScan = true;
 	}

@@ -50,6 +50,7 @@
 #include "distributed/repartition_executor.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
+#include "distributed/stats/stat_counters.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/transaction_management.h"
 #include "distributed/version_compat.h"

@@ -183,6 +184,22 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 												  targetRelation,
 												  binaryFormat);
 
+		if (list_length(distSelectTaskList) <= 1)
+		{
+			/*
+			 * We will probably never get here for a repartitioned
+			 * INSERT..SELECT because when the source is a single shard
+			 * table, we would most likely choose to use
+			 * MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep
+			 * this here.
+			 */
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+
 		/*
 		 * At this point select query has been executed on workers and results
 		 * have been fetched in such a way that they are colocated with corresponding

@@ -203,6 +220,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 															taskList, tupleDest,
 															hasReturning);
 
+		if (list_length(taskList) <= 1)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
+
 		executorState->es_processed = rowsInserted;
 
 		if (SortReturning && hasReturning)

@@ -277,6 +303,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
 				SortTupleStore(scanState);
 			}
 		}
+
+		if (list_length(prunedTaskList) <= 1)
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+		}
+		else
+		{
+			IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+		}
 	}
 	else
 	{

@@ -318,6 +353,12 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 	int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
 																  columnNameList);
 
+	/*
+	 * We don't track query counters for the COPY commands that are executed to
+	 * prepare intermediate results.
+	 */
+	const bool trackQueryCounters = false;
+
 	/* set up a DestReceiver that copies into the intermediate table */
 	const bool publishableData = true;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,

@@ -325,7 +366,8 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 																  partitionColumnIndex,
 																  executorState,
 																  intermediateResultIdPrefix,
-																  publishableData);
+																  publishableData,
+																  trackQueryCounters);
 
 	ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

@@ -354,13 +396,20 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
 	int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
 																  columnNameList);
 
+	/*
+	 * We want to track query counters for the COPY commands that are executed to
+	 * perform the final INSERT for such INSERT..SELECT queries.
+	 */
+	const bool trackQueryCounters = true;
+
 	/* set up a DestReceiver that copies into the distributed table */
 	const bool publishableData = true;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
 																  columnNameList,
 																  partitionColumnIndex,
 																  executorState, NULL,
-																  publishableData);
+																  publishableData,
+																  trackQueryCounters);
 
 	ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

@@ -104,8 +104,8 @@
 #include "distributed/query_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h" /* to access LogRemoteCommands */
+#include "distributed/stats/stat_tenants.h"
 #include "distributed/transaction_management.h"
-#include "distributed/utils/citus_stat_tenants.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"

@@ -26,6 +26,7 @@
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/repartition_executor.h"
+#include "distributed/stats/stat_counters.h"
 #include "distributed/subplan_execution.h"
 
 static void ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState);

@@ -166,6 +167,21 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
 													  distSourceTaskList, partitionColumnIndex,
 													  targetRelation, binaryFormat);
 
+	if (list_length(distSourceTaskList) <= 1)
+	{
+		/*
+		 * We will probably never get here for a repartitioned MERGE
+		 * because when the source is a single shard table, we would
+		 * most likely choose ExecuteSourceAtCoordAndRedistribution(),
+		 * but we still keep this here.
+		 */
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}
+
 	ereport(DEBUG1, (errmsg("Executing final MERGE on workers using "
 							"intermediate results")));

@@ -193,6 +209,16 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
 													tupleDest,
 													hasReturning,
 													paramListInfo);
 
+	if (list_length(taskList) <= 1)
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}
+
 	executorState->es_processed = rowsMerged;
 }

@@ -272,7 +298,11 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
 
 	if (prunedTaskList == NIL)
 	{
-		/* No task to execute */
+		/*
+		 * No task to execute, but we still increment STAT_QUERY_EXECUTION_SINGLE_SHARD
+		 * as per our convention.
+		 */
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
 		return;
 	}

@@ -292,6 +322,16 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
 												  tupleDest,
 												  hasReturning,
 												  paramListInfo);
 
+	if (list_length(prunedTaskList) == 1)
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
+	}
+	else
+	{
+		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
+	}
+
 	executorState->es_processed = rowsMerged;
 }

@@ -317,6 +357,12 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 	List *columnNameList =
 		BuildColumnNameListFromTargetList(targetRelationId, sourceTargetList);
 
+	/*
+	 * We don't track query counters for the COPY commands that are executed to
+	 * prepare intermediate results.
+	 */
+	const bool trackQueryCounters = false;
+
 	/* set up a DestReceiver that copies into the intermediate file */
 	const bool publishableData = false;
 	CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,

@@ -324,7 +370,8 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
 																  partitionColumnIndex,
 																  executorState,
 																  intermediateResultIdPrefix,
-																  publishableData);
+																  publishableData,
+																  trackQueryCounters);
 
 	/* We can skip when writing to intermediate files */
 	copyDest->skipCoercions = true;

@@ -39,7 +39,7 @@
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/shard_utils.h"
-#include "distributed/utils/citus_stat_tenants.h"
+#include "distributed/stats/stat_tenants.h"
 #include "distributed/version_compat.h"

@@ -54,7 +54,7 @@
 #include "distributed/recursive_planning.h"
 #include "distributed/shard_utils.h"
 #include "distributed/shardinterval_utils.h"
-#include "distributed/utils/citus_stat_tenants.h"
+#include "distributed/stats/stat_tenants.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_shard_visibility.h"
 #if PG_VERSION_NUM >= PG_VERSION_16

@@ -89,7 +89,6 @@
 #include "distributed/placement_connection.h"
 #include "distributed/priority.h"
 #include "distributed/query_pushdown_planning.h"
-#include "distributed/query_stats.h"
 #include "distributed/recursive_planning.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"

@@ -105,11 +104,13 @@
 #include "distributed/shared_connection_stats.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/statistics_collection.h"
+#include "distributed/stats/query_stats.h"
+#include "distributed/stats/stat_counters.h"
+#include "distributed/stats/stat_tenants.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/time_constants.h"
 #include "distributed/transaction_management.h"
 #include "distributed/transaction_recovery.h"
-#include "distributed/utils/citus_stat_tenants.h"
 #include "distributed/utils/directory.h"
 #include "distributed/worker_log_messages.h"
 #include "distributed/worker_manager.h"

@@ -190,8 +191,10 @@ static void ResizeStackToMaximumDepth(void);
 static void multi_log_hook(ErrorData *edata);
 static bool IsSequenceOverflowError(ErrorData *edata);
 static void RegisterConnectionCleanup(void);
+static void RegisterSaveBackendStatsIntoSavedBackendStatsHash(void);
 static void RegisterExternalClientBackendCounterDecrement(void);
 static void CitusCleanupConnectionsAtExit(int code, Datum arg);
+static void SaveBackendStatsIntoSavedBackendStatsHashAtExit(int code, Datum arg);
 static void DecrementExternalClientBackendCounterAtExit(int code, Datum arg);
 static void CreateRequiredDirectories(void);
 static void RegisterCitusConfigVariables(void);

@@ -509,6 +512,8 @@ _PG_init(void)
 	InitializeShardSplitSMHandleManagement();
 
 	InitializeMultiTenantMonitorSMHandleManagement();
+
+	InitializeStatCountersShmem();
 
 	/* enable modification of pg_catalog tables during pg_upgrade */
 	if (IsBinaryUpgrade)

@@ -622,6 +627,8 @@ citus_shmem_request(void)
 	RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
 	RequestAddinShmemSpace(LogicalClockShmemSize());
 	RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
+	RequestAddinShmemSpace(StatCountersShmemSize());
+	RequestNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME, 1);
 }

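The bodies of StatCountersShmemSize() and the startup-hook initialization live in the new stat_counters.c, whose preamble appears at the end of this excerpt but is cut off before reaching them. As orientation, here is a sketch of the conventional Postgres pattern the two hunks above wire up; the MaxBackends-based slot count and the ShmemInitStruct name string are assumptions of this sketch, not taken from the PR:

/* illustrative sketch; the types are the ones declared in the new stat_counters.c */
Size
StatCountersShmemSize(void)
{
	/* assumption: one BackendStatsSlot per possible backend */
	Size size = mul_size(MaxBackends, sizeof(BackendStatsSlot));

	/* plus the saved-backend-stats hash, sized for its documented maximum */
	size = add_size(size, hash_estimate_size(
						SAVED_BACKEND_STATS_HASH_MAX_DATABASES,
						sizeof(SavedBackendStatsHashEntry)));

	return size;
}

static void
StatCountersShmemInit(void)
{
	if (prev_shmem_startup_hook != NULL)
	{
		prev_shmem_startup_hook();
	}

	bool found = false;
	SharedBackendStatsSlotArray = (BackendStatsSlot *)
		ShmemInitStruct("Citus Backend Stats Slot Array",	/* hypothetical name */
						mul_size(MaxBackends, sizeof(BackendStatsSlot)),
						&found);

	/*
	 * ... followed by ShmemInitHash() for SharedSavedBackendStatsHash and
	 * fetching the SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME LWLock, after
	 * which StatCountersShmemInitDone is flipped to true ...
	 */
}
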
@@ -797,6 +804,8 @@ StartupCitusBackend(void)
 
 	SetBackendDataDatabaseId();
 	RegisterConnectionCleanup();
+	RegisterSaveBackendStatsIntoSavedBackendStatsHash();
 
 	FinishedStartupCitusBackend = true;
 }

@@ -834,6 +843,24 @@ RegisterConnectionCleanup(void)
 }
 
 
+/*
+ * RegisterSaveBackendStatsIntoSavedBackendStatsHash registers the function
+ * that saves the backend stats for the exited backends into the saved backend
+ * stats hash.
+ */
+static void
+RegisterSaveBackendStatsIntoSavedBackendStatsHash(void)
+{
+	static bool registeredSaveBackendStats = false;
+	if (registeredSaveBackendStats == false)
+	{
+		before_shmem_exit(SaveBackendStatsIntoSavedBackendStatsHashAtExit, 0);
+
+		registeredSaveBackendStats = true;
+	}
+}
+
+
 /*
  * RegisterExternalClientBackendCounterDecrement is called when the backend terminates.
  * For all client backends, we register a callback that will undo

@@ -874,6 +901,24 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
 }
 
 
+/*
+ * SaveBackendStatsIntoSavedBackendStatsHashAtExit is called before_shmem_exit()
+ * of the backend for the purposes of saving the backend stats for the exited
+ * backends into the saved backend stats hash.
+ */
+static void
+SaveBackendStatsIntoSavedBackendStatsHashAtExit(int code, Datum arg)
+{
+	if (code)
+	{
+		/* don't try to save the stats during a crash */
+		return;
+	}
+
+	SaveBackendStatsIntoSavedBackendStatsHash();
+}
+
+
 /*
  * DecrementExternalClientBackendCounterAtExit is called before_shmem_exit() of the
  * backend for the purposes decrementing

@@ -1476,6 +1521,20 @@ RegisterCitusConfigVariables(void)
 		GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.enable_stat_counters",
+		gettext_noop("Enables the collection of statistic counters for Citus."),
+		gettext_noop("When enabled, Citus maintains a set of statistic "
+					 "counters for the Citus extension. These statistics are "
+					 "available in the citus_stat_counters view; they are "
+					 "lost on server shutdown and can be reset on demand by "
+					 "executing the function citus_stat_counters_reset()."),
+		&EnableStatCounters,
+		ENABLE_STAT_COUNTERS_DEFAULT,
+		PGC_SUSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"citus.enable_statistics_collection",
 		gettext_noop("Enables sending basic usage statistics to Citus."),

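The new GUC only gates whether counters get written; per the preamble of the new file at the end of this diff, the shared memory is allocated either way, so the citus_stat_counters view and citus_stat_counters_reset() keep working while tracking is off. A hypothetical sketch of how that guard would sit at the top of the increment path (the PR's actual body is not shown in this excerpt, and MyProcNumber as the slot index is an assumption):

/* illustrative only: how the GUC likely gates the hot path */
void
IncrementStatCounterForMyDb(StatType statType)
{
	if (!EnableStatCounters)
	{
		/* tracking is off; views and reset still work on the shared state */
		return;
	}

	if (!EnsureStatCountersShmemInitDone())
	{
		return;
	}

	/* assumption: each backend indexes the slot array by its own proc number */
	BackendStatsSlot *mySlot = &SharedBackendStatsSlotArray[MyProcNumber];
	pg_atomic_fetch_add_u64(&mySlot->counters[statType], 1);
}
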
@@ -32,7 +32,7 @@
 #include "distributed/hash_helpers.h"
 #include "distributed/multi_executor.h"
 #include "distributed/multi_server_executor.h"
-#include "distributed/query_stats.h"
+#include "distributed/stats/query_stats.h"
 #include "distributed/tuplestore.h"
 #include "distributed/version_compat.h"

@ -0,0 +1,973 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* stat_counters.c
|
||||||
|
*
|
||||||
|
* This file contains functions to track various statistic counters for
|
||||||
|
* Citus.
|
||||||
|
*
|
||||||
|
* We create an array of "BackendStatsSlot"s in shared memory, one for
|
||||||
|
* each backend. Each backend increments its own stat counters in its
|
||||||
|
* own slot via IncrementStatCounterForMyDb(). And when a backend exits,
|
||||||
|
* it saves its stat counters from its slot via
|
||||||
|
* SaveBackendStatsIntoSavedBackendStatsHash() into a hash table in
|
||||||
|
* shared memory, whose entries are "SavedBackendStatsHashEntry"s and
|
||||||
|
* the key is the database id. In other words, each entry of the hash
|
||||||
|
* table is used to aggregate the stat counters for backends that were
|
||||||
|
* connected to that database and exited since the last server restart.
|
||||||
|
* Plus, each entry is responsible for keeping track of the reset
|
||||||
|
* timestamp for both active and exited backends too.
|
||||||
|
* Note that today we don't evict the entries of the said hash table
|
||||||
|
* that point to dropped databases because the wrapper view anyway
|
||||||
|
* filters them out (thanks to LEFT JOIN) and we don't expect a
|
||||||
|
* performance hit due to that unless users have a lot of databases
|
||||||
|
* that are dropped and recreated frequently.
|
||||||
|
*
|
||||||
|
* The reason why we save the stat counters for exited backends in the
|
||||||
|
* shared hash table is that we cannot guarantee that the backend slot
|
||||||
|
* that was used by an exited backend will be reused by another backend
|
||||||
|
* connected to the same database. For this reason, we need to save the
|
||||||
|
* stat counters for exited backends into a shared hash table so that we
|
||||||
|
* can reset the counters within the corresponding backend slots while
|
||||||
|
* the backends exit.
|
||||||
|
*
|
||||||
|
* When citus_stat_counters() is called, we first aggregate the stat
|
||||||
|
* counters from the backend slots of all the active backends and then
|
||||||
|
* we add the aggregated stat counters from the exited backends that
|
||||||
|
* are stored in the shared hash table. Also, we don't persist backend
|
||||||
|
* stats on server shutdown, but we might want to do that in the future.
|
||||||
|
*
|
||||||
|
* Similarly, when citus_stat_counters_reset() is called, we reset the
|
||||||
|
* stat counters for the active backends and the exited backends that are
|
||||||
|
* stored in the shared hash table. Then, it also updates the
|
||||||
|
* resetTimestamp in the shared hash table entry appropriately. So,
|
||||||
|
* similarly, when citus_stat_counters() is called, we just report
|
||||||
|
* resetTimestamp as stats_reset column.
|
||||||
|
*
|
||||||
|
* Caveats:
|
||||||
|
*
|
||||||
|
* There is chance that citus_stat_counters_reset() might race with a
|
||||||
|
* backend that is trying to increment one of the counters in its slot
|
||||||
|
* and as a result it can effectively fail to reset that counter due to
|
||||||
|
* the reasons documented in IncrementStatCounterForMyDb() function.
|
||||||
|
* However, this should be a very rare case and we can live with that
|
||||||
|
* for now.
|
||||||
|
*
|
||||||
|
* Also, citus_stat_counters() might observe the counters for a backend
|
||||||
|
* twice or perhaps unsee it if it's concurrently exiting, depending on
|
||||||
|
* the order we call CollectActiveBackendStatsIntoHTAB() and
|
||||||
|
* CollectSavedBackendStatsIntoHTAB() in citus_stat_counters(). However,
|
||||||
|
* the next call to citus_stat_counters() will see the correct values
|
||||||
|
* for the counters, so we can live with that for now.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "common/hashfn.h"
|
||||||
|
#include "port/atomics.h"
|
||||||
|
#include "storage/ipc.h"
|
||||||
|
#include "storage/proc.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
|
#include "pg_version_compat.h"
|
||||||
|
|
||||||
|
#include "distributed/argutils.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/stats/stat_counters.h"
|
||||||
|
#include "distributed/tuplestore.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* saved backend stats - hash table constants
|
||||||
|
*
|
||||||
|
* Configurations used to create the hash table for saved backend stats.
|
||||||
|
* The places where SAVED_BACKEND_STATS_HASH_MAX_DATABASES is used do not
|
||||||
|
* impose a hard limit on the number of databases that can be tracked but
|
||||||
|
* in ShmemInitHash() it's documented that the access efficiency will degrade
|
||||||
|
* if it is exceeded substantially.
|
||||||
|
*
|
||||||
|
* XXX: Consider using dshash_table instead of (shared) HTAB if that becomes
|
||||||
|
* a concern.
|
||||||
|
*/
|
||||||
|
#define SAVED_BACKEND_STATS_HASH_INIT_DATABASES 8
|
||||||
|
#define SAVED_BACKEND_STATS_HASH_MAX_DATABASES 1024
|
||||||
|
|
||||||
|
|
||||||
|
/* fixed size array types to store the stat counters */
|
||||||
|
typedef pg_atomic_uint64 AtomicStatCounters[N_CITUS_STAT_COUNTERS];
|
||||||
|
typedef uint64 StatCounters[N_CITUS_STAT_COUNTERS];
|
||||||
|
|
||||||
|
/*
|
||||||
|
* saved backend stats - hash entry definition
|
||||||
|
*
|
||||||
|
* This is used to define & access the shared hash table used to aggregate the stat
|
||||||
|
* counters for the backends exited so far since last server restart. It's also
|
||||||
|
* responsible for keeping track of the reset timestamp.
|
||||||
|
*/
|
||||||
|
typedef struct SavedBackendStatsHashEntry
|
||||||
|
{
|
||||||
|
/* hash entry key, must always be the first */
|
||||||
|
Oid databaseId;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Needs to be locked whenever we read / write counters or resetTimestamp
|
||||||
|
* in this struct since we don't use atomic counters for this struct. Plus,
|
||||||
|
* we want to update the stat counters and resetTimestamp atomically.
|
||||||
|
*/
|
||||||
|
slock_t mutex;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* While "counters" only represents the stat counters for exited backends,
|
||||||
|
* the "resetTimestamp" doesn't only represent the reset timestamp for exited
|
||||||
|
* backends' stat counters but also for the active backends.
|
||||||
|
*/
|
||||||
|
StatCounters counters;
|
||||||
|
TimestampTz resetTimestamp;
|
||||||
|
} SavedBackendStatsHashEntry;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Hash entry definition used for the local hash table created by
|
||||||
|
* citus_stat_counters() at the runtime to aggregate the stat counters
|
||||||
|
* across all backends.
|
||||||
|
*/
|
||||||
|
typedef struct DatabaseStatsHashEntry
|
||||||
|
{
|
||||||
|
/* hash entry key, must always be the first */
|
||||||
|
Oid databaseId;
|
||||||
|
|
||||||
|
StatCounters counters;
|
||||||
|
TimestampTz resetTimestamp;
|
||||||
|
} DatabaseStatsHashEntry;
|
||||||
|
|
||||||
|
/* definition of a one per-backend stat counters slot in shared memory */
|
||||||
|
typedef struct BackendStatsSlot
|
||||||
|
{
|
||||||
|
AtomicStatCounters counters;
|
||||||
|
} BackendStatsSlot;
|
||||||
|
|
||||||
|
|
||||||
|
/*
 * GUC variable
 *
 * This only controls whether we track the stat counters or not, via
 * IncrementStatCounterForMyDb() and
 * SaveBackendStatsIntoSavedBackendStatsHash(). In other words, even
 * when the GUC is disabled, we still allocate the shared memory
 * structures etc., and citus_stat_counters() / citus_stat_counters_reset()
 * will still work.
 */
bool EnableStatCounters = ENABLE_STAT_COUNTERS_DEFAULT;

/* saved backend stats - shared memory variables */
static LWLockId *SharedSavedBackendStatsHashLock = NULL;
static HTAB *SharedSavedBackendStatsHash = NULL;

/* per-backend stat counter slots - shared memory array */
BackendStatsSlot *SharedBackendStatsSlotArray = NULL;

/*
 * We don't expect the callsites that check this (via
 * EnsureStatCountersShmemInitDone()) to be executed before
 * StatCountersShmemInit() is done. Plus, once StatCountersShmemInit()
 * is done, we also don't expect shared memory variables to be
 * initialized improperly. However, we still set this to true only
 * once StatCountersShmemInit() is done and if all three of the shared
 * memory variables above are initialized properly. And in the callsites
 * where these shared memory variables are accessed, we check this
 * variable first just to be on the safe side.
 */
static bool StatCountersShmemInitDone = false;

/* saved shmem_startup_hook */
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;


/* shared memory init & management */
static void StatCountersShmemInit(void);
static Size SharedBackendStatsSlotArrayShmemSize(void);

/* helper functions for citus_stat_counters() */
static void CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
static void CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats);
static DatabaseStatsHashEntry * DatabaseStatsHashEntryFindOrCreate(Oid databaseId,
																   HTAB *databaseStats);
static void StoreDatabaseStatsIntoTupStore(HTAB *databaseStats,
										   Tuplestorestate *tupleStore,
										   TupleDesc tupleDescriptor);

/* helper functions for citus_stat_counters_reset() */
static bool ResetActiveBackendStats(Oid databaseId);
static void ResetSavedBackendStats(Oid databaseId, bool force);

/* saved backend stats */
static SavedBackendStatsHashEntry * SavedBackendStatsHashEntryCreateIfNotExists(
	Oid databaseId);


/* sql exports */
PG_FUNCTION_INFO_V1(citus_stat_counters);
PG_FUNCTION_INFO_V1(citus_stat_counters_reset);

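/*
 * Illustrative sketch, not part of this file: the GUC above would be
 * registered from the extension's initialization code roughly as below.
 * The exact registration site and description text are assumptions, but
 * DefineCustomBoolVariable() is the standard PostgreSQL API for this:
 *
 *     DefineCustomBoolVariable(
 *         "citus.enable_stat_counters",
 *         gettext_noop("Enables tracking of Citus stat counters."),
 *         NULL,
 *         &EnableStatCounters,
 *         ENABLE_STAT_COUNTERS_DEFAULT,
 *         PGC_USERSET,
 *         GUC_STANDARD,
 *         NULL, NULL, NULL);
 */
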
/*
 * EnsureStatCountersShmemInitDone returns true if the shared memory
 * data structures used for keeping track of stat counters have been
 * properly initialized; otherwise, returns false and emits a warning.
 */
static inline bool
EnsureStatCountersShmemInitDone(void)
{
	if (!StatCountersShmemInitDone)
	{
		ereport(WARNING,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("shared memory for stat counters was not properly initialized")));
		return false;
	}

	return true;
}

/*
 * citus_stat_counters returns stat counters for the given database id.
 *
 * This only returns rows for the databases which have been connected to
 * by at least one backend since the last server restart (even if no
 * observations have been made for any of the counters, or if they were
 * reset), and it considers such a database even if it has been dropped later.
 *
 * When InvalidOid is provided, all such databases are considered; otherwise
 * only the database with the given id is considered.
 *
 * So, as an outcome, when a database id that is different than InvalidOid
 * is provided and no backend has connected to it since the last server
 * restart, or if such a database never existed, then the function
 * returns an empty set.
 *
 * Finally, the stats_reset column is set to NULL if the stat counters for the
 * database were never reset since the last server restart.
 */
Datum
citus_stat_counters(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	/*
	 * The function's sql definition allows Postgres to silently
	 * ignore NULL, but we still check.
	 */
	PG_ENSURE_ARGNOTNULL(0, "database_id");
	Oid databaseId = PG_GETARG_OID(0);

	/* just to be on the safe side */
	if (!EnsureStatCountersShmemInitDone())
	{
		PG_RETURN_VOID();
	}

	TupleDesc tupleDescriptor = NULL;
	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);

	HASHCTL info;
	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION);
	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(Oid);
	info.hash = oid_hash;
	info.entrysize = sizeof(DatabaseStatsHashEntry);

	HTAB *databaseStats = hash_create("Citus Database Stats Collect Hash", 8, &info,
									  hashFlags);

	CollectActiveBackendStatsIntoHTAB(databaseId, databaseStats);
	CollectSavedBackendStatsIntoHTAB(databaseId, databaseStats);

	StoreDatabaseStatsIntoTupStore(databaseStats, tupleStore, tupleDescriptor);

	hash_destroy(databaseStats);

	PG_RETURN_VOID();
}

/*
 * citus_stat_counters_reset resets Citus stat counters for the given database
 * id, or for the current database if InvalidOid is provided.
 *
 * If a valid database id is provided, stat counters for that database are
 * reset, even if it was dropped later.
 *
 * Otherwise, if the provided database id is not valid, then the function
 * effectively does nothing.
 */
Datum
citus_stat_counters_reset(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	/*
	 * The function's sql definition allows Postgres to silently
	 * ignore NULL, but we still check.
	 */
	PG_ENSURE_ARGNOTNULL(0, "database_id");
	Oid databaseId = PG_GETARG_OID(0);

	/*
	 * If the database id is InvalidOid, then we assume that
	 * the caller wants to reset the stat counters for the
	 * current database.
	 */
	if (databaseId == InvalidOid)
	{
		databaseId = MyDatabaseId;
	}

	/* just to be on the safe side */
	if (!EnsureStatCountersShmemInitDone())
	{
		PG_RETURN_VOID();
	}

	bool foundAnyBackendsForDb = ResetActiveBackendStats(databaseId);

	/*
	 * Even when we don't have an entry for the given database id in the
	 * saved backend stats hash table, we still want to create one for
	 * it to save the resetTimestamp, if we currently have at least one
	 * backend connected to it. By providing foundAnyBackendsForDb, we
	 * effectively let the function do that. Since ResetActiveBackendStats()
	 * doesn't filter the active backends, foundAnyBackendsForDb being true
	 * doesn't always mean that at least one backend is connected to it right
	 * now, but it does mean that we had such a backend at some point in time
	 * since the last server restart. If all backends referred to in the
	 * shared array have already exited, then we should already have an
	 * entry for it in the saved backend stats hash table, so providing
	 * a "true" wouldn't do anything in that case. Otherwise, if at least
	 * one backend is still connected to it, providing a "true" will
	 * effectively create a new entry for it if it doesn't exist yet,
	 * which is what we actually want to do.
	 *
	 * That way, we can save the resetTimestamp for the active backends
	 * into the relevant entry of the saved backend stats hash table.
	 * Note that we don't do that for the databases that don't have
	 * any active backends connected to them because we actually don't
	 * reset anything for such databases.
	 */
	ResetSavedBackendStats(databaseId, foundAnyBackendsForDb);

	PG_RETURN_VOID();
}

/*
 * InitializeStatCountersShmem saves the previous shmem_startup_hook and sets
 * up a new shmem_startup_hook for initializing the shared memory data structures
 * used for keeping track of stat counters.
 */
void
InitializeStatCountersShmem(void)
{
	prev_shmem_startup_hook = shmem_startup_hook;
	shmem_startup_hook = StatCountersShmemInit;
}

/*
 * StatCountersShmemSize calculates and returns the shared memory size
 * required for the shared memory data structures used for keeping track of
 * stat counters.
 */
Size
StatCountersShmemSize(void)
{
	Size backendStatsSlotArraySize = SharedBackendStatsSlotArrayShmemSize();
	Size savedBackendStatsHashLockSize = MAXALIGN(sizeof(LWLockId));
	Size savedBackendStatsHashSize = hash_estimate_size(
		SAVED_BACKEND_STATS_HASH_MAX_DATABASES, sizeof(SavedBackendStatsHashEntry));

	return add_size(add_size(backendStatsSlotArraySize, savedBackendStatsHashLockSize),
					savedBackendStatsHashSize);
}

/*
 * IncrementStatCounterForMyDb increments the stat counter for the given statId
 * for this backend.
 */
void
IncrementStatCounterForMyDb(int statId)
{
	if (!EnableStatCounters)
	{
		return;
	}

	/* just to be on the safe side */
	if (!EnsureStatCountersShmemInitDone())
	{
		return;
	}

	int myBackendSlotIdx = MyProc->pgprocno;
	BackendStatsSlot *myBackendStatsSlot =
		&SharedBackendStatsSlotArray[myBackendSlotIdx];

	/*
	 * When there cannot be any other writers, incrementing an atomic
	 * counter via pg_atomic_read_u64() and pg_atomic_write_u64() is the
	 * same as incrementing it via pg_atomic_fetch_add_u64(). Plus, the
	 * former is cheaper than the latter because the latter has to do
	 * extra work to deal with concurrent writers.
	 *
	 * In our case, the only concurrent writer could be the backend that
	 * is executing citus_stat_counters_reset(). So, there is a chance that
	 * we read the counter value, then it gets reset by a concurrent call
	 * made to citus_stat_counters_reset(), and then we write the
	 * incremented value back, effectively overriding the reset value.
	 * But this should be a rare case and we can live with that, for the
	 * sake of a lock-free implementation of this function.
	 */
	pg_atomic_uint64 *statPtr = &myBackendStatsSlot->counters[statId];
	pg_atomic_write_u64(statPtr, pg_atomic_read_u64(statPtr) + 1);
}

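/*
 * For comparison, a strictly race-free variant of the increment above
 * would be the single atomic read-modify-write below -- shown only as an
 * illustrative sketch, not what this file uses:
 *
 *     pg_atomic_fetch_add_u64(statPtr, 1);
 *
 * That variant could never lose a concurrent citus_stat_counters_reset(),
 * at the cost of a more expensive atomic operation on this hot path.
 */
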
/*
 * SaveBackendStatsIntoSavedBackendStatsHash saves the stat counters
 * for this backend into the saved backend stats hash table.
 *
 * So, this is only supposed to be called when a backend exits.
 *
 * Also, we do our best to avoid throwing errors in this function because
 * this function is called when a backend is exiting, and throwing errors
 * at that point will cause the backend to crash.
 */
void
SaveBackendStatsIntoSavedBackendStatsHash(void)
{
	if (!EnableStatCounters)
	{
		return;
	}

	/* just to be on the safe side */
	if (!EnsureStatCountersShmemInitDone())
	{
		return;
	}

	Oid databaseId = MyDatabaseId;

	LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
												   (void *) &databaseId,
												   HASH_FIND,
												   NULL);
	if (!dbSavedBackendStatsEntry)
	{
		/* promote the lock to exclusive to insert the new entry for this database */
		LWLockRelease(*SharedSavedBackendStatsHashLock);
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);

		dbSavedBackendStatsEntry =
			SavedBackendStatsHashEntryCreateIfNotExists(databaseId);

		LWLockRelease(*SharedSavedBackendStatsHashLock);

		if (!dbSavedBackendStatsEntry)
		{
			/*
			 * Couldn't allocate a new hash entry because we're out of
			 * (shared) memory. In that case, we just log a warning and
			 * return, instead of throwing an error, due to the reasons
			 * mentioned in the function's comment.
			 */
			ereport(WARNING,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("failed to allocate saved backend stats hash entry")));
			return;
		}

		/* re-acquire the shared lock */
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
	}

	int myBackendSlotIdx = MyProc->pgprocno;
	BackendStatsSlot *myBackendStatsSlot =
		&SharedBackendStatsSlotArray[myBackendSlotIdx];

	SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

	for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
	{
		dbSavedBackendStatsEntry->counters[statIdx] +=
			pg_atomic_read_u64(&myBackendStatsSlot->counters[statIdx]);

		/*
		 * Given that this function is only called when a backend exits, later on
		 * another backend might be assigned to the same slot. So, we reset each
		 * stat counter of this slot to 0 after saving it.
		 */
		pg_atomic_write_u64(&myBackendStatsSlot->counters[statIdx], 0);
	}

	SpinLockRelease(&dbSavedBackendStatsEntry->mutex);

	LWLockRelease(*SharedSavedBackendStatsHashLock);
}

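/*
 * A hedged sketch of how the save-on-exit path could be wired up; the
 * wrapper name below is hypothetical and the actual registration lives
 * elsewhere in this patch, but before_shmem_exit() is the standard
 * PostgreSQL hook for work that must run before shared memory goes away:
 *
 *     static void
 *     StatCountersOnBackendExit(int code, Datum arg)
 *     {
 *         SaveBackendStatsIntoSavedBackendStatsHash();
 *     }
 *
 *     before_shmem_exit(StatCountersOnBackendExit, (Datum) 0);
 */
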
/*
 * StatCountersShmemInit initializes the shared memory data structures used
 * for keeping track of stat counters.
 */
static void
StatCountersShmemInit(void)
{
	if (prev_shmem_startup_hook != NULL)
	{
		prev_shmem_startup_hook();
	}

	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

	bool sharedBackendStatsSlotArrayAlreadyInit = false;
	SharedBackendStatsSlotArray = (BackendStatsSlot *)
								  ShmemInitStruct("Citus Shared Backend Stats Slot Array",
												  SharedBackendStatsSlotArrayShmemSize(),
												  &sharedBackendStatsSlotArrayAlreadyInit);

	bool sharedSavedBackendStatsHashLockAlreadyInit = false;
	SharedSavedBackendStatsHashLock = ShmemInitStruct(
		SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME,
		sizeof(LWLockId),
		&sharedSavedBackendStatsHashLockAlreadyInit);

	HASHCTL hashInfo = {
		.keysize = sizeof(Oid),
		.entrysize = sizeof(SavedBackendStatsHashEntry),
		.hash = oid_hash,
	};
	SharedSavedBackendStatsHash = ShmemInitHash("Citus Shared Saved Backend Stats Hash",
												SAVED_BACKEND_STATS_HASH_INIT_DATABASES,
												SAVED_BACKEND_STATS_HASH_MAX_DATABASES,
												&hashInfo,
												HASH_ELEM | HASH_FUNCTION);

	Assert(sharedBackendStatsSlotArrayAlreadyInit ==
		   sharedSavedBackendStatsHashLockAlreadyInit);
	if (!sharedBackendStatsSlotArrayAlreadyInit)
	{
		for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
		{
			BackendStatsSlot *backendStatsSlot =
				&SharedBackendStatsSlotArray[backendSlotIdx];

			for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
			{
				pg_atomic_init_u64(&backendStatsSlot->counters[statIdx], 0);
			}
		}

		*SharedSavedBackendStatsHashLock =
			&(GetNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME))->lock;
	}

	LWLockRelease(AddinShmemInitLock);

	/*
	 * At this point, they should have been set to non-null values already,
	 * but we still check them just to be sure.
	 */
	if (SharedBackendStatsSlotArray &&
		SharedSavedBackendStatsHashLock &&
		SharedSavedBackendStatsHash)
	{
		StatCountersShmemInitDone = true;
	}
}

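/*
 * Illustrative sketch, not part of this file: for the ShmemInitStruct()
 * and GetNamedLWLockTranche() calls above to succeed, the extension must
 * have requested the space and the lock tranche while shared memory was
 * being sized, conventionally along these lines (the exact request site
 * in Citus is outside this excerpt):
 *
 *     RequestAddinShmemSpace(StatCountersShmemSize());
 *     RequestNamedLWLockTranche(SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME, 1);
 */
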
/*
 * SharedBackendStatsSlotArrayShmemSize returns the size of the shared
 * backend stats slot array.
 */
static Size
SharedBackendStatsSlotArrayShmemSize(void)
{
	return mul_size(sizeof(BackendStatsSlot), MaxBackends);
}

/*
 * CollectActiveBackendStatsIntoHTAB aggregates the stat counters for the
 * given database id from all the active backends into the databaseStats
 * hash table. The function doesn't actually filter the slots of active
 * backends, but it's just fine to read the stat counters from all of them
 * because exited backends zero out their stat counters when they exit anyway.
 *
 * If the database id is InvalidOid, then all the active backends will be
 * considered regardless of the database they are connected to.
 *
 * Otherwise, if the database id is different than InvalidOid, then only
 * the active backends whose PGPROC->databaseId is the same as the given
 * database id will be considered, if any.
 */
static void
CollectActiveBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
	for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
	{
		PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);

		if (backendProc->pid == 0)
		{
			/* unused slot */
			continue;
		}

		Oid procDatabaseId = backendProc->databaseId;
		if (procDatabaseId == InvalidOid)
		{
			/*
			 * Not connected to any database, something like the logical
			 * replication launcher, autovacuum launcher or such.
			 */
			continue;
		}

		if (databaseId != InvalidOid && databaseId != procDatabaseId)
		{
			/* not a database we are interested in */
			continue;
		}

		DatabaseStatsHashEntry *dbStatsEntry =
			DatabaseStatsHashEntryFindOrCreate(procDatabaseId, databaseStats);

		BackendStatsSlot *backendStatsSlot =
			&SharedBackendStatsSlotArray[backendSlotIdx];

		for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
		{
			dbStatsEntry->counters[statIdx] +=
				pg_atomic_read_u64(&backendStatsSlot->counters[statIdx]);
		}
	}
}

/*
 * CollectSavedBackendStatsIntoHTAB fetches the saved stat counters and
 * resetTimestamp for the given database id from the saved backend stats
 * hash table and saves them into the databaseStats hash table.
 *
 * If the database id is InvalidOid, then all the databases that are present
 * in the saved backend stats hash table will be considered.
 *
 * Otherwise, if the database id is different than InvalidOid, then only
 * the entry that belongs to the given database will be considered, if there
 * is such an entry.
 */
static void
CollectSavedBackendStatsIntoHTAB(Oid databaseId, HTAB *databaseStats)
{
	LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

	if (databaseId != InvalidOid)
	{
		SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
			(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
													   (void *) &databaseId,
													   HASH_FIND,
													   NULL);

		if (dbSavedBackendStatsEntry)
		{
			DatabaseStatsHashEntry *dbStatsEntry =
				DatabaseStatsHashEntryFindOrCreate(databaseId, databaseStats);

			SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

			for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
			{
				dbStatsEntry->counters[statIdx] +=
					dbSavedBackendStatsEntry->counters[statIdx];
			}

			dbStatsEntry->resetTimestamp =
				dbSavedBackendStatsEntry->resetTimestamp;

			SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
		}
	}
	else
	{
		HASH_SEQ_STATUS hashSeqStatus;
		hash_seq_init(&hashSeqStatus, SharedSavedBackendStatsHash);

		SavedBackendStatsHashEntry *dbSavedBackendStatsEntry = NULL;
		while ((dbSavedBackendStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
		{
			DatabaseStatsHashEntry *dbStatsEntry =
				DatabaseStatsHashEntryFindOrCreate(dbSavedBackendStatsEntry->databaseId,
												   databaseStats);

			SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

			for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
			{
				dbStatsEntry->counters[statIdx] +=
					dbSavedBackendStatsEntry->counters[statIdx];
			}

			dbStatsEntry->resetTimestamp =
				dbSavedBackendStatsEntry->resetTimestamp;

			SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
		}
	}

	LWLockRelease(*SharedSavedBackendStatsHashLock);
}

/*
 * DatabaseStatsHashEntryFindOrCreate creates a new entry in the databaseStats
 * hash table for the given database id if it doesn't already exist and
 * initializes it, or just returns the existing entry if it does.
 */
static DatabaseStatsHashEntry *
DatabaseStatsHashEntryFindOrCreate(Oid databaseId, HTAB *databaseStats)
{
	bool found = false;
	DatabaseStatsHashEntry *dbStatsEntry = (DatabaseStatsHashEntry *)
										   hash_search(databaseStats, &databaseId,
													   HASH_ENTER, &found);

	if (!found)
	{
		MemSet(dbStatsEntry->counters, 0, sizeof(StatCounters));
		dbStatsEntry->resetTimestamp = 0;
	}

	return dbStatsEntry;
}

/*
 * StoreDatabaseStatsIntoTupStore stores the database stats from the
 * databaseStats hash table into the given tuple store.
 */
static void
StoreDatabaseStatsIntoTupStore(HTAB *databaseStats, Tuplestorestate *tupleStore,
							   TupleDesc tupleDescriptor)
{
	HASH_SEQ_STATUS hashSeqStatus;
	hash_seq_init(&hashSeqStatus, databaseStats);

	DatabaseStatsHashEntry *dbStatsEntry = NULL;
	while ((dbStatsEntry = hash_seq_search(&hashSeqStatus)) != NULL)
	{
		/* +2 for the database_id (first) and the stats_reset (last) columns */
		Datum values[N_CITUS_STAT_COUNTERS + 2] = { 0 };
		bool isNulls[N_CITUS_STAT_COUNTERS + 2] = { 0 };

		values[0] = ObjectIdGetDatum(dbStatsEntry->databaseId);

		for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
		{
			uint64 statCounter = dbStatsEntry->counters[statIdx];
			values[statIdx + 1] = UInt64GetDatum(statCounter);
		}

		/* set the stats_reset column to NULL if it was never reset */
		if (dbStatsEntry->resetTimestamp == 0)
		{
			isNulls[N_CITUS_STAT_COUNTERS + 1] = true;
		}
		else
		{
			values[N_CITUS_STAT_COUNTERS + 1] =
				TimestampTzGetDatum(dbStatsEntry->resetTimestamp);
		}

		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
	}
}

/*
 * ResetActiveBackendStats resets the stat counters for the given database
 * id for all the active backends. The function doesn't actually filter the
 * slots of active backends, but it's just fine to reset the stat counters
 * for all of them because doing so just means resetting the stat counters
 * for exited backends once again, which were already reset when they exited.
 *
 * Only active backends whose PGPROC->databaseId is the same as the given
 * database id will be considered, if any.
 *
 * Returns true if any active backend was found.
 */
static bool
ResetActiveBackendStats(Oid databaseId)
{
	bool foundAny = false;

	for (int backendSlotIdx = 0; backendSlotIdx < MaxBackends; ++backendSlotIdx)
	{
		PGPROC *backendProc = GetPGProcByNumber(backendSlotIdx);

		if (backendProc->pid == 0)
		{
			/* unused slot */
			continue;
		}

		Oid procDatabaseId = backendProc->databaseId;
		if (procDatabaseId == InvalidOid)
		{
			/*
			 * Not connected to any database, something like the logical
			 * replication launcher, autovacuum launcher, etc.
			 */
			continue;
		}

		if (databaseId != procDatabaseId)
		{
			/* not a database we are interested in */
			continue;
		}

		foundAny = true;

		BackendStatsSlot *backendStatsSlot =
			&SharedBackendStatsSlotArray[backendSlotIdx];

		for (int statIdx = 0; statIdx < N_CITUS_STAT_COUNTERS; statIdx++)
		{
			pg_atomic_write_u64(&backendStatsSlot->counters[statIdx], 0);
		}
	}

	return foundAny;
}

/*
 * ResetSavedBackendStats resets the saved stat counters for the given
 * database id and sets the resetTimestamp for it to the current timestamp.
 *
 * If force is true, then we first make sure that we have an entry for
 * the given database id in the saved backend stats hash table.
 */
static void
ResetSavedBackendStats(Oid databaseId, bool force)
{
	LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);

	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
												   (void *) &databaseId,
												   HASH_FIND,
												   NULL);

	if (!dbSavedBackendStatsEntry && force)
	{
		/* promote the lock to exclusive to insert the new entry for this database */
		LWLockRelease(*SharedSavedBackendStatsHashLock);
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_EXCLUSIVE);

		dbSavedBackendStatsEntry =
			SavedBackendStatsHashEntryCreateIfNotExists(databaseId);

		LWLockRelease(*SharedSavedBackendStatsHashLock);

		if (!dbSavedBackendStatsEntry)
		{
			/*
			 * Couldn't allocate a new hash entry because we're out of
			 * (shared) memory.
			 */
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("failed to allocate saved backend stats hash entry")));
			return;
		}

		/* re-acquire the shared lock */
		LWLockAcquire(*SharedSavedBackendStatsHashLock, LW_SHARED);
	}

	/*
	 * Actually reset the stat counters for the exited backends and set
	 * the resetTimestamp to the current timestamp if we already had
	 * an entry for it or if we just created it.
	 */
	if (dbSavedBackendStatsEntry)
	{
		SpinLockAcquire(&dbSavedBackendStatsEntry->mutex);

		memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));
		dbSavedBackendStatsEntry->resetTimestamp = GetCurrentTimestamp();

		SpinLockRelease(&dbSavedBackendStatsEntry->mutex);
	}

	LWLockRelease(*SharedSavedBackendStatsHashLock);
}

/*
 * SavedBackendStatsHashEntryCreateIfNotExists creates a new entry in the
 * saved backend stats hash table for the given database id if it doesn't
 * already exist and initializes it.
 *
 * Assumes that the caller has exclusive access to the hash table since it
 * performs HASH_ENTER_NULL.
 *
 * Returns NULL if the entry didn't exist and couldn't be allocated since
 * we're out of (shared) memory.
 */
static SavedBackendStatsHashEntry *
SavedBackendStatsHashEntryCreateIfNotExists(Oid databaseId)
{
	bool found = false;
	SavedBackendStatsHashEntry *dbSavedBackendStatsEntry =
		(SavedBackendStatsHashEntry *) hash_search(SharedSavedBackendStatsHash,
												   (void *) &databaseId,
												   HASH_ENTER_NULL,
												   &found);

	if (!dbSavedBackendStatsEntry)
	{
		/*
		 * As we provided HASH_ENTER_NULL, returning NULL means OOM.
		 * In that case, we return and let the caller decide what to do.
		 */
		return NULL;
	}

	if (!found)
	{
		memset(dbSavedBackendStatsEntry->counters, 0, sizeof(StatCounters));

		dbSavedBackendStatsEntry->resetTimestamp = 0;

		SpinLockInit(&dbSavedBackendStatsEntry->mutex);
	}

	return dbSavedBackendStatsEntry;
}

@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * citus_stat_tenants.c
+ * stat_tenants.c
  *    Routines related to the multi tenant monitor.
  *
  * Copyright (c) Citus Data, Inc.
@@ -34,9 +34,9 @@
 #include "distributed/log_utils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
+#include "distributed/stats/stat_tenants.h"
 #include "distributed/tenant_schema_metadata.h"
 #include "distributed/tuplestore.h"
-#include "distributed/utils/citus_stat_tenants.h"
 
 #if (PG_VERSION_NUM >= PG_VERSION_15)
 #include "common/pg_prng.h"
@@ -15,7 +15,7 @@
 
 #include "sys/time.h"
 
-#include "distributed/utils/citus_stat_tenants.h"
+#include "distributed/stats/stat_tenants.h"
 
 PG_FUNCTION_INFO_V1(sleep_until_next_period);
@@ -54,10 +54,10 @@
 #include "distributed/maintenanced.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
-#include "distributed/query_stats.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_cleaner.h"
 #include "distributed/statistics_collection.h"
+#include "distributed/stats/query_stats.h"
 #include "distributed/transaction_recovery.h"
 #include "distributed/version_compat.h"
@@ -154,6 +154,11 @@ typedef struct CitusCopyDestReceiver
 	 * when merging into the target tables.
 	 */
 	bool skipCoercions;
+
+	/*
+	 * Determines whether the COPY command should track query stat counters.
+	 */
+	bool trackQueryCounters;
 } CitusCopyDestReceiver;
@@ -170,7 +175,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
 														   int partitionColumnIndex,
 														   EState *executorState,
 														   char *intermediateResultPrefix,
-														   bool isPublishable);
+														   bool isPublishable,
+														   bool trackQueryCounters);
 extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
 extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
 extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);
@@ -342,7 +342,7 @@ extern void FinishConnectionEstablishment(MultiConnection *connection);
 extern void ForceConnectionCloseAtTransactionEnd(MultiConnection *connection);
 extern void ClaimConnectionExclusively(MultiConnection *connection);
 extern void UnclaimConnection(MultiConnection *connection);
-extern void MarkConnectionConnected(MultiConnection *connection);
+extern void MarkConnectionConnected(MultiConnection *connection, bool newConnection);
 
 /* waiteventset utilities */
 extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
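The new newConnection flag lets connection bookkeeping distinguish freshly established connections from reused ones when bumping stat counters. A hedged sketch of one plausible wiring (the body below is illustrative only, not the actual implementation in connection_management.c; existing bookkeeping is elided):

void
MarkConnectionConnected(MultiConnection *connection, bool newConnection)
{
	/* ... existing connection-state bookkeeping elided ... */

	if (newConnection)
	{
		IncrementStatCounterForMyDb(STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED);
	}
	else
	{
		IncrementStatCounterForMyDb(STAT_CONNECTION_REUSED);
	}
}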
@@ -0,0 +1,72 @@
/*-------------------------------------------------------------------------
 *
 * stat_counters.h
 *
 * This file contains the exported functions to track various statistic
 * counters for Citus.
 *
 *-------------------------------------------------------------------------
 */

#ifndef STAT_COUNTERS_H
#define STAT_COUNTERS_H


/* saved backend stats - constants */
#define SAVED_BACKEND_STATS_HASH_LOCK_TRANCHE_NAME \
	"citus_stat_counters saved backend stats hash"

/* default value for the GUC variable */
#define ENABLE_STAT_COUNTERS_DEFAULT false


/*
 * Must be in the same order as the output columns defined in the
 * citus_stat_counters() UDF, see
 * src/backend/distributed/sql/udfs/citus_stat_counters/latest.sql
 */
typedef enum
{
	/*
	 * These are mainly tracked by connection_management.c and
	 * adaptive_executor.c.
	 */
	STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
	STAT_CONNECTION_ESTABLISHMENT_FAILED,
	STAT_CONNECTION_REUSED,

	/*
	 * These are maintained by the ExecCustomScan methods implemented
	 * for the CustomScan nodes provided by Citus, to account for actual
	 * execution of queries and subplans. By maintaining these
	 * counters in the ExecCustomScan callbacks, we avoid incrementing
	 * them for plain EXPLAIN (i.e., without ANALYZE) queries. And,
	 * preferring the executor methods rather than the planner methods
	 * helps us capture the execution of prepared statements too.
	 */
	STAT_QUERY_EXECUTION_SINGLE_SHARD,
	STAT_QUERY_EXECUTION_MULTI_SHARD,

	/* do not use this and ensure it is the last entry */
	N_CITUS_STAT_COUNTERS
} StatType;


/* GUC variable */
extern bool EnableStatCounters;


/* shared memory init */
extern void InitializeStatCountersShmem(void);
extern Size StatCountersShmemSize(void);

/* main entry point for callers who want to increment the stat counters */
extern void IncrementStatCounterForMyDb(int statId);

/*
 * Exported to define a before_shmem_exit() callback that saves
 * the stat counters for exited backends into shared memory.
 */
extern void SaveBackendStatsIntoSavedBackendStatsHash(void);

#endif /* STAT_COUNTERS_H */
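Taken together, the header reduces call sites to a single helper call with the right StatType value; the GUC check and shared memory guards live inside the helper. A minimal caller-side sketch, assuming only the declarations above (the wrapper function and its placement are hypothetical):

#include "distributed/stats/stat_counters.h"

/* hypothetical call site, e.g. inside an executor callback */
static void
CountDistributedQueryExecution(bool isMultiShard)
{
	if (isMultiShard)
	{
		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
	}
	else
	{
		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
	}
}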
@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * citus_stat_tenants.h
+ * stat_tenants.h
  *    Routines related to the multi tenant monitor.
  *
  * Copyright (c) Citus Data, Inc.
@@ -92,6 +92,69 @@ SELECT citus.mitmproxy('conn.allow()');
 
 (1 row)
 
+SET citus.enable_stat_counters TO true;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+SELECT * FROM products;
+WARNING:  could not establish any connections to the node localhost:xxxxx after 900 ms
+ product_no | name | price
+---------------------------------------------------------------------
+(0 rows)
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- this time with citus.force_max_query_parallelization set to on
+SET citus.force_max_query_parallelization TO ON;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+SELECT * FROM products;
+WARNING:  could not establish any connections to the node localhost:xxxxx after 900 ms
+ product_no | name | price
+---------------------------------------------------------------------
+(0 rows)
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+RESET citus.force_max_query_parallelization;
+RESET citus.enable_stat_counters;
 -- Make sure that we fall back to a working node for reads, even if it's not
 -- the first choice in our task assignment policy.
 SET citus.node_connection_timeout TO 900;
@@ -168,6 +231,48 @@ SELECT citus.mitmproxy('conn.allow()');
 
 (1 row)
 
+SET citus.enable_stat_counters TO true;
+SET citus.force_max_query_parallelization TO ON;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- test insert into a single replicated table
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+INSERT INTO single_replicatated VALUES (100);
+ERROR:  could not establish any connections to the node localhost:xxxxx after 900 ms
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- test select from a single replicated table
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+SELECT count(*) FROM single_replicatated;
+ERROR:  could not establish any connections to the node localhost:xxxxx after 900 ms
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
+RESET citus.enable_stat_counters;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
 -- one similar test, and this time on modification queries
 -- to see that connection establishment failures could
 -- fail the transaction (but not mark any placements as INVALID)
@@ -556,3 +556,67 @@ BEGIN
 ORDER BY node_type;
 END;
 $func$ LANGUAGE plpgsql;
+SET search_path TO public, pg_catalog;
+--
+-- create citus_stat_counters() and the citus_stat_counters view
+-- by entirely copying src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql.
+--
+-- See the comments for the function in
+-- src/backend/distributed/stats/stat_counters.c for more details.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
+    database_id oid DEFAULT 0,
+    -- must always be the first column or you should accordingly update
+    -- the StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
+    OUT database_id oid,
+    -- The following stat counter columns must be in the same order as the
+    -- StatType enum defined in src/include/distributed/stats/stat_counters.h
+    OUT connection_establishment_succeeded bigint,
+    OUT connection_establishment_failed bigint,
+    OUT connection_reused bigint,
+    OUT query_execution_single_shard bigint,
+    OUT query_execution_multi_shard bigint,
+    -- must always be the last column or you should accordingly update
+    -- the StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
+    OUT stats_reset timestamp with time zone
+)
+RETURNS SETOF RECORD
+LANGUAGE C STRICT VOLATILE PARALLEL SAFE
+AS 'citus', $$citus_stat_counters$$;
+COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.';
+-- returns the stat counters for all the databases in the local node
+CREATE VIEW citus.citus_stat_counters AS
+SELECT pg_database.oid,
+       pg_database.datname as name,
+       -- We always COALESCE the counters to 0 because the LEFT JOIN
+       -- will bring the databases that have never been connected to
+       -- since the last restart with NULL counters, but we want to
+       -- show them with 0 counters in the view.
+       COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
+       COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
+       COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
+       COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
+       COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,
+       citus_stat_counters.stats_reset
+FROM pg_catalog.pg_database
+LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
+ON (oid = database_id);
+ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
+GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
+--
+-- create citus_stat_counters_reset()
+-- by entirely copying src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql.
+--
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
+RETURNS VOID
+LANGUAGE C STRICT PARALLEL SAFE
+AS 'citus', $$citus_stat_counters_reset$$;
+COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database OID or for the current database if nothing or 0 is provided.';
+-- Rather than using an explicit superuser() check in the function, we use
+-- the GRANT system to REVOKE access to it when creating the extension.
+-- Administrators can later change who can access it, or leave it as
+-- only available to the superuser / database cluster owner, if they choose.
+REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;
+--
+-- done creating citus_stat_counters(), citus_stat_counters_reset() and the citus_stat_counters view
+--
+RESET search_path;
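In operation, the objects created above compose into a short monitoring workflow. A hedged usage sketch against these definitions (the settings and columns come from this patch; the actual output depends on workload):

-- enable tracking for the current session
SET citus.enable_stat_counters TO true;

-- read the counters for every database on the local node
SELECT name, connection_establishment_failed, query_execution_multi_shard, stats_reset
FROM citus_stat_counters;

-- reset the counters for the current database; callable by superuser only,
-- unless access to the function has been re-granted
SELECT citus_stat_counters_reset();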
@@ -48,6 +48,43 @@ show citus.node_conninfo;
 -- Should give a connection error because of bad sslmode
 select count(*) from test where a = 0;
 ERROR:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+SET citus.enable_stat_counters TO true;
+select count(*) from test;
+WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+ERROR:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+RESET citus.enable_stat_counters;
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+-- Test a function that tries to establish parallel node connections.
+SET citus.enable_stat_counters TO true;
+-- we don't care about the result, hence make it always return true
+SELECT COUNT(*) > -1 FROM get_global_active_transactions();
+WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+WARNING:  connection to the remote node postgres@localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
+RESET citus.enable_stat_counters;
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed = 2
+FROM pg_database WHERE datname = current_database();
+ ?column?
+---------------------------------------------------------------------
+ t
+(1 row)
+
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
File diff suppressed because it is too large
@@ -133,6 +133,8 @@ ORDER BY 1;
 function citus_shards_on_worker()
 function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode)
 function citus_stat_activity()
+function citus_stat_counters(oid)
+function citus_stat_counters_reset(oid)
 function citus_stat_statements()
 function citus_stat_statements_reset()
 function citus_stat_tenants(boolean)

@@ -338,10 +340,10 @@ ORDER BY 1;
 view citus_shards
 view citus_shards_on_worker
 view citus_stat_activity
+view citus_stat_counters
 view citus_stat_statements
 view citus_stat_tenants
 view citus_stat_tenants_local
 view pg_dist_shard_placement
 view time_partitions
 (333 rows)
@@ -33,6 +33,8 @@ test: failure_savepoints
 test: failure_multi_row_insert
 test: failure_mx_metadata_sync
 test: failure_mx_metadata_sync_multi_trans
+# Do not parallelize with others because this measures stat counters
+# for failed connections for a few queries.
 test: failure_connection_establishment
 
 # this test syncs metadata to the workers
@@ -38,6 +38,12 @@ test: create_single_shard_table
 test: single_shard_table_udfs
 test: schema_based_sharding
 test: citus_schema_distribute_undistribute
+# Don't parallelize stat_counters with others because we don't want statistics
+# to be updated by other tests concurrently, except by the Citus Maintenance Daemon.
+#
+# Also, this needs to be the first test that calls citus_stat_counters()
+# because it checks the value of the stats_reset column before calling the function.
+test: stat_counters
 
 test: multi_test_catalog_views
 test: multi_table_ddl
@@ -278,6 +284,9 @@ test: multi_colocation_utils
 
 # ----------
 # node_conninfo_reload tests that node_conninfo changes take effect
+#
+# Do not parallelize with others because this measures stat counters
+# for failed connections for a few queries.
 # ----------
 test: node_conninfo_reload
@@ -493,6 +493,8 @@ push(@pgOptions, "citus.stat_statements_track = 'all'");
 push(@pgOptions, "citus.enable_change_data_capture=off");
 push(@pgOptions, "citus.stat_tenants_limit = 2");
 push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
+push(@pgOptions, "citus.enable_stat_counters=on");
+push(@pgOptions, "citus.superuser = 'postgres'");

 # Some tests look at shards in pg_class, make sure we can usually see them:
 push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
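
The harness turns the counters on cluster-wide via citus.enable_stat_counters=on. Outside pg_regress, the session-level form used throughout the tests works the same way; the ALTER SYSTEM variant below is a sketch that assumes superuser and that the GUC takes effect on reload rather than requiring a restart:

-- per session, as the regression tests do
SET citus.enable_stat_counters TO on;

-- cluster-wide (assumed reloadable; requires superuser)
ALTER SYSTEM SET citus.enable_stat_counters = on;
SELECT pg_reload_conf();
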
@@ -1199,4 +1201,3 @@ else {
 die "Failed in ". ($endTime - $startTime)." seconds. \n";
-
 }

@@ -55,6 +55,43 @@ ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
 RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');

+SET citus.enable_stat_counters TO true;
+
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SELECT * FROM products;
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+
+-- this time with citus.force_max_query_parallelization set to on
+SET citus.force_max_query_parallelization TO ON;
+
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SELECT * FROM products;
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+RESET citus.node_connection_timeout;
+SELECT citus.mitmproxy('conn.allow()');
+
+RESET citus.force_max_query_parallelization;
+
+RESET citus.enable_stat_counters;
+
 -- Make sure that we fall back to a working node for reads, even if it's not
 -- the first choice in our task assignment policy.
 SET citus.node_connection_timeout TO 900;
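
The hunk above relies on psql's \gset idiom: snapshot a counter into a psql variable, run the workload, then assert on the delta. The same pattern works for any counter column; a sketch using the multi-shard query counter defined later in this diff (the workload query is illustrative):

SELECT (citus_stat_counters(oid)).query_execution_multi_shard AS old_multi_shard
FROM pg_database WHERE datname = current_database() \gset

-- ... run the workload being measured ...

SELECT (citus_stat_counters(oid)).query_execution_multi_shard - :old_multi_shard AS delta
FROM pg_database WHERE datname = current_database();
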
@@ -87,6 +124,33 @@ RESET citus.force_max_query_parallelization;
 RESET citus.node_connection_timeout;
 SELECT citus.mitmproxy('conn.allow()');

+SET citus.enable_stat_counters TO true;
+SET citus.force_max_query_parallelization TO ON;
+SET citus.node_connection_timeout TO 900;
+SELECT citus.mitmproxy('conn.connect_delay(1400)');
+
+-- test insert into a single replicated table
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+INSERT INTO single_replicatated VALUES (100);
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+-- test select from a single replicated table
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SELECT count(*) FROM single_replicatated;
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+RESET citus.force_max_query_parallelization;
+RESET citus.node_connection_timeout;
+RESET citus.enable_stat_counters;
+SELECT citus.mitmproxy('conn.allow()');

 -- one similar test, and this time on modification queries
 -- to see that connection establishement failures could

@@ -581,3 +581,83 @@ BEGIN
 ORDER BY node_type;
 END;
 $func$ LANGUAGE plpgsql;
+
+SET search_path TO public, pg_catalog;
+
+--
+-- create citus_stat_counters() and citus_stat_counters view
+-- by entirely copying src/backend/distributed/sql/udfs/citus_stat_counters/13.1-1.sql.
+--
+
+-- See the comments for the function in
+-- src/backend/distributed/stats/stat_counters.c for more details.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters(
+    database_id oid DEFAULT 0,
+
+    -- must always be the first column or you should accordingly update
+    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
+    OUT database_id oid,
+
+    -- Following stat counter columns must be in the same order as the
+    -- StatType enum defined in src/include/distributed/stats/stat_counters.h
+    OUT connection_establishment_succeeded bigint,
+    OUT connection_establishment_failed bigint,
+    OUT connection_reused bigint,
+    OUT query_execution_single_shard bigint,
+    OUT query_execution_multi_shard bigint,
+
+    -- must always be the last column or you should accordingly update
+    -- StoreDatabaseStatsIntoTupStore() function in src/backend/distributed/stats/stat_counters.c
+    OUT stats_reset timestamp with time zone
+)
+RETURNS SETOF RECORD
+LANGUAGE C STRICT VOLATILE PARALLEL SAFE
+AS 'citus', $$citus_stat_counters$$;
+COMMENT ON FUNCTION pg_catalog.citus_stat_counters(oid) IS 'Returns Citus stat counters for the given database OID, or for all databases if 0 is passed. Includes only databases with at least one connection since last restart, including dropped ones.';
+
+-- returns the stat counters for all the databases in local node
+CREATE VIEW citus.citus_stat_counters AS
+SELECT pg_database.oid,
+       pg_database.datname as name,
+
+       -- We always COALESCE the counters to 0 because the LEFT JOIN
+       -- will bring the databases that have never been connected to
+       -- since the last restart with NULL counters, but we want to
+       -- show them with 0 counters in the view.
+       COALESCE(citus_stat_counters.connection_establishment_succeeded, 0) as connection_establishment_succeeded,
+       COALESCE(citus_stat_counters.connection_establishment_failed, 0) as connection_establishment_failed,
+       COALESCE(citus_stat_counters.connection_reused, 0) as connection_reused,
+       COALESCE(citus_stat_counters.query_execution_single_shard, 0) as query_execution_single_shard,
+       COALESCE(citus_stat_counters.query_execution_multi_shard, 0) as query_execution_multi_shard,
+
+       citus_stat_counters.stats_reset
+FROM pg_catalog.pg_database
+LEFT JOIN (SELECT (pg_catalog.citus_stat_counters(0)).*) citus_stat_counters
+ON (oid = database_id);
+
+ALTER VIEW citus.citus_stat_counters SET SCHEMA pg_catalog;
+
+GRANT SELECT ON pg_catalog.citus_stat_counters TO PUBLIC;
+
+--
+-- create citus_stat_counters_reset()
+-- by entirely copying src/backend/distributed/sql/udfs/citus_stat_counters_reset/13.1-1.sql.
+--
+
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_counters_reset(database_oid oid DEFAULT 0)
+RETURNS VOID
+LANGUAGE C STRICT PARALLEL SAFE
+AS 'citus', $$citus_stat_counters_reset$$;
+COMMENT ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) IS 'Resets Citus stat counters for the given database OID or for the current database if nothing or 0 is provided.';
+
+-- Rather than using explicit superuser() check in the function, we use
+-- the GRANT system to REVOKE access to it when creating the extension.
+-- Administrators can later change who can access it, or leave them as
+-- only available to superuser / database cluster owner, if they choose.
+REVOKE ALL ON FUNCTION pg_catalog.citus_stat_counters_reset(oid) FROM PUBLIC;
+
+--
+-- done creating citus_stat_counters(), citus_stat_counters_reset() and citus_stat_counters view
+--
+
+RESET search_path;
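
With the function, view, and reset UDF defined as above, a typical inspection session looks like this (a usage sketch derived from those definitions; column list abbreviated):

-- per-database counters, zero-filled for databases never connected to
SELECT name, connection_establishment_failed, query_execution_single_shard, stats_reset
FROM citus_stat_counters;

-- counters for the current database only
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database();

-- reset counters for the current database (restricted to superuser by the REVOKE above)
SELECT citus_stat_counters_reset();
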
@@ -22,6 +22,30 @@ show citus.node_conninfo;
 -- Should give a connection error because of bad sslmode
 select count(*) from test where a = 0;

+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+SET citus.enable_stat_counters TO true;
+select count(*) from test;
+RESET citus.enable_stat_counters;
+
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed > 0
+FROM pg_database WHERE datname = current_database();
+
+SELECT (citus_stat_counters(oid)).connection_establishment_failed AS old_connection_establishment_failed
+FROM pg_database WHERE datname = current_database() \gset
+
+-- Test a function that tries to establish parallel node connections.
+SET citus.enable_stat_counters TO true;
+-- we don't care about the result, hence make it always return true
+SELECT COUNT(*) > -1 FROM get_global_active_transactions();
+RESET citus.enable_stat_counters;
+
+-- make sure that we properly updated the connection_establishment_failed counter
+SELECT (citus_stat_counters(oid)).connection_establishment_failed - :old_connection_establishment_failed = 2
+FROM pg_database WHERE datname = current_database();
+
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();