apply review items

pull/3491/head
SaitTalhaNisanci 2020-02-12 16:52:58 +03:00
parent 1b78045867
commit 9302e6e699
1 changed files with 19 additions and 23 deletions

View File

@ -576,7 +576,7 @@ static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList);
static bool TaskListRequires2PC(List *taskList); static bool TaskListRequires2PC(List *taskList);
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList); static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void); static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
@ -590,7 +590,7 @@ static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution); static long NextEventTimeout(DistributedExecution *execution);
static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime);
static WaitEventSet * BuildWaitEventSet(List *sessionList); static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@ -619,9 +619,9 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
const char ***parameterValues); const char ***parameterValues);
static int GetEventSetSize(List *sessionList); static int GetEventSetSize(List *sessionList);
static int UpdateWaitEventSet(DistributedExecution *execution); static int RebuildWaitEventSet(DistributedExecution *execution);
static bool ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount); eventCount, bool *cancellationReceived);
/* /*
* AdaptiveExecutor is called via CitusExecScan on the * AdaptiveExecutor is called via CitusExecScan on the
@ -1589,7 +1589,7 @@ UnclaimAllSessionConnections(List *sessionList)
* the task placement executions to the assigned task queue of the connection. * the task placement executions to the assigned task queue of the connection.
*/ */
static void static void
AssignTasksToConnectionsOrWorkerPool (DistributedExecution *execution) AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
{ {
RowModifyLevel modLevel = execution->modLevel; RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
@ -2005,7 +2005,7 @@ RunDistributedExecution(DistributedExecution *execution)
{ {
WaitEvent *events = NULL; WaitEvent *events = NULL;
AssignTasksToConnectionsOrWorkerPool (execution); AssignTasksToConnectionsOrWorkerPool(execution);
PG_TRY(); PG_TRY();
{ {
@ -2037,23 +2037,20 @@ RunDistributedExecution(DistributedExecution *execution)
pfree(events); pfree(events);
events = NULL; events = NULL;
} }
eventSetSize = UpdateWaitEventSet(execution); eventSetSize = RebuildWaitEventSet(execution);
events = palloc0(eventSetSize * sizeof(WaitEvent)); events = palloc0(eventSetSize * sizeof(WaitEvent));
} }
else if (execution->waitFlagsChanged) else if (execution->waitFlagsChanged)
{ {
UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
} }
/* wait for I/O events */ /* wait for I/O events */
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ); eventSetSize, WAIT_EVENT_CLIENT_READ);
if (!ProcessWaitEvents(execution, events, eventCount)) ProcessWaitEvents(execution, events, eventCount, &cancellationReceived);
{
cancellationReceived = true;
}
} }
if (events != NULL) if (events != NULL)
@ -2096,13 +2093,13 @@ RunDistributedExecution(DistributedExecution *execution)
/* /*
* UpdateWaitEventSet updates the waitEventSet for the distributed execution. * RebuildWaitEventSet updates the waitEventSet for the distributed execution.
* This happens when the connection set for the distributed execution is changed, * This happens when the connection set for the distributed execution is changed,
* which means that we need to update which connections we wait on for events. * which means that we need to update which connections we wait on for events.
* It returns the new event set size. * It returns the new event set size.
*/ */
static int static int
UpdateWaitEventSet(DistributedExecution *execution) RebuildWaitEventSet(DistributedExecution *execution)
{ {
if (execution->waitEventSet != NULL) if (execution->waitEventSet != NULL)
{ {
@ -2120,10 +2117,10 @@ UpdateWaitEventSet(DistributedExecution *execution)
/* /*
* ProcessWaitEvents processes the received events from connections. * ProcessWaitEvents processes the received events from connections.
* It returns true if all events are processed, otherwise false.
*/ */
static bool static void
ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount) ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount,
bool *cancellationReceived)
{ {
int eventIndex = 0; int eventIndex = 0;
@ -2153,7 +2150,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
* We cannot use "return" here inside a PG_TRY() block since * We cannot use "return" here inside a PG_TRY() block since
* then the exception stack won't be reset. * then the exception stack won't be reset.
*/ */
return false; *cancellationReceived = true;
} }
continue; continue;
@ -2164,7 +2161,6 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
ConnectionStateMachine(session); ConnectionStateMachine(session);
} }
return true;
} }
@ -3835,7 +3831,7 @@ BuildWaitEventSet(List *sessionList)
/* /*
* GetEventSetSize returns the event set size. * GetEventSetSize returns the event set size for a list of sessions.
*/ */
static int static int
GetEventSetSize(List *sessionList) GetEventSetSize(List *sessionList)
@ -3846,11 +3842,11 @@ GetEventSetSize(List *sessionList)
/* /*
* UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags * RebuildWaitEventSetFlags modifies the given waitEventSet with the wait flags
* for connections in the sessionList. * for connections in the sessionList.
*/ */
static void static void
UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
{ {
WorkerSession *session = NULL; WorkerSession *session = NULL;
foreach_ptr(session, sessionList) foreach_ptr(session, sessionList)