Merge pull request #4923 from citusdata/wait_until_connections_ready

Wait until all connections are successfully established
pull/4990/head
Önder Kalacı 2021-05-19 16:04:36 +02:00 committed by GitHub
commit 4d8e3969ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 3 deletions

View File

@ -479,6 +479,7 @@ bool EnableBinaryProtocol = false;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
int ExecutorSlowStartInterval = 10; int ExecutorSlowStartInterval = 10;
bool EnableCostBasedConnectionEstablishment = true; bool EnableCostBasedConnectionEstablishment = true;
bool PreventIncompleteConnectionEstablishment = true;
/* /*
@ -663,6 +664,7 @@ static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementEx
static bool SendNextQuery(TaskPlacementExecution *placementExecution, static bool SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session); WorkerSession *session);
static void ConnectionStateMachine(WorkerSession *session); static void ConnectionStateMachine(WorkerSession *session);
static bool HasUnfinishedTaskForSession(WorkerSession *session);
static void HandleMultiConnectionSuccess(WorkerSession *session); static void HandleMultiConnectionSuccess(WorkerSession *session);
static bool HasAnyConnectionFailure(WorkerPool *workerPool); static bool HasAnyConnectionFailure(WorkerPool *workerPool);
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session); static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
@ -688,6 +690,7 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
const char ***parameterValues); const char ***parameterValues);
static int GetEventSetSize(List *sessionList); static int GetEventSetSize(List *sessionList);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static int RebuildWaitEventSet(DistributedExecution *execution); static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived); eventCount, bool *cancellationReceived);
@ -2288,7 +2291,26 @@ RunDistributedExecution(DistributedExecution *execution)
/* always (re)build the wait event set the first time */ /* always (re)build the wait event set the first time */
execution->rebuildWaitEventSet = true; execution->rebuildWaitEventSet = true;
while (execution->unfinishedTaskCount > 0 && !cancellationReceived) /*
* Iterate until all the tasks are finished. Once all the tasks
* are finished, ensure that that all the connection initializations
* are also finished. Otherwise, those connections are terminated
* abruptly before they are established (or failed). Instead, we let
* the ConnectionStateMachine() to properly handle them.
*
* Note that we could have the connections that are not established
* as a side effect of slow-start algorithm. At the time the algorithm
* decides to establish new connections, the execution might have tasks
* to finish. But, the execution might finish before the new connections
* are established.
*
* Note that the rules explained above could be overriden by any
* cancellation to the query. In that case, we terminate the execution
* irrespective of the current status of the tasks or the connections.
*/
while (!cancellationReceived &&
(execution->unfinishedTaskCount > 0 ||
HasIncompleteConnectionEstablishment(execution)))
{ {
WorkerPool *workerPool = NULL; WorkerPool *workerPool = NULL;
foreach_ptr(workerPool, execution->workerList) foreach_ptr(workerPool, execution->workerList)
@ -2370,6 +2392,33 @@ RunDistributedExecution(DistributedExecution *execution)
} }
/*
* HasIncompleteConnectionEstablishment returns true if any of the connections
* that has been initiated by the executor is in initilization stage.
*/
static bool
HasIncompleteConnectionEstablishment(DistributedExecution *execution)
{
if (!PreventIncompleteConnectionEstablishment)
{
return false;
}
WorkerSession *session = NULL;
foreach_ptr(session, execution->sessionList)
{
MultiConnection *connection = session->connection;
if (connection->connectionState == MULTI_CONNECTION_INITIAL ||
connection->connectionState == MULTI_CONNECTION_CONNECTING)
{
return true;
}
}
return false;
}
/* /*
* RebuildWaitEventSet updates the waitEventSet for the distributed execution. * RebuildWaitEventSet updates the waitEventSet for the distributed execution.
* This happens when the connection set for the distributed execution is changed, * This happens when the connection set for the distributed execution is changed,
@ -3241,8 +3290,32 @@ ConnectionStateMachine(WorkerSession *session)
case MULTI_CONNECTION_CONNECTED: case MULTI_CONNECTION_CONNECTED:
{ {
/* connection is ready, run the transaction state machine */ if (HasUnfinishedTaskForSession(session))
{
/*
* Connection is ready, and we have unfinished tasks.
* So, run the transaction state machine.
*/
TransactionStateMachine(session); TransactionStateMachine(session);
}
else
{
/*
* Connection is ready, but we don't have any unfinished
* tasks that this session can execute.
*
* Note that we can be in a situation where the executor
* decides to establish a connection, but not need to
* use it at the time the connection is established. This could
* happen when the earlier connections manages to finish all the
* tasks after this connection
*
* As no tasks are ready to be executed at the moment, we
* mark the socket readable to get any notices if exists.
*/
UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE);
}
break; break;
} }
@ -3357,6 +3430,41 @@ ConnectionStateMachine(WorkerSession *session)
} }
/*
* HasUnfinishedTaskForSession gets a session and returns true if there
* are any tasks that this session can execute.
*/
static bool
HasUnfinishedTaskForSession(WorkerSession *session)
{
if (session->currentTask != NULL)
{
/* the session is executing a command right now */
return true;
}
dlist_head *sessionReadyTaskQueue = &(session->readyTaskQueue);
if (!dlist_is_empty(sessionReadyTaskQueue))
{
/* session has an assigned task, which is ready for execution */
return true;
}
WorkerPool *workerPool = session->workerPool;
dlist_head *poolReadyTaskQueue = &(workerPool->readyTaskQueue);
if (!dlist_is_empty(poolReadyTaskQueue))
{
/*
* Pool has unassigned tasks that can be executed
* by the input session.
*/
return true;
}
return false;
}
/* /*
* HandleMultiConnectionSuccess logs the established connection and updates * HandleMultiConnectionSuccess logs the established connection and updates
* connection's state. * connection's state.

View File

@ -1387,6 +1387,21 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_KB | GUC_STANDARD, GUC_UNIT_KB | GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.prevent_incomplete_connection_establishment",
gettext_noop("When enabled, the executor waits until all the connections "
"are successfully established."),
gettext_noop("Under some load, the executor may decide to establish some "
"extra connections to further parallelize the execution. However,"
"before the connection establishment is done, the execution might "
"have already finished. When this GUC is set to true, the execution "
"waits for such connections to be established."),
&PreventIncompleteConnectionEstablishment,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomEnumVariable( DefineCustomEnumVariable(
"citus.propagate_set_commands", "citus.propagate_set_commands",
gettext_noop("Sets which SET commands are propagated to workers."), gettext_noop("Sets which SET commands are propagated to workers."),

View File

@ -12,6 +12,7 @@ extern bool EnableBinaryProtocol;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval; extern int ExecutorSlowStartInterval;
extern bool EnableCostBasedConnectionEstablishment; extern bool EnableCostBasedConnectionEstablishment;
extern bool PreventIncompleteConnectionEstablishment;
extern bool ShouldRunTasksSequentially(List *taskList); extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);