Merge pull request #3494 from citusdata/use-instr-time

Prefer instr_time to TimestampTz when we want CLOCK_MONOTONIC
pull/3502/head
Philip Dubé 2020-02-19 00:42:52 +00:00 committed by GitHub
commit d66f011f71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 62 deletions

View File

@ -29,6 +29,7 @@
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "mb/pg_wchar.h"
#include "portability/instr_time.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@ -680,9 +681,9 @@ MultiConnectionStateEventMask(MultiConnectionPollState *connectionState)
void
FinishConnectionListEstablishment(List *multiConnectionList)
{
const TimestampTz connectionStart = GetCurrentTimestamp();
const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart,
NodeConnectionTimeout);
instr_time connectionStart;
INSTR_TIME_SET_CURRENT(connectionStart);
List *connectionStates = NULL;
ListCell *multiConnectionCell = NULL;
@ -729,7 +730,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
ALLOCSET_DEFAULT_SIZES));
while (waitCount > 0)
{
long timeout = DeadlineTimestampTzToTimeout(deadline);
long timeout = MillisecondsToTimeout(connectionStart, NodeConnectionTimeout);
if (waitEventSetRebuild)
{
@ -812,8 +813,7 @@ FinishConnectionListEstablishment(List *multiConnectionList)
* connectionStart and if passed close all non-finished connections
*/
TimestampTz now = GetCurrentTimestamp();
if (TimestampDifferenceExceeds(connectionStart, now, NodeConnectionTimeout))
if (MillisecondsPassedSince(connectionStart) >= NodeConnectionTimeout)
{
/*
* showing as a warning, can't be an error. In some cases queries can
@ -837,17 +837,28 @@ FinishConnectionListEstablishment(List *multiConnectionList)
/*
* DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse
* before the deadline provided as an argument will be reached. The outcome can be used to
* MillisecondsPassedSince returns the number of milliseconds elapsed between an
* instr_time & the current time.
*/
double
MillisecondsPassedSince(instr_time moment)
{
instr_time timeSinceMoment;
INSTR_TIME_SET_CURRENT(timeSinceMoment);
INSTR_TIME_SUBTRACT(timeSinceMoment, moment);
return INSTR_TIME_GET_MILLISEC(timeSinceMoment);
}
/*
* MillisecondsToTimeout returns the numer of milliseconds that still need to elapse
* before msAfterStart milliseconds have passed since start. The outcome can be used to
* pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
*/
long
DeadlineTimestampTzToTimeout(TimestampTz deadline)
MillisecondsToTimeout(instr_time start, long msAfterStart)
{
long secs = 0;
int microsecs = 0;
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &microsecs);
return secs * 1000 + microsecs / 1000;
return msAfterStart - MillisecondsPassedSince(start);
}

View File

@ -157,6 +157,7 @@
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "lib/ilist.h"
#include "portability/instr_time.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/int8.h"
@ -362,13 +363,13 @@ typedef struct WorkerPool
* We keep this for enforcing the connection timeouts. In our definition, a pool
* starts when the first connection establishment starts.
*/
TimestampTz poolStartTime;
instr_time poolStartTime;
/* indicates whether to check for the connection timeout */
bool checkForPoolTimeout;
/* last time we opened a connection */
TimestampTz lastConnectionOpenTime;
instr_time lastConnectionOpenTime;
/* maximum number of connections we are allowed to open at once */
uint32 maxNewConnectionsPerCycle;
@ -588,7 +589,6 @@ static void ManageWorkerPool(WorkerPool *workerPool);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
@ -622,6 +622,7 @@ static int GetEventSetSize(List *sessionList);
static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
/*
* AdaptiveExecutor is called via CitusExecScan on the
@ -1845,7 +1846,7 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
workerPool = (WorkerPool *) palloc0(sizeof(WorkerPool));
workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort;
workerPool->poolStartTime = 0;
INSTR_TIME_SET_ZERO(workerPool->poolStartTime);
workerPool->distributedExecution = execution;
/* "open" connections aggressively when there are cached connections */
@ -1905,7 +1906,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
*/
if (list_length(workerPool->sessionList) == 0)
{
workerPool->poolStartTime = GetCurrentTimestamp();
INSTR_TIME_SET_CURRENT(workerPool->poolStartTime);
workerPool->checkForPoolTimeout = true;
}
@ -2243,10 +2244,8 @@ ManageWorkerPool(WorkerPool *workerPool)
if (newConnectionCount > 0 && ExecutorSlowStartInterval > 0)
{
TimestampTz now = GetCurrentTimestamp();
if (TimestampDifferenceExceeds(workerPool->lastConnectionOpenTime, now,
ExecutorSlowStartInterval))
if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
ExecutorSlowStartInterval)
{
newConnectionCount = Min(newConnectionCount,
workerPool->maxNewConnectionsPerCycle);
@ -2307,7 +2306,7 @@ ManageWorkerPool(WorkerPool *workerPool)
ConnectionStateMachine(session);
}
workerPool->lastConnectionOpenTime = GetCurrentTimestamp();
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
execution->connectionSetChanged = true;
}
@ -2329,8 +2328,9 @@ static void
CheckConnectionTimeout(WorkerPool *workerPool)
{
DistributedExecution *execution = workerPool->distributedExecution;
TimestampTz poolStartTime = workerPool->poolStartTime;
TimestampTz now = GetCurrentTimestamp();
instr_time poolStartTime = workerPool->poolStartTime;
instr_time now;
INSTR_TIME_SET_CURRENT(now);
int initiatedConnectionCount = list_length(workerPool->sessionList);
int activeConnectionCount = workerPool->activeConnectionCount;
@ -2339,7 +2339,7 @@ CheckConnectionTimeout(WorkerPool *workerPool)
if (initiatedConnectionCount == 0)
{
/* no connection has been planned for the pool yet */
Assert(poolStartTime == 0);
Assert(INSTR_TIME_IS_ZERO(poolStartTime));
return;
}
@ -2354,7 +2354,7 @@ CheckConnectionTimeout(WorkerPool *workerPool)
requiredActiveConnectionCount = initiatedConnectionCount;
}
if (TimestampDifferenceExceeds(poolStartTime, now, NodeConnectionTimeout))
if (MillisecondsBetweenTimestamps(poolStartTime, now) >= NodeConnectionTimeout)
{
if (activeConnectionCount < requiredActiveConnectionCount)
{
@ -2425,7 +2425,8 @@ UsableConnectionCount(WorkerPool *workerPool)
static long
NextEventTimeout(DistributedExecution *execution)
{
TimestampTz now = GetCurrentTimestamp();
instr_time now;
INSTR_TIME_SET_CURRENT(now);
long eventTimeout = 1000; /* milliseconds */
WorkerPool *workerPool = NULL;
@ -2437,7 +2438,8 @@ NextEventTimeout(DistributedExecution *execution)
continue;
}
if (workerPool->poolStartTime != 0 && workerPool->checkForPoolTimeout)
if (!INSTR_TIME_IS_ZERO(workerPool->poolStartTime) &&
workerPool->checkForPoolTimeout)
{
long timeSincePoolStartMs =
MillisecondsBetweenTimestamps(workerPool->poolStartTime, now);
@ -2488,14 +2490,10 @@ NextEventTimeout(DistributedExecution *execution)
* long.
*/
static long
MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime)
MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime)
{
long secs = 0;
int micros = 0;
TimestampDifference(startTime, endTime, &secs, &micros);
return secs * 1000 + micros / 1000;
INSTR_TIME_SUBTRACT(endTime, startTime);
return INSTR_TIME_GET_MILLISEC(endTime);
}
@ -3427,7 +3425,7 @@ WorkerPoolFailed(WorkerPool *workerPool)
foreach_ptr(pool, workerList)
{
/* failed pools or pools without any connection attempts ignored */
if (pool->failed || pool->poolStartTime == 0)
if (pool->failed || INSTR_TIME_IS_ZERO(pool->poolStartTime))
{
continue;
}
@ -3436,7 +3434,7 @@ WorkerPoolFailed(WorkerPool *workerPool)
* This should give another NodeConnectionTimeout until all
* the necessary connections are established.
*/
pool->poolStartTime = GetCurrentTimestamp();
INSTR_TIME_SET_CURRENT(pool->poolStartTime);
pool->checkForPoolTimeout = true;
}
}

View File

@ -212,9 +212,7 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
es->indent += 3;
}
/* set the planning time to 0 */
INSTR_TIME_SET_CURRENT(planduration);
INSTR_TIME_SUBTRACT(planduration, planduration);
INSTR_TIME_SET_ZERO(planduration);
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration);

View File

@ -28,11 +28,13 @@
#include "executor/spi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "portability/instr_time.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "utils/snapmgr.h"
#include "distributed/citus_acquire_lock.h"
#include "distributed/connection_management.h"
#include "distributed/version_compat.h"
/* forward declaration of background worker entrypoint */
@ -41,7 +43,6 @@ extern void LockAcquireHelperMain(Datum main_arg);
/* forward declaration of helper functions */
static void lock_acquire_helper_sigterm(SIGNAL_ARGS);
static void EnsureStopLockAcquireHelper(void *arg);
static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
/* LockAcquireHelperArgs contains extra arguments to be used to start the worker */
typedef struct LockAcquireHelperArgs
@ -188,9 +189,8 @@ LockAcquireHelperMain(Datum main_arg)
StringInfoData sql;
LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra;
long timeout = 0;
const TimestampTz connectionStart = GetCurrentTimestamp();
const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart,
args->lock_cooldown);
instr_time connectionStart;
INSTR_TIME_SET_CURRENT(connectionStart);
/* parameters for sql query to be executed */
const int paramCount = 1;
@ -211,7 +211,7 @@ LockAcquireHelperMain(Datum main_arg)
* the lock.
*/
do {
timeout = DeadlineTimestampTzToTimeout(deadline);
timeout = MillisecondsToTimeout(connectionStart, args->lock_cooldown);
} while (timeout > 0 && ShouldAcquireLock(timeout));
/* connecting to the database */
@ -304,18 +304,3 @@ LockAcquireHelperMain(Datum main_arg)
/* safely got to the end, exit without problem */
proc_exit(0);
}
/*
* DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse
* before the deadline provided as an argument will be reached. The outcome can be used to
* pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
*/
static long
DeadlineTimestampTzToTimeout(TimestampTz deadline)
{
long secs = 0;
int msecs = 0;
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs);
return secs * 1000 + msecs / 1000;
}

View File

@ -14,6 +14,7 @@
#include "distributed/transaction_management.h"
#include "distributed/remote_transaction.h"
#include "lib/ilist.h"
#include "portability/instr_time.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
@ -227,12 +228,14 @@ extern void FinishConnectionListEstablishment(List *multiConnectionList);
extern void FinishConnectionEstablishment(MultiConnection *connection);
extern void ClaimConnectionExclusively(MultiConnection *connection);
extern void UnclaimConnection(MultiConnection *connection);
extern long DeadlineTimestampTzToTimeout(TimestampTz deadline);
/* dealing with notice handler */
extern void SetCitusNoticeProcessor(MultiConnection *connection);
extern char * TrimLogLevel(const char *message);
extern void UnsetCitusNoticeLevel(void);
/* time utilities */
extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
#endif /* CONNECTION_MANAGMENT_H */