diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 54ae7c625..6e2182b1e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -479,6 +479,7 @@ bool EnableBinaryProtocol = false; /* GUC, number of ms to wait between opening connections to the same worker */ int ExecutorSlowStartInterval = 10; bool EnableCostBasedConnectionEstablishment = true; +bool PreventIncompleteConnectionEstablishment = true; /* @@ -663,6 +664,7 @@ static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementEx static bool SendNextQuery(TaskPlacementExecution *placementExecution, WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session); +static bool HasUnfinishedTaskForSession(WorkerSession *session); static void HandleMultiConnectionSuccess(WorkerSession *session); static bool HasAnyConnectionFailure(WorkerPool *workerPool); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); @@ -688,6 +690,7 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); static int GetEventSetSize(List *sessionList); +static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); static int RebuildWaitEventSet(DistributedExecution *execution); static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); @@ -2288,7 +2291,26 @@ RunDistributedExecution(DistributedExecution *execution) /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; - while (execution->unfinishedTaskCount > 0 && !cancellationReceived) + /* + * Iterate until all the tasks are finished. Once all the tasks + * are finished, ensure that all the connection initializations + * are also finished. 
Otherwise, those connections are terminated + * abruptly before they are established (or failed). Instead, we let + * ConnectionStateMachine() properly handle them. + * + * Note that we could have connections that are not yet established + * as a side effect of the slow-start algorithm. At the time the algorithm + * decides to establish new connections, the execution might have tasks + * to finish. But, the execution might finish before the new connections + * are established. + * + * Note that the rules explained above could be overridden by any + * cancellation of the query. In that case, we terminate the execution + * irrespective of the current status of the tasks or the connections. + */ + while (!cancellationReceived && + (execution->unfinishedTaskCount > 0 || + HasIncompleteConnectionEstablishment(execution))) { WorkerPool *workerPool = NULL; foreach_ptr(workerPool, execution->workerList) @@ -2370,6 +2392,33 @@ RunDistributedExecution(DistributedExecution *execution) } +/* + * HasIncompleteConnectionEstablishment returns true if any connection initiated + * by the executor is still being established (always false if the GUC is off). + */ +static bool +HasIncompleteConnectionEstablishment(DistributedExecution *execution) +{ + if (!PreventIncompleteConnectionEstablishment) + { + return false; + } + + WorkerSession *session = NULL; + foreach_ptr(session, execution->sessionList) + { + MultiConnection *connection = session->connection; + if (connection->connectionState == MULTI_CONNECTION_INITIAL || + connection->connectionState == MULTI_CONNECTION_CONNECTING) + { + return true; + } + } + + return false; +} + + /* * RebuildWaitEventSet updates the waitEventSet for the distributed execution. 
* This happens when the connection set for the distributed execution is changed, @@ -3241,8 +3290,32 @@ ConnectionStateMachine(WorkerSession *session) case MULTI_CONNECTION_CONNECTED: { - /* connection is ready, run the transaction state machine */ - TransactionStateMachine(session); + if (HasUnfinishedTaskForSession(session)) + { + /* + * Connection is ready, and we have unfinished tasks. + * So, run the transaction state machine. + */ + TransactionStateMachine(session); + } + else + { + /* + * Connection is ready, but we don't have any unfinished + * tasks that this session can execute. + * + * Note that we can be in a situation where the executor + * decides to establish a connection, but does not need to + * use it at the time the connection is established. This could + * happen when the earlier connections manage to finish all the + * tasks after this connection was initiated. + * + * As no tasks are ready to be executed at the moment, we + * mark the socket readable to get any notices, if any exist. + */ + UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE); + } + break; } @@ -3357,6 +3430,41 @@ ConnectionStateMachine(WorkerSession *session) } +/* + * HasUnfinishedTaskForSession gets a session and returns true if there + * are any tasks that this session can execute. + */ +static bool +HasUnfinishedTaskForSession(WorkerSession *session) +{ + if (session->currentTask != NULL) + { + /* the session is executing a command right now */ + return true; + } + + dlist_head *sessionReadyTaskQueue = &(session->readyTaskQueue); + if (!dlist_is_empty(sessionReadyTaskQueue)) + { + /* session has an assigned task, which is ready for execution */ + return true; + } + + WorkerPool *workerPool = session->workerPool; + dlist_head *poolReadyTaskQueue = &(workerPool->readyTaskQueue); + if (!dlist_is_empty(poolReadyTaskQueue)) + { + /* + * Pool has unassigned tasks that can be executed + * by the input session. 
+ */ + return true; + } + + return false; +} + + /* * HandleMultiConnectionSuccess logs the established connection and updates * connection's state. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ad59e524c..f457fd0e2 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1387,6 +1387,21 @@ RegisterCitusConfigVariables(void) GUC_UNIT_KB | GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.prevent_incomplete_connection_establishment", + gettext_noop("When enabled, the executor waits until all the connections " + "are successfully established."), + gettext_noop("Under some load, the executor may decide to establish some " + "extra connections to further parallelize the execution. However, " + "before the connection establishment is done, the execution might " + "have already finished. When this GUC is set to true, the execution " + "waits for such connections to be established."), + &PreventIncompleteConnectionEstablishment, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.propagate_set_commands", gettext_noop("Sets which SET commands are propagated to workers."), diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 111d886c0..0a3768177 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -12,6 +12,7 @@ extern bool EnableBinaryProtocol; /* GUC, number of ms to wait between opening connections to the same worker */ extern int ExecutorSlowStartInterval; extern bool EnableCostBasedConnectionEstablishment; +extern bool PreventIncompleteConnectionEstablishment; extern bool ShouldRunTasksSequentially(List *taskList); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);