Code cleanup of adaptive_executor, connection_management, placement_connection

adaptive_executor: sort includes, use foreach_ptr, remove the inaccurate claim from the FinishDistributedExecution docs that it releases connections and clears their state
connection_management: rename msecs to microsecs, since TimestampDifference fills it with microseconds, not milliseconds
placement_connection: small typos
pull/3399/head
Philip Dubé 2020-01-15 21:02:37 +00:00
parent 5f34399e1f
commit fdcc413559
3 changed files with 50 additions and 87 deletions

View File

@ -845,9 +845,9 @@ long
DeadlineTimestampTzToTimeout(TimestampTz deadline) DeadlineTimestampTzToTimeout(TimestampTz deadline)
{ {
long secs = 0; long secs = 0;
int msecs = 0; int microsecs = 0;
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs); TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &microsecs);
return secs * 1000 + msecs / 1000; return secs * 1000 + microsecs / 1000;
} }

View File

@ -71,7 +71,7 @@ struct ColocatedPlacementsHashEntry;
* Hash table mapping placements to a list of connections. * Hash table mapping placements to a list of connections.
* *
* This stores a list of connections for each placement, because multiple * This stores a list of connections for each placement, because multiple
* connections to the same placement may exist at the same time. E.g. a * connections to the same placement may exist at the same time. E.g. an
* adaptive executor query may reference the same placement in several * adaptive executor query may reference the same placement in several
* sub-tasks. * sub-tasks.
* *
@ -118,7 +118,7 @@ static HTAB *ConnectionPlacementHash;
* placements (i.e. the corresponding placements for different colocated * placements (i.e. the corresponding placements for different colocated
* distributed tables) need to share connections. Otherwise things like * distributed tables) need to share connections. Otherwise things like
* foreign keys can very easily lead to unprincipled deadlocks. This means * foreign keys can very easily lead to unprincipled deadlocks. This means
* that there can only one DML/DDL connection for a set of colocated * that there can only be one DML/DDL connection for a set of colocated
* placements. * placements.
* *
* A set of colocated placements is identified, besides node identifying * A set of colocated placements is identified, besides node identifying

View File

@ -131,10 +131,14 @@
#include "access/xact.h" #include "access/xact.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/schemacmds.h"
#include "distributed/adaptive_executor.h"
#include "distributed/cancel_utils.h"
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h" #include "distributed/distributed_execution_locks.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
@ -145,17 +149,14 @@
#include "distributed/placement_access.h" #include "distributed/placement_access.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/cancel_utils.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/adaptive_executor.h" #include "distributed/worker_protocol.h"
#include "distributed/repartition_join_execution.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "commands/schemacmds.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/int8.h" #include "utils/int8.h"
@ -1352,8 +1353,6 @@ ReadOnlyTask(TaskType taskType)
static bool static bool
SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList) SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList)
{ {
ListCell *rtiLockCell = NULL;
if (modLevel != ROW_MODIFY_READONLY) if (modLevel != ROW_MODIFY_READONLY)
{ {
return false; return false;
@ -1366,9 +1365,9 @@ SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList)
} }
Task *task = (Task *) linitial(taskList); Task *task = (Task *) linitial(taskList);
foreach(rtiLockCell, task->relationRowLockList) RelationRowLock *relationRowLock = NULL;
foreach_ptr(relationRowLock, task->relationRowLockList)
{ {
RelationRowLock *relationRowLock = (RelationRowLock *) lfirst(rtiLockCell);
Oid relationId = relationRowLock->relationId; Oid relationId = relationRowLock->relationId;
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
@ -1444,12 +1443,9 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
*/ */
if (list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList)) if (list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList))
{ {
ListCell *taskCell = NULL; Task *task = NULL;
foreach_ptr(task, taskList)
foreach(taskCell, taskList)
{ {
Task *task = (Task *) lfirst(taskCell);
AcquireExecutorShardLocks(task, modLevel); AcquireExecutorShardLocks(task, modLevel);
} }
} }
@ -1462,8 +1458,7 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
/* /*
* FinishDistributedExecution cleans up resources associated with a * FinishDistributedExecution cleans up resources associated with a
* distributed execution. In particular, it releases connections and * distributed execution.
* clears their state.
*/ */
static void static void
FinishDistributedExecution(DistributedExecution *execution) FinishDistributedExecution(DistributedExecution *execution)
@ -1479,25 +1474,22 @@ FinishDistributedExecution(DistributedExecution *execution)
/* /*
* CleanUpSessions does any clean-up necessary for the session * CleanUpSessions does any clean-up necessary for the session used
* used during the execution. We only reach the function after * during the execution. We only reach the function after successfully
* successfully completing all the tasks and we expect no tasks * completing all the tasks and we expect no tasks are still in progress.
* are still in progress.
*/ */
static void static void
CleanUpSessions(DistributedExecution *execution) CleanUpSessions(DistributedExecution *execution)
{ {
List *sessionList = execution->sessionList; List *sessionList = execution->sessionList;
ListCell *sessionCell = NULL;
/* we get to this function only after successful executions */ /* we get to this function only after successful executions */
Assert(!execution->failed && execution->unfinishedTaskCount == 0); Assert(!execution->failed && execution->unfinishedTaskCount == 0);
/* always trigger wait event set in the first round */ /* always trigger wait event set in the first round */
foreach(sessionCell, sessionList) WorkerSession *session = NULL;
foreach_ptr(session, sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld", ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld",
@ -1574,10 +1566,9 @@ CleanUpSessions(DistributedExecution *execution)
static void static void
UnclaimAllSessionConnections(List *sessionList) UnclaimAllSessionConnections(List *sessionList)
{ {
ListCell *sessionCell = NULL; WorkerSession *session = NULL;
foreach(sessionCell, sessionList) foreach_ptr(session, sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
UnclaimConnection(connection); UnclaimConnection(connection);
@ -1598,13 +1589,9 @@ AssignTasksToConnections(DistributedExecution *execution)
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
bool hasReturning = execution->hasReturning; bool hasReturning = execution->hasReturning;
ListCell *taskCell = NULL; Task *task = NULL;
ListCell *sessionCell = NULL; foreach_ptr(task, taskList)
foreach(taskCell, taskList)
{ {
Task *task = (Task *) lfirst(taskCell);
ListCell *taskPlacementCell = NULL;
bool placementExecutionReady = true; bool placementExecutionReady = true;
int placementExecutionIndex = 0; int placementExecutionIndex = 0;
int placementExecutionCount = list_length(task->taskPlacementList); int placementExecutionCount = list_length(task->taskPlacementList);
@ -1626,9 +1613,9 @@ AssignTasksToConnections(DistributedExecution *execution)
(hasReturning && !task->partiallyLocalOrRemote) || (hasReturning && !task->partiallyLocalOrRemote) ||
modLevel == ROW_MODIFY_READONLY; modLevel == ROW_MODIFY_READONLY;
foreach(taskPlacementCell, task->taskPlacementList) ShardPlacement *taskPlacement = NULL;
foreach_ptr(taskPlacement, task->taskPlacementList)
{ {
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
int connectionFlags = 0; int connectionFlags = 0;
char *nodeName = taskPlacement->nodeName; char *nodeName = taskPlacement->nodeName;
int nodePort = taskPlacement->nodePort; int nodePort = taskPlacement->nodePort;
@ -1753,9 +1740,9 @@ AssignTasksToConnections(DistributedExecution *execution)
* We need to do this after assigning tasks to connections because the same * We need to do this after assigning tasks to connections because the same
* connection may be returned multiple times by GetPlacementListConnectionIfCached. * connection may be returned multiple times by GetPlacementListConnectionIfCached.
*/ */
foreach(sessionCell, execution->sessionList) WorkerSession *session = NULL;
foreach_ptr(session, execution->sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
ClaimConnectionExclusively(connection); ClaimConnectionExclusively(connection);
@ -1834,12 +1821,8 @@ static WorkerPool *
FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort) FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort)
{ {
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
ListCell *workerCell = NULL; foreach_ptr(workerPool, execution->workerList)
foreach(workerCell, execution->workerList)
{ {
workerPool = lfirst(workerCell);
if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 && if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 &&
nodePort == workerPool->nodePort) nodePort == workerPool->nodePort)
{ {
@ -1875,14 +1858,11 @@ static WorkerSession *
FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
{ {
DistributedExecution *execution = workerPool->distributedExecution; DistributedExecution *execution = workerPool->distributedExecution;
WorkerSession *session = NULL;
ListCell *sessionCell = NULL;
static uint64 sessionId = 1; static uint64 sessionId = 1;
foreach(sessionCell, workerPool->sessionList) WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{ {
session = lfirst(sessionCell);
if (session->connection == connection) if (session->connection == connection)
{ {
return session; return session;
@ -1968,8 +1948,6 @@ static void
SequentialRunDistributedExecution(DistributedExecution *execution) SequentialRunDistributedExecution(DistributedExecution *execution)
{ {
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
ListCell *taskCell = NULL;
int connectionMode = MultiShardConnectionType; int connectionMode = MultiShardConnectionType;
/* /*
@ -1978,10 +1956,9 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
*/ */
MultiShardConnectionType = SEQUENTIAL_CONNECTION; MultiShardConnectionType = SEQUENTIAL_CONNECTION;
foreach(taskCell, taskList) Task *taskToExecute = NULL;
foreach_ptr(taskToExecute, taskList)
{ {
Task *taskToExecute = (Task *) lfirst(taskCell);
/* execute each task one by one */ /* execute each task one by one */
execution->tasksToExecute = list_make1(taskToExecute); execution->tasksToExecute = list_make1(taskToExecute);
execution->totalTaskCount = 1; execution->totalTaskCount = 1;
@ -2031,12 +2008,11 @@ RunDistributedExecution(DistributedExecution *execution)
while (execution->unfinishedTaskCount > 0 && !cancellationReceived) while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
{ {
int eventIndex = 0; int eventIndex = 0;
ListCell *workerCell = NULL;
long timeout = NextEventTimeout(execution); long timeout = NextEventTimeout(execution);
foreach(workerCell, execution->workerList) WorkerPool *workerPool = NULL;
foreach_ptr(workerPool, execution->workerList)
{ {
WorkerPool *workerPool = lfirst(workerCell);
ManageWorkerPool(workerPool); ManageWorkerPool(workerPool);
} }
@ -2418,14 +2394,12 @@ UsableConnectionCount(WorkerPool *workerPool)
static long static long
NextEventTimeout(DistributedExecution *execution) NextEventTimeout(DistributedExecution *execution)
{ {
ListCell *workerCell = NULL;
TimestampTz now = GetCurrentTimestamp(); TimestampTz now = GetCurrentTimestamp();
long eventTimeout = 1000; /* milliseconds */ long eventTimeout = 1000; /* milliseconds */
foreach(workerCell, execution->workerList) WorkerPool *workerPool = NULL;
foreach_ptr(workerPool, execution->workerList)
{ {
WorkerPool *workerPool = (WorkerPool *) lfirst(workerCell);
if (workerPool->failed) if (workerPool->failed)
{ {
/* worker pool may have already timed out */ /* worker pool may have already timed out */
@ -2701,7 +2675,7 @@ HandleMultiConnectionSuccess(WorkerSession *session)
* BEGIN; * BEGIN;
* -- assume that the following INSERT goes to worker-A * -- assume that the following INSERT goes to worker-A
* -- also note that this single command does not activate * -- also note that this single command does not activate
* -- 2PC itself since it is a single shard mofication * -- 2PC itself since it is a single shard modification
* INSERT INTO distributed_table (dist_key) VALUES (1); * INSERT INTO distributed_table (dist_key) VALUES (1);
* *
* -- do one more single shard UPDATE hitting the same * -- do one more single shard UPDATE hitting the same
@ -3377,7 +3351,6 @@ WorkerPoolFailed(WorkerPool *workerPool)
{ {
bool succeeded = false; bool succeeded = false;
dlist_iter iter; dlist_iter iter;
ListCell *sessionCell = NULL;
/* a pool cannot fail multiple times */ /* a pool cannot fail multiple times */
Assert(!workerPool->failed); Assert(!workerPool->failed);
@ -3398,10 +3371,9 @@ WorkerPoolFailed(WorkerPool *workerPool)
PlacementExecutionDone(placementExecution, succeeded); PlacementExecutionDone(placementExecution, succeeded);
} }
foreach(sessionCell, workerPool->sessionList) WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
WorkerSessionFailed(session); WorkerSessionFailed(session);
} }
@ -3418,13 +3390,11 @@ WorkerPoolFailed(WorkerPool *workerPool)
*/ */
if (UseConnectionPerPlacement()) if (UseConnectionPerPlacement())
{ {
ListCell *workerCell = NULL;
List *workerList = workerPool->distributedExecution->workerList; List *workerList = workerPool->distributedExecution->workerList;
foreach(workerCell, workerList) WorkerPool *pool = NULL;
foreach_ptr(pool, workerList)
{ {
WorkerPool *pool = (WorkerPool *) lfirst(workerCell);
/* failed pools or pools without any connection attempts ignored */ /* failed pools or pools without any connection attempts ignored */
if (pool->failed || pool->poolStartTime == 0) if (pool->failed || pool->poolStartTime == 0)
{ {
@ -3679,8 +3649,6 @@ PlacementExecutionReady(TaskPlacementExecution *placementExecution)
} }
else else
{ {
ListCell *sessionCell = NULL;
if (placementExecution->executionState == PLACEMENT_EXECUTION_NOT_READY) if (placementExecution->executionState == PLACEMENT_EXECUTION_NOT_READY)
{ {
/* remove from not-ready task queue */ /* remove from not-ready task queue */
@ -3694,9 +3662,9 @@ PlacementExecutionReady(TaskPlacementExecution *placementExecution)
workerPool->readyTaskCount++; workerPool->readyTaskCount++;
/* wake up an idle connection by checking whether the connection is writeable */ /* wake up an idle connection by checking whether the connection is writeable */
foreach(sessionCell, workerPool->sessionList) WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
RemoteTransaction *transaction = &(connection->remoteTransaction); RemoteTransaction *transaction = &(connection->remoteTransaction);
RemoteTransactionState transactionState = transaction->transactionState; RemoteTransactionState transactionState = transaction->transactionState;
@ -3789,17 +3757,15 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
static WaitEventSet * static WaitEventSet *
BuildWaitEventSet(List *sessionList) BuildWaitEventSet(List *sessionList)
{ {
ListCell *sessionCell = NULL;
/* additional 2 is for postmaster and latch */ /* additional 2 is for postmaster and latch */
int eventSetSize = list_length(sessionList) + 2; int eventSetSize = list_length(sessionList) + 2;
WaitEventSet *waitEventSet = WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, eventSetSize); CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
foreach(sessionCell, sessionList) WorkerSession *session = NULL;
foreach_ptr(session, sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
if (connection->pgConn == NULL) if (connection->pgConn == NULL)
@ -3822,8 +3788,7 @@ BuildWaitEventSet(List *sessionList)
} }
int waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, int waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags,
sock, sock, NULL, (void *) session);
NULL, (void *) session);
session->waitEventSetIndex = waitEventSetIndex; session->waitEventSetIndex = waitEventSetIndex;
} }
@ -3841,11 +3806,9 @@ BuildWaitEventSet(List *sessionList)
static void static void
UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
{ {
ListCell *sessionCell = NULL; WorkerSession *session = NULL;
foreach_ptr(session, sessionList)
foreach(sessionCell, sessionList)
{ {
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
int waitEventSetIndex = session->waitEventSetIndex; int waitEventSetIndex = session->waitEventSetIndex;