diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index e0fca3533..bb2352af5 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -845,9 +845,9 @@ long DeadlineTimestampTzToTimeout(TimestampTz deadline) { long secs = 0; - int msecs = 0; - TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs); - return secs * 1000 + msecs / 1000; + int microsecs = 0; + TimestampDifference(GetCurrentTimestamp(), deadline, &secs, µsecs); + return secs * 1000 + microsecs / 1000; } diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 0db02f57c..cb13ebfcc 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -71,7 +71,7 @@ struct ColocatedPlacementsHashEntry; * Hash table mapping placements to a list of connections. * * This stores a list of connections for each placement, because multiple - * connections to the same placement may exist at the same time. E.g. a + * connections to the same placement may exist at the same time. E.g. an * adaptive executor query may reference the same placement in several * sub-tasks. * @@ -118,7 +118,7 @@ static HTAB *ConnectionPlacementHash; * placements (i.e. the corresponding placements for different colocated * distributed tables) need to share connections. Otherwise things like * foreign keys can very easily lead to unprincipled deadlocks. This means - * that there can only one DML/DDL connection for a set of colocated + * that there can only be one DML/DDL connection for a set of colocated * placements. * * A set of colocated placements is identified, besides node identifying diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ca6bf1931..f77774403 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -131,10 +131,14 @@ #include "access/xact.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" +#include "commands/schemacmds.h" +#include "distributed/adaptive_executor.h" +#include "distributed/cancel_utils.h" #include "distributed/citus_custom_scan.h" #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" #include "distributed/distributed_execution_locks.h" +#include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_executor.h" @@ -145,17 +149,14 @@ #include "distributed/placement_access.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" -#include "distributed/cancel_utils.h" #include "distributed/remote_commands.h" +#include "distributed/repartition_join_execution.h" #include "distributed/resource_lock.h" #include "distributed/subplan_execution.h" #include "distributed/transaction_management.h" -#include "distributed/worker_protocol.h" #include "distributed/version_compat.h" -#include "distributed/adaptive_executor.h" -#include "distributed/repartition_join_execution.h" +#include "distributed/worker_protocol.h" #include "lib/ilist.h" -#include "commands/schemacmds.h" #include "storage/fd.h" #include "storage/latch.h" #include "utils/int8.h" @@ -1352,8 +1353,6 @@ ReadOnlyTask(TaskType taskType) static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList) { - ListCell *rtiLockCell = NULL; - if (modLevel != ROW_MODIFY_READONLY) { return false; @@ -1366,9 +1365,9 @@ SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList) } Task *task = (Task *) linitial(taskList); - foreach(rtiLockCell, task->relationRowLockList) + RelationRowLock *relationRowLock = NULL; + foreach_ptr(relationRowLock, task->relationRowLockList) { - RelationRowLock *relationRowLock = (RelationRowLock *) lfirst(rtiLockCell); Oid relationId = relationRowLock->relationId; if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) @@ -1444,12 +1443,9 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) */ if (list_length(taskList) == 1 || ShouldRunTasksSequentially(taskList)) { - ListCell *taskCell = NULL; - - foreach(taskCell, taskList) + Task *task = NULL; + foreach_ptr(task, taskList) { - Task *task = (Task *) lfirst(taskCell); - AcquireExecutorShardLocks(task, modLevel); } } @@ -1462,8 +1458,7 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) /* * FinishDistributedExecution cleans up resources associated with a - * distributed execution. In particular, it releases connections and - * clears their state. + * distributed execution. */ static void FinishDistributedExecution(DistributedExecution *execution) @@ -1479,25 +1474,22 @@ FinishDistributedExecution(DistributedExecution *execution) /* - * CleanUpSessions does any clean-up necessary for the session - * used during the execution. We only reach the function after - * successfully completing all the tasks and we expect no tasks - * are still in progress. + * CleanUpSessions does any clean-up necessary for the session used + * during the execution. We only reach the function after successfully + * completing all the tasks and we expect no tasks are still in progress. */ static void CleanUpSessions(DistributedExecution *execution) { List *sessionList = execution->sessionList; - ListCell *sessionCell = NULL; /* we get to this function only after successful executions */ Assert(!execution->failed && execution->unfinishedTaskCount == 0); /* always trigger wait event set in the first round */ - foreach(sessionCell, sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, sessionList) { - WorkerSession *session = lfirst(sessionCell); - MultiConnection *connection = session->connection; ereport(DEBUG4, (errmsg("Total number of commands sent over the session %ld: %ld", @@ -1574,10 +1566,9 @@ CleanUpSessions(DistributedExecution *execution) static void UnclaimAllSessionConnections(List *sessionList) { - ListCell *sessionCell = NULL; - foreach(sessionCell, sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, sessionList) { - WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; UnclaimConnection(connection); @@ -1598,13 +1589,9 @@ AssignTasksToConnections(DistributedExecution *execution) List *taskList = execution->tasksToExecute; bool hasReturning = execution->hasReturning; - ListCell *taskCell = NULL; - ListCell *sessionCell = NULL; - - foreach(taskCell, taskList) + Task *task = NULL; + foreach_ptr(task, taskList) { - Task *task = (Task *) lfirst(taskCell); - ListCell *taskPlacementCell = NULL; bool placementExecutionReady = true; int placementExecutionIndex = 0; int placementExecutionCount = list_length(task->taskPlacementList); @@ -1626,9 +1613,9 @@ AssignTasksToConnections(DistributedExecution *execution) (hasReturning && !task->partiallyLocalOrRemote) || modLevel == ROW_MODIFY_READONLY; - foreach(taskPlacementCell, task->taskPlacementList) + ShardPlacement *taskPlacement = NULL; + foreach_ptr(taskPlacement, task->taskPlacementList) { - ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); int connectionFlags = 0; char *nodeName = taskPlacement->nodeName; int nodePort = taskPlacement->nodePort; @@ -1753,9 +1740,9 @@ AssignTasksToConnections(DistributedExecution *execution) * We need to do this after assigning tasks to connections because the same * connection may be be returned multiple times by GetPlacementListConnectionIfCached. */ - foreach(sessionCell, execution->sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, execution->sessionList) { - WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; ClaimConnectionExclusively(connection); @@ -1834,12 +1821,8 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort) { WorkerPool *workerPool = NULL; - ListCell *workerCell = NULL; - - foreach(workerCell, execution->workerList) + foreach_ptr(workerPool, execution->workerList) { - workerPool = lfirst(workerCell); - if (strncmp(nodeName, workerPool->nodeName, WORKER_LENGTH) == 0 && nodePort == workerPool->nodePort) { @@ -1875,14 +1858,11 @@ static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) { DistributedExecution *execution = workerPool->distributedExecution; - WorkerSession *session = NULL; - ListCell *sessionCell = NULL; static uint64 sessionId = 1; - foreach(sessionCell, workerPool->sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, workerPool->sessionList) { - session = lfirst(sessionCell); - if (session->connection == connection) { return session; @@ -1968,8 +1948,6 @@ static void SequentialRunDistributedExecution(DistributedExecution *execution) { List *taskList = execution->tasksToExecute; - - ListCell *taskCell = NULL; int connectionMode = MultiShardConnectionType; /* @@ -1978,10 +1956,9 @@ SequentialRunDistributedExecution(DistributedExecution *execution) */ MultiShardConnectionType = SEQUENTIAL_CONNECTION; - foreach(taskCell, taskList) + Task *taskToExecute = NULL; + foreach_ptr(taskToExecute, taskList) { - Task *taskToExecute = (Task *) lfirst(taskCell); - /* execute each task one by one */ execution->tasksToExecute = list_make1(taskToExecute); execution->totalTaskCount = 1; @@ -2031,12 +2008,11 @@ RunDistributedExecution(DistributedExecution *execution) while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { int eventIndex = 0; - ListCell *workerCell = NULL; long timeout = NextEventTimeout(execution); - foreach(workerCell, execution->workerList) + WorkerPool *workerPool = NULL; + foreach_ptr(workerPool, execution->workerList) { - WorkerPool *workerPool = lfirst(workerCell); ManageWorkerPool(workerPool); } @@ -2418,14 +2394,12 @@ UsableConnectionCount(WorkerPool *workerPool) static long NextEventTimeout(DistributedExecution *execution) { - ListCell *workerCell = NULL; TimestampTz now = GetCurrentTimestamp(); long eventTimeout = 1000; /* milliseconds */ - foreach(workerCell, execution->workerList) + WorkerPool *workerPool = NULL; + foreach_ptr(workerPool, execution->workerList) { - WorkerPool *workerPool = (WorkerPool *) lfirst(workerCell); - if (workerPool->failed) { /* worker pool may have already timed out */ @@ -2701,7 +2675,7 @@ HandleMultiConnectionSuccess(WorkerSession *session) * BEGIN; * -- assume that the following INSERT goes to worker-A * -- also note that this single command does not activate - * -- 2PC itself since it is a single shard mofication + * -- 2PC itself since it is a single shard modification * INSERT INTO distributed_table (dist_key) VALUES (1); * * -- do one more single shard UPDATE hitting the same @@ -3377,7 +3351,6 @@ WorkerPoolFailed(WorkerPool *workerPool) { bool succeeded = false; dlist_iter iter; - ListCell *sessionCell = NULL; /* a pool cannot fail multiple times */ Assert(!workerPool->failed); @@ -3398,10 +3371,9 @@ WorkerPoolFailed(WorkerPool *workerPool) PlacementExecutionDone(placementExecution, succeeded); } - foreach(sessionCell, workerPool->sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, workerPool->sessionList) { - WorkerSession *session = lfirst(sessionCell); - WorkerSessionFailed(session); } @@ -3418,13 +3390,11 @@ WorkerPoolFailed(WorkerPool *workerPool) */ if (UseConnectionPerPlacement()) { - ListCell *workerCell = NULL; List *workerList = workerPool->distributedExecution->workerList; - foreach(workerCell, workerList) + WorkerPool *pool = NULL; + foreach_ptr(pool, workerList) { - WorkerPool *pool = (WorkerPool *) lfirst(workerCell); - /* failed pools or pools without any connection attempts ignored */ if (pool->failed || pool->poolStartTime == 0) { @@ -3679,8 +3649,6 @@ PlacementExecutionReady(TaskPlacementExecution *placementExecution) } else { - ListCell *sessionCell = NULL; - if (placementExecution->executionState == PLACEMENT_EXECUTION_NOT_READY) { /* remove from not-ready task queue */ @@ -3694,9 +3662,9 @@ PlacementExecutionReady(TaskPlacementExecution *placementExecution) workerPool->readyTaskCount++; /* wake up an idle connection by checking whether the connection is writeable */ - foreach(sessionCell, workerPool->sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, workerPool->sessionList) { - WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; RemoteTransaction *transaction = &(connection->remoteTransaction); RemoteTransactionState transactionState = transaction->transactionState; @@ -3789,17 +3757,15 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution) static WaitEventSet * BuildWaitEventSet(List *sessionList) { - ListCell *sessionCell = NULL; - /* additional 2 is for postmaster and latch */ int eventSetSize = list_length(sessionList) + 2; WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); - foreach(sessionCell, sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, sessionList) { - WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; if (connection->pgConn == NULL) @@ -3822,8 +3788,7 @@ BuildWaitEventSet(List *sessionList) } int waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, - sock, - NULL, (void *) session); + sock, NULL, (void *) session); session->waitEventSetIndex = waitEventSetIndex; } @@ -3841,11 +3806,9 @@ BuildWaitEventSet(List *sessionList) static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) { - ListCell *sessionCell = NULL; - - foreach(sessionCell, sessionList) + WorkerSession *session = NULL; + foreach_ptr(session, sessionList) { - WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; int waitEventSetIndex = session->waitEventSetIndex;