Prefer instr_time to TimestampTz when we want CLOCK_MONOTONIC

pull/3494/head
Philip Dubé 2020-02-12 21:42:43 +00:00
parent 36bb85e5c0
commit 52042d4a00
5 changed files with 57 additions and 62 deletions

View File

@ -29,6 +29,7 @@
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "portability/instr_time.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -680,9 +681,9 @@ MultiConnectionStateEventMask(MultiConnectionPollState *connectionState)
void void
FinishConnectionListEstablishment(List *multiConnectionList) FinishConnectionListEstablishment(List *multiConnectionList)
{ {
const TimestampTz connectionStart = GetCurrentTimestamp(); instr_time connectionStart;
const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart, INSTR_TIME_SET_CURRENT(connectionStart);
NodeConnectionTimeout);
List *connectionStates = NULL; List *connectionStates = NULL;
ListCell *multiConnectionCell = NULL; ListCell *multiConnectionCell = NULL;
@ -729,7 +730,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
ALLOCSET_DEFAULT_SIZES)); ALLOCSET_DEFAULT_SIZES));
while (waitCount > 0) while (waitCount > 0)
{ {
long timeout = DeadlineTimestampTzToTimeout(deadline); long timeout = MillisecondsToTimeout(connectionStart, NodeConnectionTimeout);
if (waitEventSetRebuild) if (waitEventSetRebuild)
{ {
@ -812,8 +813,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
* connectionStart and if passed close all non-finished connections * connectionStart and if passed close all non-finished connections
*/ */
TimestampTz now = GetCurrentTimestamp(); if (MillisecondsPassedSince(connectionStart) >= NodeConnectionTimeout)
if (TimestampDifferenceExceeds(connectionStart, now, NodeConnectionTimeout))
{ {
/* /*
* showing as a warning, can't be an error. In some cases queries can * showing as a warning, can't be an error. In some cases queries can
@ -837,17 +837,28 @@ FinishConnectionListEstablishment(List *multiConnectionList)
/* /*
* DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse * MillisecondsPassedSince returns the number of milliseconds elapsed between an
* before the deadline provided as an argument will be reached. The outcome can be used to * instr_time & the current time.
*/
double
MillisecondsPassedSince(instr_time moment)
{
instr_time timeSinceMoment;
INSTR_TIME_SET_CURRENT(timeSinceMoment);
INSTR_TIME_SUBTRACT(timeSinceMoment, moment);
return INSTR_TIME_GET_MILLISEC(timeSinceMoment);
}
/*
* MillisecondsToTimeout returns the numer of milliseconds that still need to elapse
* before msAfterStart milliseconds have passed since start. The outcome can be used to
* pass to the Wait of an EventSet to make sure it returns after the timeout has passed. * pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
*/ */
long long
DeadlineTimestampTzToTimeout(TimestampTz deadline) MillisecondsToTimeout(instr_time start, long msAfterStart)
{ {
long secs = 0; return msAfterStart - MillisecondsPassedSince(start);
int microsecs = 0;
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &microsecs);
return secs * 1000 + microsecs / 1000;
} }

View File

@ -157,6 +157,7 @@
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "portability/instr_time.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/int8.h" #include "utils/int8.h"
@ -362,13 +363,13 @@ typedef struct WorkerPool
* We keep this for enforcing the connection timeouts. In our definition, a pool * We keep this for enforcing the connection timeouts. In our definition, a pool
* starts when the first connection establishment starts. * starts when the first connection establishment starts.
*/ */
TimestampTz poolStartTime; instr_time poolStartTime;
/* indicates whether to check for the connection timeout */ /* indicates whether to check for the connection timeout */
bool checkForPoolTimeout; bool checkForPoolTimeout;
/* last time we opened a connection */ /* last time we opened a connection */
TimestampTz lastConnectionOpenTime; instr_time lastConnectionOpenTime;
/* maximum number of connections we are allowed to open at once */ /* maximum number of connections we are allowed to open at once */
uint32 maxNewConnectionsPerCycle; uint32 maxNewConnectionsPerCycle;
@ -588,7 +589,6 @@ static void ManageWorkerPool(WorkerPool *workerPool);
static void CheckConnectionTimeout(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution); static long NextEventTimeout(DistributedExecution *execution);
static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime);
static WaitEventSet * BuildWaitEventSet(List *sessionList); static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
@ -622,6 +622,7 @@ static int GetEventSetSize(List *sessionList);
static int RebuildWaitEventSet(DistributedExecution *execution); static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived); eventCount, bool *cancellationReceived);
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
/* /*
* AdaptiveExecutor is called via CitusExecScan on the * AdaptiveExecutor is called via CitusExecScan on the
@ -1845,7 +1846,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool));
workerPool->nodeName = pstrdup(nodeName); workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort; workerPool->nodePort = nodePort;
workerPool->poolStartTime = 0; INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
workerPool->distributedExecution = execution; workerPool->distributedExecution = execution;
/* "open" connections aggressively when there are cached connections */ /* "open" connections aggressively when there are cached connections */
@ -1905,7 +1906,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
*/ */
if (list_length(workerPool->sessionList) == 0) if (list_length(workerPool->sessionList) == 0)
{ {
workerPool->poolStartTime = GetCurrentTimestamp(); INSTR_TIME_SET_CURRENT(workerPool->poolStartTime);
workerPool->checkForPoolTimeout = true; workerPool->checkForPoolTimeout = true;
} }
@ -2243,10 +2244,8 @@ ManageWorkerPool(WorkerPool *workerPool)
if (newConnectionCount > 0 && ExecutorSlowStartInterval > 0) if (newConnectionCount > 0 && ExecutorSlowStartInterval > 0)
{ {
TimestampTz now = GetCurrentTimestamp(); if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
ExecutorSlowStartInterval)
if (TimestampDifferenceExceeds(workerPool->lastConnectionOpenTime, now,
ExecutorSlowStartInterval))
{ {
newConnectionCount = Min(newConnectionCount, newConnectionCount = Min(newConnectionCount,
workerPool->maxNewConnectionsPerCycle); workerPool->maxNewConnectionsPerCycle);
@ -2307,7 +2306,7 @@ ManageWorkerPool(WorkerPool *workerPool)
ConnectionStateMachine(session); ConnectionStateMachine(session);
} }
workerPool->lastConnectionOpenTime = GetCurrentTimestamp(); INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
execution->connectionSetChanged = true; execution->connectionSetChanged = true;
} }
@ -2329,8 +2328,9 @@ static void
CheckConnectionTimeout(WorkerPool *workerPool) CheckConnectionTimeout(WorkerPool *workerPool)
{ {
DistributedExecution *execution = workerPool->distributedExecution; DistributedExecution *execution = workerPool->distributedExecution;
TimestampTz poolStartTime = workerPool->poolStartTime; instr_time poolStartTime = workerPool->poolStartTime;
TimestampTz now = GetCurrentTimestamp(); instr_time now;
INSTR_TIME_SET_CURRENT(now);
int initiatedConnectionCount = list_length(workerPool->sessionList); int initiatedConnectionCount = list_length(workerPool->sessionList);
int activeConnectionCount = workerPool->activeConnectionCount; int activeConnectionCount = workerPool->activeConnectionCount;
@ -2339,7 +2339,7 @@ CheckConnectionTimeout(WorkerPool *workerPool)
if (initiatedConnectionCount == 0) if (initiatedConnectionCount == 0)
{ {
/* no connection has been planned for the pool yet */ /* no connection has been planned for the pool yet */
Assert(poolStartTime == 0); Assert(INSTR_TIME_IS_ZERO(poolStartTime));
return; return;
} }
@ -2354,7 +2354,7 @@ CheckConnectionTimeout(WorkerPool *workerPool)
requiredActiveConnectionCount = initiatedConnectionCount; requiredActiveConnectionCount = initiatedConnectionCount;
} }
if (TimestampDifferenceExceeds(poolStartTime, now, NodeConnectionTimeout)) if (MillisecondsBetweenTimestamps(poolStartTime, now) >= NodeConnectionTimeout)
{ {
if (activeConnectionCount < requiredActiveConnectionCount) if (activeConnectionCount < requiredActiveConnectionCount)
{ {
@ -2425,7 +2425,8 @@ UsableConnectionCount(WorkerPool *workerPool)
static long static long
NextEventTimeout(DistributedExecution *execution) NextEventTimeout(DistributedExecution *execution)
{ {
TimestampTz now = GetCurrentTimestamp(); instr_time now;
INSTR_TIME_SET_CURRENT(now);
long eventTimeout = 1000; /* milliseconds */ long eventTimeout = 1000; /* milliseconds */
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
@ -2437,7 +2438,8 @@ NextEventTimeout(DistributedExecution *execution)
continue; continue;
} }
if (workerPool->poolStartTime != 0 && workerPool->checkForPoolTimeout) if (!INSTR_TIME_IS_ZERO(workerPool->poolStartTime) &&
workerPool->checkForPoolTimeout)
{ {
long timeSincePoolStartMs = long timeSincePoolStartMs =
MillisecondsBetweenTimestamps(workerPool->poolStartTime, now); MillisecondsBetweenTimestamps(workerPool->poolStartTime, now);
@ -2488,14 +2490,10 @@ NextEventTimeout(DistributedExecution *execution)
* long. * long.
*/ */
static long static long
MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime) MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
{ {
long secs = 0; INSTR_TIME_SUBTRACT(endTime, startTime);
int micros = 0; return INSTR_TIME_GET_MILLISEC(endTime);
TimestampDifference(startTime, endTime, &secs, &micros);
return secs * 1000 + micros / 1000;
} }
@ -3427,7 +3425,7 @@ WorkerPoolFailed(WorkerPool *workerPool)
foreach_ptr(pool, workerList) foreach_ptr(pool, workerList)
{ {
/* failed pools or pools without any connection attempts ignored */ /* failed pools or pools without any connection attempts ignored */
if (pool->failed || pool->poolStartTime == 0) if (pool->failed || INSTR_TIME_IS_ZERO(pool->poolStartTime))
{ {
continue; continue;
} }
@ -3436,7 +3434,7 @@ WorkerPoolFailed(WorkerPool *workerPool)
* This should give another NodeConnectionTimeout until all * This should give another NodeConnectionTimeout until all
* the necessary connections are established. * the necessary connections are established.
*/ */
pool->poolStartTime = GetCurrentTimestamp(); INSTR_TIME_SET_CURRENT(pool->poolStartTime);
pool->checkForPoolTimeout = true; pool->checkForPoolTimeout = true;
} }
} }

View File

@ -212,9 +212,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
es->indent += 3; es->indent += 3;
} }
/* set the planning time to 0 */ INSTR_TIME_SET_ZERO(planduration);
INSTR_TIME_SET_CURRENT(planduration);
INSTR_TIME_SUBTRACT(planduration, planduration);
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration); ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration);

View File

@ -28,11 +28,13 @@
#include "executor/spi.h" #include "executor/spi.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "portability/instr_time.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/snapmgr.h" #include "utils/snapmgr.h"
#include "distributed/citus_acquire_lock.h" #include "distributed/citus_acquire_lock.h"
#include "distributed/connection_management.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
/* forward declaration of background worker entrypoint */ /* forward declaration of background worker entrypoint */
@ -41,7 +43,6 @@ extern void LockAcquireHelperMain(Datum main_arg);
/* forward declaration of helper functions */ /* forward declaration of helper functions */
static void lock_acquire_helper_sigterm(SIGNAL_ARGS); static void lock_acquire_helper_sigterm(SIGNAL_ARGS);
static void EnsureStopLockAcquireHelper(void *arg); static void EnsureStopLockAcquireHelper(void *arg);
static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
/* LockAcquireHelperArgs contains extra arguments to be used to start the worker */ /* LockAcquireHelperArgs contains extra arguments to be used to start the worker */
typedef struct LockAcquireHelperArgs typedef struct LockAcquireHelperArgs
@ -188,9 +189,8 @@ LockAcquireHelperMain(Datum main_arg)
StringInfoData sql; StringInfoData sql;
LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra; LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra;
long timeout = 0; long timeout = 0;
const TimestampTz connectionStart = GetCurrentTimestamp(); instr_time connectionStart;
const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart, INSTR_TIME_SET_CURRENT(connectionStart);
args->lock_cooldown);
/* parameters for sql query to be executed */ /* parameters for sql query to be executed */
const int paramCount = 1; const int paramCount = 1;
@ -211,7 +211,7 @@ LockAcquireHelperMain(Datum main_arg)
* the lock. * the lock.
*/ */
do { do {
timeout = DeadlineTimestampTzToTimeout(deadline); timeout = MillisecondsToTimeout(connectionStart, args->lock_cooldown);
} while (timeout > 0 && ShouldAcquireLock(timeout)); } while (timeout > 0 && ShouldAcquireLock(timeout));
/* connecting to the database */ /* connecting to the database */
@ -304,18 +304,3 @@ LockAcquireHelperMain(Datum main_arg)
/* safely got to the end, exit without problem */ /* safely got to the end, exit without problem */
proc_exit(0); proc_exit(0);
} }
/*
* DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse
* before the deadline provided as an argument will be reached. The outcome can be used to
* pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
*/
static long
DeadlineTimestampTzToTimeout(TimestampTz deadline)
{
long secs = 0;
int msecs = 0;
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs);
return secs * 1000 + msecs / 1000;
}

View File

@ -14,6 +14,7 @@
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/remote_transaction.h" #include "distributed/remote_transaction.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "portability/instr_time.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/timestamp.h" #include "utils/timestamp.h"
@ -227,12 +228,14 @@ extern void FinishConnectionListEstablishment(List *multiConnectionList);
extern void FinishConnectionEstablishment(MultiConnection *connection); extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection);
extern long DeadlineTimestampTzToTimeout(TimestampTz deadline);
/* dealing with notice handler */ /* dealing with notice handler */
extern void SetCitusNoticeProcessor(MultiConnection *connection); extern void SetCitusNoticeProcessor(MultiConnection *connection);
extern char * TrimLogLevel(const char *message); extern char * TrimLogLevel(const char *message);
extern void UnsetCitusNoticeLevel(void); extern void UnsetCitusNoticeLevel(void);
/* time utilities */
extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
#endif /* CONNECTION_MANAGMENT_H */ #endif /* CONNECTION_MANAGMENT_H */