mirror of https://github.com/citusdata/citus.git
Wait until all connections are successfully established
Comment from the code: /* * Iterate until all the tasks are finished. Once all the tasks * are finished, ensure that that all the connection initializations * are also finished. Otherwise, those connections are terminated * abruptly before they are established (or failed). Instead, we let * the ConnectionStateMachine() to properly handle them. * * Note that we could have the connections that are not established * as a side effect of slow-start algorithm. At the time the algorithm * decides to establish new connections, the execution might have tasks * to finish. But, the execution might finish before the new connections * are established. */ Note that the abruptly terminated connections lead to the following errors: 2020-11-16 21:09:09.800 CET [16633] LOG: could not accept SSL connection: Connection reset by peer 2020-11-16 21:09:09.872 CET [16657] LOG: could not accept SSL connection: Undefined error: 0 2020-11-16 21:09:09.894 CET [16667] LOG: could not accept SSL connection: Connection reset by peer To easily reproduce the issue: - Create a single node Citus - Add the coordinator to the metadata - Create a distributed table with shards on the coordinator - f.sql: select count(*) from test; - pgbench -f /tmp/f.sql postgres -T 12 -c 40 -P 1 or pgbench -f /tmp/f.sql postgres -T 12 -c 40 -P 1 -Cpull/4923/head
parent
61977a3c09
commit
926069a859
|
@ -479,6 +479,7 @@ bool EnableBinaryProtocol = false;
|
||||||
/* GUC, number of ms to wait between opening connections to the same worker */
|
/* GUC, number of ms to wait between opening connections to the same worker */
|
||||||
int ExecutorSlowStartInterval = 10;
|
int ExecutorSlowStartInterval = 10;
|
||||||
bool EnableCostBasedConnectionEstablishment = true;
|
bool EnableCostBasedConnectionEstablishment = true;
|
||||||
|
bool PreventIncompleteConnectionEstablishment = true;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -663,6 +664,7 @@ static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementEx
|
||||||
static bool SendNextQuery(TaskPlacementExecution *placementExecution,
|
static bool SendNextQuery(TaskPlacementExecution *placementExecution,
|
||||||
WorkerSession *session);
|
WorkerSession *session);
|
||||||
static void ConnectionStateMachine(WorkerSession *session);
|
static void ConnectionStateMachine(WorkerSession *session);
|
||||||
|
static bool HasUnfinishedTaskForSession(WorkerSession *session);
|
||||||
static void HandleMultiConnectionSuccess(WorkerSession *session);
|
static void HandleMultiConnectionSuccess(WorkerSession *session);
|
||||||
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
|
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
|
||||||
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
|
||||||
|
@ -688,6 +690,7 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
|
||||||
Oid **parameterTypes,
|
Oid **parameterTypes,
|
||||||
const char ***parameterValues);
|
const char ***parameterValues);
|
||||||
static int GetEventSetSize(List *sessionList);
|
static int GetEventSetSize(List *sessionList);
|
||||||
|
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
|
||||||
static int RebuildWaitEventSet(DistributedExecution *execution);
|
static int RebuildWaitEventSet(DistributedExecution *execution);
|
||||||
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
|
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
|
||||||
eventCount, bool *cancellationReceived);
|
eventCount, bool *cancellationReceived);
|
||||||
|
@ -2288,7 +2291,26 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
/* always (re)build the wait event set the first time */
|
/* always (re)build the wait event set the first time */
|
||||||
execution->rebuildWaitEventSet = true;
|
execution->rebuildWaitEventSet = true;
|
||||||
|
|
||||||
while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
|
/*
|
||||||
|
* Iterate until all the tasks are finished. Once all the tasks
|
||||||
|
* are finished, ensure that that all the connection initializations
|
||||||
|
* are also finished. Otherwise, those connections are terminated
|
||||||
|
* abruptly before they are established (or failed). Instead, we let
|
||||||
|
* the ConnectionStateMachine() to properly handle them.
|
||||||
|
*
|
||||||
|
* Note that we could have the connections that are not established
|
||||||
|
* as a side effect of slow-start algorithm. At the time the algorithm
|
||||||
|
* decides to establish new connections, the execution might have tasks
|
||||||
|
* to finish. But, the execution might finish before the new connections
|
||||||
|
* are established.
|
||||||
|
*
|
||||||
|
* Note that the rules explained above could be overriden by any
|
||||||
|
* cancellation to the query. In that case, we terminate the execution
|
||||||
|
* irrespective of the current status of the tasks or the connections.
|
||||||
|
*/
|
||||||
|
while (!cancellationReceived &&
|
||||||
|
(execution->unfinishedTaskCount > 0 ||
|
||||||
|
HasIncompleteConnectionEstablishment(execution)))
|
||||||
{
|
{
|
||||||
WorkerPool *workerPool = NULL;
|
WorkerPool *workerPool = NULL;
|
||||||
foreach_ptr(workerPool, execution->workerList)
|
foreach_ptr(workerPool, execution->workerList)
|
||||||
|
@ -2370,6 +2392,33 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasIncompleteConnectionEstablishment returns true if any of the connections
|
||||||
|
* that has been initiated by the executor is in initilization stage.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
HasIncompleteConnectionEstablishment(DistributedExecution *execution)
|
||||||
|
{
|
||||||
|
if (!PreventIncompleteConnectionEstablishment)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerSession *session = NULL;
|
||||||
|
foreach_ptr(session, execution->sessionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = session->connection;
|
||||||
|
if (connection->connectionState == MULTI_CONNECTION_INITIAL ||
|
||||||
|
connection->connectionState == MULTI_CONNECTION_CONNECTING)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RebuildWaitEventSet updates the waitEventSet for the distributed execution.
|
* RebuildWaitEventSet updates the waitEventSet for the distributed execution.
|
||||||
* This happens when the connection set for the distributed execution is changed,
|
* This happens when the connection set for the distributed execution is changed,
|
||||||
|
@ -3241,8 +3290,32 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
|
|
||||||
case MULTI_CONNECTION_CONNECTED:
|
case MULTI_CONNECTION_CONNECTED:
|
||||||
{
|
{
|
||||||
/* connection is ready, run the transaction state machine */
|
if (HasUnfinishedTaskForSession(session))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Connection is ready, and we have unfinished tasks.
|
||||||
|
* So, run the transaction state machine.
|
||||||
|
*/
|
||||||
TransactionStateMachine(session);
|
TransactionStateMachine(session);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Connection is ready, but we don't have any unfinished
|
||||||
|
* tasks that this session can execute.
|
||||||
|
*
|
||||||
|
* Note that we can be in a situation where the executor
|
||||||
|
* decides to establish a connection, but not need to
|
||||||
|
* use it at the time the connection is established. This could
|
||||||
|
* happen when the earlier connections manages to finish all the
|
||||||
|
* tasks after this connection
|
||||||
|
*
|
||||||
|
* As no tasks are ready to be executed at the moment, we
|
||||||
|
* mark the socket readable to get any notices if exists.
|
||||||
|
*/
|
||||||
|
UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE);
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3357,6 +3430,41 @@ ConnectionStateMachine(WorkerSession *session)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasUnfinishedTaskForSession gets a session and returns true if there
|
||||||
|
* are any tasks that this session can execute.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
HasUnfinishedTaskForSession(WorkerSession *session)
|
||||||
|
{
|
||||||
|
if (session->currentTask != NULL)
|
||||||
|
{
|
||||||
|
/* the session is executing a command right now */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
dlist_head *sessionReadyTaskQueue = &(session->readyTaskQueue);
|
||||||
|
if (!dlist_is_empty(sessionReadyTaskQueue))
|
||||||
|
{
|
||||||
|
/* session has an assigned task, which is ready for execution */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerPool *workerPool = session->workerPool;
|
||||||
|
dlist_head *poolReadyTaskQueue = &(workerPool->readyTaskQueue);
|
||||||
|
if (!dlist_is_empty(poolReadyTaskQueue))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Pool has unassigned tasks that can be executed
|
||||||
|
* by the input session.
|
||||||
|
*/
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* HandleMultiConnectionSuccess logs the established connection and updates
|
* HandleMultiConnectionSuccess logs the established connection and updates
|
||||||
* connection's state.
|
* connection's state.
|
||||||
|
|
|
@ -1387,6 +1387,21 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_UNIT_KB | GUC_STANDARD,
|
GUC_UNIT_KB | GUC_STANDARD,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.prevent_incomplete_connection_establishment",
|
||||||
|
gettext_noop("When enabled, the executor waits until all the connections "
|
||||||
|
"are successfully established."),
|
||||||
|
gettext_noop("Under some load, the executor may decide to establish some "
|
||||||
|
"extra connections to further parallelize the execution. However,"
|
||||||
|
"before the connection establishment is done, the execution might "
|
||||||
|
"have already finished. When this GUC is set to true, the execution "
|
||||||
|
"waits for such connections to be established."),
|
||||||
|
&PreventIncompleteConnectionEstablishment,
|
||||||
|
true,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomEnumVariable(
|
DefineCustomEnumVariable(
|
||||||
"citus.propagate_set_commands",
|
"citus.propagate_set_commands",
|
||||||
gettext_noop("Sets which SET commands are propagated to workers."),
|
gettext_noop("Sets which SET commands are propagated to workers."),
|
||||||
|
|
|
@ -12,6 +12,7 @@ extern bool EnableBinaryProtocol;
|
||||||
/* GUC, number of ms to wait between opening connections to the same worker */
|
/* GUC, number of ms to wait between opening connections to the same worker */
|
||||||
extern int ExecutorSlowStartInterval;
|
extern int ExecutorSlowStartInterval;
|
||||||
extern bool EnableCostBasedConnectionEstablishment;
|
extern bool EnableCostBasedConnectionEstablishment;
|
||||||
|
extern bool PreventIncompleteConnectionEstablishment;
|
||||||
|
|
||||||
extern bool ShouldRunTasksSequentially(List *taskList);
|
extern bool ShouldRunTasksSequentially(List *taskList);
|
||||||
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
|
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
|
||||||
|
|
Loading…
Reference in New Issue