Do not create additional WaitEventSet for RemoteSocketClosed checks

Before this commit, we created an additional WaitEventSet for
checking whether the remote socket is closed per connection -
only once at the start of the execution.

However, for certain workloads, such as pgbench select-only
workloads, the creation/deletion of the additional WaitEventSet
adds ~7% CPU overhead, which is also reflected in the benchmark
results.

With this commit, we use the same WaitEventSet for the purposes
of checking the remote socket at the start of the execution.

We use the "rebuildWaitEventSet" flag so that the executor can re-use
the existing WaitEventSet.

As a result, we see the following improvements on PG 15:

main			 : 120051 tps, 0.532 ms latency avg.
avoid_wes_rebuild: 127119 tps, 0.503 ms latency avg.

And, on PG 14, as expected, there is no difference:

main			 : 129191 tps, 0.495 ms latency avg.
avoid_wes_rebuild: 129480 tps, 0.494 ms latency avg.

But, note that PG 15 is slightly (~1.5%) slower than PG 14.
That is probably the overhead of checking the remote socket.
pull/6505/head
Onder Kalaci 2022-11-19 10:18:19 +01:00
parent d52da55ac0
commit feb5534c65
8 changed files with 158 additions and 105 deletions

View File

@ -670,7 +670,6 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
char *nodeName, int nodePort);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection);
static bool RemoteSocketClosedForNewSession(WorkerSession *session);
static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
static int CalculateNewConnectionCount(WorkerPool *workerPool);
@ -687,6 +686,7 @@ static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void FreeExecutionWaitEvents(DistributedExecution *execution);
static void AddSessionToWaitEventSet(WorkerSession *session,
WaitEventSet *waitEventSet);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
@ -727,11 +727,13 @@ static bool ProcessSessionsWithFailedWaitEventSetOperations(
DistributedExecution *execution);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static void RebuildWaitEventSet(DistributedExecution *execution);
static void RebuildWaitEventSetForSessions(DistributedExecution *execution);
static void AddLatchWaitEventToExecution(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
#if PG_VERSION_NUM >= PG_VERSION_15
static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
static void RemoteSocketClosedForAnySession(DistributedExecution *execution);
static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
#endif
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
@ -2439,21 +2441,15 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
session->commandsSent = 0;
session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
#if PG_VERSION_NUM >= PG_VERSION_15
/* always detect closed sockets */
UpdateConnectionWaitFlags(session, WL_SOCKET_CLOSED);
#endif
dlist_init(&session->pendingTaskQueue);
dlist_init(&session->readyTaskQueue);
/*
* Before using this connection in the distributed execution, we check
* whether the remote connection is closed/lost. This is common
* when we have a cached connection and remote server restarted
* (due to failover or restart etc.). We do this because we can
* retry connection a single time.
*/
if (RemoteSocketClosedForNewSession(session))
{
connection->connectionState = MULTI_CONNECTION_LOST;
}
if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
{
/* keep track of how many connections are ready */
@ -2493,64 +2489,26 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
* the events, even ignores cancellation events. Future callers of this
* function should consider its limitations.
*/
static bool
RemoteSocketClosedForNewSession(WorkerSession *session)
{
bool socketClosed = false;
#if PG_VERSION_NUM >= PG_VERSION_15
static void
RemoteSocketClosedForAnySession(DistributedExecution *execution)
{
if (!WaitEventSetCanReportClosed())
{
/* we cannot detect for this OS */
return socketClosed;
return;
}
MultiConnection *connection = session->connection;
long timeout = 0;/* don't wait */
int eventSetSize = 2;
WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent));
/*
* Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check
for cancellations. Everything else is going to be checked soon in the
* main event processing. At this point, our only goal is to understand
* whether the remote socket is closed or not.
*/
int originalWaitFlags = connection->waitFlags;
connection->waitFlags = WL_SOCKET_CLOSED;
WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEventsForSocketClosed(execution->events, eventCount);
}
AddSessionToWaitEventSet(session, waitEventSet);
/* always good to wait for postmaster death */
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ);
socketClosed =
ProcessWaitEventsForSocketClosed(events, eventCount);
/* we can at most receive a single event, which is WL_SOCKET_CLOSED */
Assert(eventCount <= 1);
FreeWaitEventSet(waitEventSet);
pfree(events);
/*
* We only searched for WL_SOCKET_CLOSED, and we processed the
* event already. Now, set back to the original flags.
*/
UpdateConnectionWaitFlags(session, originalWaitFlags);
session->latestUnconsumedWaitEvents = 0;
#endif
return socketClosed;
}
/*
* ShouldRunTasksSequentially returns true if each of the individual tasks
@ -2727,17 +2685,7 @@ RunDistributedExecution(DistributedExecution *execution)
&cancellationReceived);
}
if (execution->events != NULL)
{
pfree(execution->events);
execution->events = NULL;
}
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
FreeExecutionWaitEvents(execution);
CleanUpSessions(execution);
}
@ -2749,11 +2697,7 @@ RunDistributedExecution(DistributedExecution *execution)
*/
UnclaimAllSessionConnections(execution->sessionList);
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
FreeExecutionWaitEvents(execution);
PG_RE_THROW();
}
@ -2841,26 +2785,46 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution)
static void
RebuildWaitEventSet(DistributedExecution *execution)
{
if (execution->events != NULL)
{
/*
* The execution might take a while, so explicitly free at this point
* because we don't need anymore.
*/
pfree(execution->events);
execution->events = NULL;
}
RebuildWaitEventSetForSessions(execution);
if (execution->waitEventSet != NULL)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = NULL;
}
AddLatchWaitEventToExecution(execution);
}
/*
* AddLatchWaitEventToExecution is a helper function that adds the latch
* wait event to the execution->waitEventSet. Note that this function assumes
that execution->waitEventSet has already been allocated with enough slots for
the latch event.
*/
static void
AddLatchWaitEventToExecution(DistributedExecution *execution)
{
	/*
	 * Wake up whenever our own process latch is set; per the callers'
	 * comments, cancellations are meant to be noticed via this latch on
	 * the main execution loop.
	 */
	CitusAddWaitEventSetToSet(execution->waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET,
							  MyLatch, NULL);
}
/*
* RebuildWaitEventSetForSessions re-creates the waitEventSet for the
* sessions involved in the distributed execution.
*
* Most of the time you need RebuildWaitEventSet(), which also adds
* the Latch wait event to the set.
*/
static void
RebuildWaitEventSetForSessions(DistributedExecution *execution)
{
	/* drop the previous event set and event array before building anew */
	FreeExecutionWaitEvents(execution);

	execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
	execution->eventSetSize = GetEventSetSize(execution->sessionList);

	/* event array sized to match the set; filled in by WaitEventSetWait */
	execution->events = palloc0(execution->eventSetSize * sizeof(WaitEvent));

	/* always good to wait for postmaster death */
	CitusAddWaitEventSetToSet(execution->waitEventSet, WL_POSTMASTER_DEATH,
							  PGINVALID_SOCKET, NULL, NULL);

	/* the freshly built set reflects the current sessions and wait flags */
	execution->rebuildWaitEventSet = false;
	execution->waitFlagsChanged = false;
}
@ -2922,7 +2886,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
* If WL_SOCKET_CLOSED is found, the function sets the underlying connection's
* state as MULTI_CONNECTION_LOST.
*/
static bool
static void
ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
{
int eventIndex = 0;
@ -2942,11 +2906,10 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED)
{
return true;
/* let the ConnectionStateMachine handle the rest */
session->connection->connectionState = MULTI_CONNECTION_LOST;
}
}
return false;
}
@ -3030,7 +2993,6 @@ ManageWorkerPool(WorkerPool *workerPool)
}
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
execution->rebuildWaitEventSet = true;
}
@ -3353,6 +3315,7 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount,
workerPool->nodeName, workerPool->nodePort)));
List *newSessionsList = NIL;
for (int connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++)
{
/* experimental: just to see the perf benefits of caching connections */
@ -3411,7 +3374,69 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
/* create a session for the connection */
WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection);
newSessionsList = lappend(newSessionsList, session);
}
if (list_length(newSessionsList) == 0)
{
/* nothing to do as no new connections happened */
return;
}
DistributedExecution *execution = workerPool->distributedExecution;
/*
* Although not ideal, there is a slight difference in the implementations
* of PG15+ and others.
*
* Recreating the WaitEventSet even once is prohibitively expensive (almost
* ~7% overhead for select-only pgbench). For all versions, the aim is to
* be able to create the WaitEventSet only once after any new connections
* are added to the execution. That is the main reason behind the implementation
* differences.
*
* For pre-PG15 versions, we leave the waitEventSet recreation to the main
* execution loop. For PG15+, we do it right here.
*
* We require this difference because for PG15+, there is a new type of
* WaitEvent (WL_SOCKET_CLOSED). We can provide this new event at this point,
* and check RemoteSocketClosedForAnySession(). For earlier versions, we have
to defer the rebuildWaitEventSet as there is no other event to wait for
* at this point. We could have forced to re-build, but that would mean we try to
* create waitEventSet without any actual events. That has some other implications
* such that we have to avoid certain optimizations of WaitEventSet creation.
*
* Instead, we prefer this slight difference, which in effect has almost no
* difference, but doing things in different points in time.
*/
#if PG_VERSION_NUM >= PG_VERSION_15
/* we added new connections, rebuild the waitEventSet */
RebuildWaitEventSetForSessions(execution);
/*
* If there are any closed sockets, mark connection lost such that
* we can re-connect.
*/
RemoteSocketClosedForAnySession(execution);
/*
* For RemoteSocketClosedForAnySession() purposes, we explicitly skip
* the latch because we want to handle all cancellations to be caught
* on the main execution loop, not here. We mostly skip cancellations
on RemoteSocketClosedForAnySession() for simplicity. Handling
cancellations on the main execution loop makes it much easier to break out
of the execution.
*/
AddLatchWaitEventToExecution(execution);
#else
execution->rebuildWaitEventSet = true;
#endif
WorkerSession *session = NULL;
foreach_ptr(session, newSessionsList)
{
/* immediately run the state machine to handle potential failure */
ConnectionStateMachine(session);
}
@ -5538,15 +5563,31 @@ BuildWaitEventSet(List *sessionList)
AddSessionToWaitEventSet(session, waitEventSet);
}
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
NULL);
return waitEventSet;
}
/*
* FreeExecutionWaitEvents is a helper function that takes
* a DistributedExecution and frees its events and waitEventSet.
*/
static void
FreeExecutionWaitEvents(DistributedExecution *execution)
{
	/* detach the event array from the execution, then release it */
	WaitEvent *oldEvents = execution->events;
	if (oldEvents != NULL)
	{
		execution->events = NULL;
		pfree(oldEvents);
	}

	/* likewise for the wait event set itself */
	WaitEventSet *oldWaitEventSet = execution->waitEventSet;
	if (oldWaitEventSet != NULL)
	{
		execution->waitEventSet = NULL;
		FreeWaitEventSet(oldWaitEventSet);
	}
}
/*
* AddSessionToWaitEventSet is a helper function which adds the session to
* the waitEventSet. The function does certain checks before adding the session

View File

@ -129,10 +129,6 @@ BEGIN;
SELECT count(*) FROM socket_test_table;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: terminating connection due to administrator command
CONTEXT: while executing command on localhost:xxxxx
WARNING: terminating connection due to administrator command
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
-- repartition joins also can recover
SET citus.enable_repartition_joins TO on;

View File

@ -43,6 +43,7 @@ INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'),
DO $$
BEGIN
BEGIN
SET LOCAL client_min_messages TO ERROR;
SELECT COUNT(*) FROM lotsa_connections;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'connection to the remote node%%' THEN

View File

@ -179,8 +179,10 @@ select count(*) from test where a = 0;
COMMIT;
-- Should fail now, when transaction is finished
SET client_min_messages TO ERROR;
select count(*) from test where a = 0;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
RESET client_min_messages;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
@ -297,8 +299,10 @@ select count(*) from test;
COMMIT;
-- Should fail now, when transaction is finished
SET client_min_messages TO ERROR;
select count(*) from test;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
RESET client_min_messages;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
@ -353,8 +357,10 @@ select count(*)/0 from test;
ERROR: division by zero
ROLLBACK;
-- Should fail now, when transaction is finished
SET client_min_messages TO ERROR;
select count(*) from test;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
RESET client_min_messages;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();

View File

@ -136,6 +136,7 @@ BEGIN
BEGIN
-- we want to force remote execution
SET LOCAL citus.enable_local_execution TO false;
SET LOCAL client_min_messages TO ERROR;
SELECT COUNT(*) FROM test;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'connection to the remote node%%' THEN

View File

@ -29,6 +29,7 @@ INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'),
DO $$
BEGIN
BEGIN
SET LOCAL client_min_messages TO ERROR;
SELECT COUNT(*) FROM lotsa_connections;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'connection to the remote node%%' THEN

View File

@ -68,7 +68,9 @@ show citus.node_conninfo;
select count(*) from test where a = 0;
COMMIT;
-- Should fail now, when transaction is finished
SET client_min_messages TO ERROR;
select count(*) from test where a = 0;
RESET client_min_messages;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
@ -115,7 +117,9 @@ select count(*) from test;
COMMIT;
-- Should fail now, when transaction is finished
SET client_min_messages TO ERROR;
select count(*) from test;
RESET client_min_messages;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;
@ -138,7 +142,9 @@ select count(*)/0 from test;
ROLLBACK;
-- Should fail now, when transaction is finished
SET client_min_messages TO ERROR;
select count(*) from test;
RESET client_min_messages;
-- Reset it again
ALTER SYSTEM RESET citus.node_conninfo;

View File

@ -116,6 +116,7 @@ BEGIN
BEGIN
-- we want to force remote execution
SET LOCAL citus.enable_local_execution TO false;
SET LOCAL client_min_messages TO ERROR;
SELECT COUNT(*) FROM test;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'connection to the remote node%%' THEN