From 52042d4a0021d0dee3dc64bc610a9e257774bf43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 12 Feb 2020 21:42:43 +0000 Subject: [PATCH] Prefer instr_time to TimestampTz when we want CLOCK_MONOTONIC --- .../connection/connection_management.c | 37 +++++++++----- .../distributed/executor/adaptive_executor.c | 48 +++++++++---------- .../distributed/planner/multi_explain.c | 4 +- src/backend/distributed/utils/acquire_lock.c | 25 ++-------- .../distributed/connection_management.h | 5 +- 5 files changed, 57 insertions(+), 62 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index f9b08c50d..359431eb6 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -29,6 +29,7 @@ #include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "mb/pg_wchar.h" +#include "portability/instr_time.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -680,9 +681,9 @@ MultiConnectionStateEventMask(MultiConnectionPollState *connectionState) void FinishConnectionListEstablishment(List *multiConnectionList) { - const TimestampTz connectionStart = GetCurrentTimestamp(); - const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart, - NodeConnectionTimeout); + instr_time connectionStart; + INSTR_TIME_SET_CURRENT(connectionStart); + List *connectionStates = NULL; ListCell *multiConnectionCell = NULL; @@ -729,7 +730,7 @@ FinishConnectionListEstablishment(List *multiConnectionList) ALLOCSET_DEFAULT_SIZES)); while (waitCount > 0) { - long timeout = DeadlineTimestampTzToTimeout(deadline); + long timeout = MillisecondsToTimeout(connectionStart, NodeConnectionTimeout); if (waitEventSetRebuild) { @@ -812,8 +813,7 @@ FinishConnectionListEstablishment(List *multiConnectionList) * connectionStart and if passed close all non-finished connections */ - TimestampTz now = GetCurrentTimestamp(); - if (TimestampDifferenceExceeds(connectionStart, now, NodeConnectionTimeout)) + if (MillisecondsPassedSince(connectionStart) >= NodeConnectionTimeout) { /* * showing as a warning, can't be an error. In some cases queries can @@ -837,17 +837,28 @@ FinishConnectionListEstablishment(List *multiConnectionList) /* - * DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse - * before the deadline provided as an argument will be reached. The outcome can be used to + * MillisecondsPassedSince returns the number of milliseconds elapsed between an + * instr_time & the current time. + */ +double +MillisecondsPassedSince(instr_time moment) +{ + instr_time timeSinceMoment; + INSTR_TIME_SET_CURRENT(timeSinceMoment); + INSTR_TIME_SUBTRACT(timeSinceMoment, moment); + return INSTR_TIME_GET_MILLISEC(timeSinceMoment); +} + + +/* + * MillisecondsToTimeout returns the numer of milliseconds that still need to elapse + * before msAfterStart milliseconds have passed since start. The outcome can be used to * pass to the Wait of an EventSet to make sure it returns after the timeout has passed. */ long -DeadlineTimestampTzToTimeout(TimestampTz deadline) +MillisecondsToTimeout(instr_time start, long msAfterStart) { - long secs = 0; - int microsecs = 0; - TimestampDifference(GetCurrentTimestamp(), deadline, &secs, µsecs); - return secs * 1000 + microsecs / 1000; + return msAfterStart - MillisecondsPassedSince(start); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f10f25f7f..71c3f6e29 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -157,6 +157,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "lib/ilist.h" +#include "portability/instr_time.h" #include "storage/fd.h" #include "storage/latch.h" #include "utils/int8.h" @@ -362,13 +363,13 @@ typedef struct WorkerPool * We keep this for enforcing the connection timeouts. In our definition, a pool * starts when the first connection establishment starts. */ - TimestampTz poolStartTime; + instr_time poolStartTime; /* indicates whether to check for the connection timeout */ bool checkForPoolTimeout; /* last time we opened a connection */ - TimestampTz lastConnectionOpenTime; + instr_time lastConnectionOpenTime; /* maximum number of connections we are allowed to open at once */ uint32 maxNewConnectionsPerCycle; @@ -588,7 +589,6 @@ static void ManageWorkerPool(WorkerPool *workerPool); static void CheckConnectionTimeout(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); -static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); @@ -622,6 +622,7 @@ static int GetEventSetSize(List *sessionList); static int RebuildWaitEventSet(DistributedExecution *execution); static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); +static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); /* * AdaptiveExecutor is called via CitusExecScan on the @@ -1845,7 +1846,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool)); workerPool->nodeName = pstrdup(nodeName); workerPool->nodePort = nodePort; - workerPool->poolStartTime = 0; + INSTR_TIME_SET_ZERO(workerPool->poolStartTime); workerPool->distributedExecution = execution; /* "open" connections aggressively when there are cached connections */ @@ -1905,7 +1906,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) */ if (list_length(workerPool->sessionList) == 0) { - workerPool->poolStartTime = GetCurrentTimestamp(); + INSTR_TIME_SET_CURRENT(workerPool->poolStartTime); workerPool->checkForPoolTimeout = true; } @@ -2243,10 +2244,8 @@ ManageWorkerPool(WorkerPool *workerPool) if (newConnectionCount > 0 && ExecutorSlowStartInterval > 0) { - TimestampTz now = GetCurrentTimestamp(); - - if (TimestampDifferenceExceeds(workerPool->lastConnectionOpenTime, now, - ExecutorSlowStartInterval)) + if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >= + ExecutorSlowStartInterval) { newConnectionCount = Min(newConnectionCount, workerPool->maxNewConnectionsPerCycle); @@ -2307,7 +2306,7 @@ ManageWorkerPool(WorkerPool *workerPool) ConnectionStateMachine(session); } - workerPool->lastConnectionOpenTime = GetCurrentTimestamp(); + INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); execution->connectionSetChanged = true; } @@ -2329,8 +2328,9 @@ static void CheckConnectionTimeout(WorkerPool *workerPool) { DistributedExecution *execution = workerPool->distributedExecution; - TimestampTz poolStartTime = workerPool->poolStartTime; - TimestampTz now = GetCurrentTimestamp(); + instr_time poolStartTime = workerPool->poolStartTime; + instr_time now; + INSTR_TIME_SET_CURRENT(now); int initiatedConnectionCount = list_length(workerPool->sessionList); int activeConnectionCount = workerPool->activeConnectionCount; @@ -2339,7 +2339,7 @@ CheckConnectionTimeout(WorkerPool *workerPool) if (initiatedConnectionCount == 0) { /* no connection has been planned for the pool yet */ - Assert(poolStartTime == 0); + Assert(INSTR_TIME_IS_ZERO(poolStartTime)); return; } @@ -2354,7 +2354,7 @@ CheckConnectionTimeout(WorkerPool *workerPool) requiredActiveConnectionCount = initiatedConnectionCount; } - if (TimestampDifferenceExceeds(poolStartTime, now, NodeConnectionTimeout)) + if (MillisecondsBetweenTimestamps(poolStartTime, now) >= NodeConnectionTimeout) { if (activeConnectionCount < requiredActiveConnectionCount) { @@ -2425,7 +2425,8 @@ UsableConnectionCount(WorkerPool *workerPool) static long NextEventTimeout(DistributedExecution *execution) { - TimestampTz now = GetCurrentTimestamp(); + instr_time now; + INSTR_TIME_SET_CURRENT(now); long eventTimeout = 1000; /* milliseconds */ WorkerPool *workerPool = NULL; @@ -2437,7 +2438,8 @@ NextEventTimeout(DistributedExecution *execution) continue; } - if (workerPool->poolStartTime != 0 && workerPool->checkForPoolTimeout) + if (!INSTR_TIME_IS_ZERO(workerPool->poolStartTime) && + workerPool->checkForPoolTimeout) { long timeSincePoolStartMs = MillisecondsBetweenTimestamps(workerPool->poolStartTime, now); @@ -2488,14 +2490,10 @@ NextEventTimeout(DistributedExecution *execution) * long. */ static long -MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime) +MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime) { - long secs = 0; - int micros = 0; - - TimestampDifference(startTime, endTime, &secs, µs); - - return secs * 1000 + micros / 1000; + INSTR_TIME_SUBTRACT(endTime, startTime); + return INSTR_TIME_GET_MILLISEC(endTime); } @@ -3427,7 +3425,7 @@ WorkerPoolFailed(WorkerPool *workerPool) foreach_ptr(pool, workerList) { /* failed pools or pools without any connection attempts ignored */ - if (pool->failed || pool->poolStartTime == 0) + if (pool->failed || INSTR_TIME_IS_ZERO(pool->poolStartTime)) { continue; } @@ -3436,7 +3434,7 @@ WorkerPoolFailed(WorkerPool *workerPool) * This should give another NodeConnectionTimeout until all * the necessary connections are established. */ - pool->poolStartTime = GetCurrentTimestamp(); + INSTR_TIME_SET_CURRENT(pool->poolStartTime); pool->checkForPoolTimeout = true; } } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 5249e6867..cc36677f7 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -212,9 +212,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es) es->indent += 3; } - /* set the planning time to 0 */ - INSTR_TIME_SET_CURRENT(planduration); - INSTR_TIME_SUBTRACT(planduration, planduration); + INSTR_TIME_SET_ZERO(planduration); ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration); diff --git a/src/backend/distributed/utils/acquire_lock.c b/src/backend/distributed/utils/acquire_lock.c index 8af1f35fc..229340c97 100644 --- a/src/backend/distributed/utils/acquire_lock.c +++ b/src/backend/distributed/utils/acquire_lock.c @@ -28,11 +28,13 @@ #include "executor/spi.h" #include "miscadmin.h" #include "pgstat.h" +#include "portability/instr_time.h" #include "storage/ipc.h" #include "storage/latch.h" #include "utils/snapmgr.h" #include "distributed/citus_acquire_lock.h" +#include "distributed/connection_management.h" #include "distributed/version_compat.h" /* forward declaration of background worker entrypoint */ @@ -41,7 +43,6 @@ extern void LockAcquireHelperMain(Datum main_arg); /* forward declaration of helper functions */ static void lock_acquire_helper_sigterm(SIGNAL_ARGS); static void EnsureStopLockAcquireHelper(void *arg); -static long DeadlineTimestampTzToTimeout(TimestampTz deadline); /* LockAcquireHelperArgs contains extra arguments to be used to start the worker */ typedef struct LockAcquireHelperArgs @@ -188,9 +189,8 @@ LockAcquireHelperMain(Datum main_arg) StringInfoData sql; LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra; long timeout = 0; - const TimestampTz connectionStart = GetCurrentTimestamp(); - const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart, - args->lock_cooldown); + instr_time connectionStart; + INSTR_TIME_SET_CURRENT(connectionStart); /* parameters for sql query to be executed */ const int paramCount = 1; @@ -211,7 +211,7 @@ LockAcquireHelperMain(Datum main_arg) * the lock. */ do { - timeout = DeadlineTimestampTzToTimeout(deadline); + timeout = MillisecondsToTimeout(connectionStart, args->lock_cooldown); } while (timeout > 0 && ShouldAcquireLock(timeout)); /* connecting to the database */ @@ -304,18 +304,3 @@ LockAcquireHelperMain(Datum main_arg) /* safely got to the end, exit without problem */ proc_exit(0); } - - -/* - * DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse - * before the deadline provided as an argument will be reached. The outcome can be used to - * pass to the Wait of an EventSet to make sure it returns after the timeout has passed. - */ -static long -DeadlineTimestampTzToTimeout(TimestampTz deadline) -{ - long secs = 0; - int msecs = 0; - TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs); - return secs * 1000 + msecs / 1000; -} diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 2424dfd13..571e03c38 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -14,6 +14,7 @@ #include "distributed/transaction_management.h" #include "distributed/remote_transaction.h" #include "lib/ilist.h" +#include "portability/instr_time.h" #include "utils/guc.h" #include "utils/hsearch.h" #include "utils/timestamp.h" @@ -227,12 +228,14 @@ extern void FinishConnectionListEstablishment(List *multiConnectionList); extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); -extern long DeadlineTimestampTzToTimeout(TimestampTz deadline); /* dealing with notice handler */ extern void SetCitusNoticeProcessor(MultiConnection *connection); extern char * TrimLogLevel(const char *message); extern void UnsetCitusNoticeLevel(void); +/* time utilities */ +extern double MillisecondsPassedSince(instr_time moment); +extern long MillisecondsToTimeout(instr_time start, long msAfterStart); #endif /* CONNECTION_MANAGMENT_H */