From feb5534c659cb9e87f9fe72b4802718358776f67 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Sat, 19 Nov 2022 10:18:19 +0100
Subject: [PATCH] Do not create additional WaitEventSet for RemoteSocketClosed
 checks

Before this commit, we created an additional WaitEventSet per connection
for checking whether the remote socket is closed - only once at the
start of the execution. However, for certain workloads, such as pgbench
select-only workloads, the creation/deletion of the additional
WaitEventSet adds ~7% CPU overhead, which is also reflected in the
benchmark results.

With this commit, we use the same WaitEventSet to check the remote
socket at the start of the execution, and rely on the
"rebuildWaitEventSet" flag so that the executor can reuse the existing
WaitEventSet.

As a result, we see the following improvements on PG 15:

main             : 120051 tps, 0.532 ms latency avg.
avoid_wes_rebuild: 127119 tps, 0.503 ms latency avg.

And, on PG 14, as expected, there is no difference:

main             : 129191 tps, 0.495 ms latency avg.
avoid_wes_rebuild: 129480 tps, 0.494 ms latency avg.

But note that PG 15 is slightly (~1.5%) slower than PG 14; that is
probably the overhead of checking the remote socket.
---
 .../distributed/executor/adaptive_executor.c  | 243 ++++++++++--------
 .../regress/expected/detect_conn_close.out    |   4 -
 .../regress/expected/multi_poolinfo_usage.out |   1 +
 .../regress/expected/node_conninfo_reload.out |   6 +
 .../expected/single_node_enterprise.out       |   1 +
 src/test/regress/sql/multi_poolinfo_usage.sql |   1 +
 src/test/regress/sql/node_conninfo_reload.sql |   6 +
 .../regress/sql/single_node_enterprise.sql    |   1 +
 8 files changed, 158 insertions(+), 105 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 80fba5e30..8369878b7 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -670,7 +670,6 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
 										   char *nodeName,
 										   int nodePort);
 static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
 												 MultiConnection *connection);
-static bool RemoteSocketClosedForNewSession(WorkerSession *session);
 static void ManageWorkerPool(WorkerPool *workerPool);
 static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
 static int CalculateNewConnectionCount(WorkerPool *workerPool);
@@ -687,6 +686,7 @@ static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
 static int UsableConnectionCount(WorkerPool *workerPool);
 static long NextEventTimeout(DistributedExecution *execution);
 static WaitEventSet * BuildWaitEventSet(List *sessionList);
+static void FreeExecutionWaitEvents(DistributedExecution *execution);
 static void AddSessionToWaitEventSet(WorkerSession *session,
 									 WaitEventSet *waitEventSet);
 static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
@@ -727,11 +727,13 @@ static bool ProcessSessionsWithFailedWaitEventSetOperations(
 	DistributedExecution *execution);
 static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
 static void RebuildWaitEventSet(DistributedExecution *execution);
+static void RebuildWaitEventSetForSessions(DistributedExecution *execution);
+static void AddLatchWaitEventToExecution(DistributedExecution *execution);
 static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events,
 							  int eventCount, bool *cancellationReceived);
 #if PG_VERSION_NUM >= PG_VERSION_15
-
-static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
+static void RemoteSocketClosedForAnySession(DistributedExecution *execution);
+static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
 #endif
 static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
 static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
@@ -2439,21 +2441,15 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
 	session->commandsSent = 0;
 	session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
 
+#if PG_VERSION_NUM >= PG_VERSION_15
+
+	/* always detect closed sockets */
+	UpdateConnectionWaitFlags(session, WL_SOCKET_CLOSED);
+#endif
+
 	dlist_init(&session->pendingTaskQueue);
 	dlist_init(&session->readyTaskQueue);
 
-	/*
-	 * Before using this connection in the distributed execution, we check
-	 * whether the remote connection is closed/lost. This is common
-	 * when we have a cached connection and remote server restarted
-	 * (due to failover or restart etc.). We do this because we can
-	 * retry connection a single time.
-	 */
-	if (RemoteSocketClosedForNewSession(session))
-	{
-		connection->connectionState = MULTI_CONNECTION_LOST;
-	}
-
 	if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
 	{
 		/* keep track of how many connections are ready */
@@ -2493,64 +2489,26 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
  * the events, even ignores cancellation events. Future callers of this
  * function should consider its limitations.
  */
-static bool
-RemoteSocketClosedForNewSession(WorkerSession *session)
-{
-	bool socketClosed = false;
-
 #if PG_VERSION_NUM >= PG_VERSION_15
-
+static void
+RemoteSocketClosedForAnySession(DistributedExecution *execution)
+{
 	if (!WaitEventSetCanReportClosed())
 	{
 		/* we cannot detect for this OS */
-		return socketClosed;
+		return;
 	}
 
-	MultiConnection *connection = session->connection;
 	long timeout = 0;/* don't wait */
-	int eventSetSize = 2;
-	WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent));
 
-	/*
-	 * Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check
-	 * for cancellations. Everything else are going to be checked soon in the
-	 * main event processing. At this point, our only goal is to understand
-	 * whether the remote socket is closed or not.
-	 */
-	int originalWaitFlags = connection->waitFlags;
-	connection->waitFlags = WL_SOCKET_CLOSED;
-	WaitEventSet *waitEventSet =
-		CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
+	int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
+									  execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
+	ProcessWaitEventsForSocketClosed(execution->events, eventCount);
+}
 
-	AddSessionToWaitEventSet(session, waitEventSet);
-
-	/* always good to wait for postmaster death */
-	CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
-							  NULL);
-
-	int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
-									  eventSetSize, WAIT_EVENT_CLIENT_READ);
-	socketClosed =
-		ProcessWaitEventsForSocketClosed(events, eventCount);
-
-	/* we can at most receive a single event, which is WL_SOCKET_CLOSED */
-	Assert(eventCount <= 1);
-
-	FreeWaitEventSet(waitEventSet);
-	pfree(events);
-
-	/*
-	 * We only searched for WL_SOCKET_CLOSED, and we processed the
-	 * event already. Now, set back to the original flags.
-	 */
-	UpdateConnectionWaitFlags(session, originalWaitFlags);
-	session->latestUnconsumedWaitEvents = 0;
 #endif
-	return socketClosed;
-}
-
 
 /*
  * ShouldRunTasksSequentially returns true if each of the individual tasks
@@ -2727,17 +2685,7 @@ RunDistributedExecution(DistributedExecution *execution)
 										  &cancellationReceived);
 		}
 
-		if (execution->events != NULL)
-		{
-			pfree(execution->events);
-			execution->events = NULL;
-		}
-
-		if (execution->waitEventSet != NULL)
-		{
-			FreeWaitEventSet(execution->waitEventSet);
-			execution->waitEventSet = NULL;
-		}
+		FreeExecutionWaitEvents(execution);
 
 		CleanUpSessions(execution);
 	}
@@ -2749,11 +2697,7 @@ RunDistributedExecution(DistributedExecution *execution)
 		 */
 		UnclaimAllSessionConnections(execution->sessionList);
 
-		if (execution->waitEventSet != NULL)
-		{
-			FreeWaitEventSet(execution->waitEventSet);
-			execution->waitEventSet = NULL;
-		}
+		FreeExecutionWaitEvents(execution);
 
 		PG_RE_THROW();
 	}
@@ -2841,26 +2785,46 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution)
 static void
 RebuildWaitEventSet(DistributedExecution *execution)
 {
-	if (execution->events != NULL)
-	{
-		/*
-		 * The execution might take a while, so explicitly free at this point
-		 * because we don't need anymore.
-		 */
-		pfree(execution->events);
-		execution->events = NULL;
-	}
+	RebuildWaitEventSetForSessions(execution);
 
-	if (execution->waitEventSet != NULL)
-	{
-		FreeWaitEventSet(execution->waitEventSet);
-		execution->waitEventSet = NULL;
-	}
+	AddLatchWaitEventToExecution(execution);
+}
+
+
+/*
+ * AddLatchWaitEventToExecution is a helper function that adds the latch
+ * wait event to the execution->waitEventSet. Note that this function assumes
+ * that execution->waitEventSet has already allocated enough slots for the
+ * latch event.
+ */
+static void
+AddLatchWaitEventToExecution(DistributedExecution *execution)
+{
+	CitusAddWaitEventSetToSet(execution->waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET,
+							  MyLatch, NULL);
+}
+
+
+/*
+ * RebuildWaitEventSetForSessions re-creates the waitEventSet for the
+ * sessions involved in the distributed execution.
+ *
+ * Most of the time you need RebuildWaitEventSet(), which also adds the
+ * latch wait event to the set.
+ */
+static void
+RebuildWaitEventSetForSessions(DistributedExecution *execution)
+{
+	FreeExecutionWaitEvents(execution);
 
 	execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
+
 	execution->eventSetSize = GetEventSetSize(execution->sessionList);
 	execution->events = palloc0(execution->eventSetSize * sizeof(WaitEvent));
 
+	CitusAddWaitEventSetToSet(execution->waitEventSet, WL_POSTMASTER_DEATH,
+							  PGINVALID_SOCKET, NULL, NULL);
+
 	execution->rebuildWaitEventSet = false;
 	execution->waitFlagsChanged = false;
 }
@@ -2922,7 +2886,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
  * If WL_SOCKET_CLOSED is found, the function sets the underlying connection's
 * state as MULTI_CONNECTION_LOST.
 */
-static bool
+static void
 ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
 {
 	int eventIndex = 0;
@@ -2942,11 +2906,10 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
 
 		if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED)
 		{
-			return true;
+			/* let the ConnectionStateMachine handle the rest */
+			session->connection->connectionState = MULTI_CONNECTION_LOST;
 		}
 	}
-
-	return false;
 }
@@ -3030,7 +2993,6 @@ ManageWorkerPool(WorkerPool *workerPool)
 	}
 
 	INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
-	execution->rebuildWaitEventSet = true;
 }
@@ -3353,6 +3315,7 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
 	ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d",
 							newConnectionCount, workerPool->nodeName,
 							workerPool->nodePort)));
+	List *newSessionsList = NIL;
 	for (int connectionIndex = 0; connectionIndex < newConnectionCount;
 		 connectionIndex++)
 	{
 		/* experimental: just to see the perf benefits of caching connections */
@@ -3411,7 +3374,69 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
 		/* create a session for the connection */
 		WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection);
+		newSessionsList = lappend(newSessionsList, session);
+	}
 
+	if (list_length(newSessionsList) == 0)
+	{
+		/* nothing to do, as no new connections were established */
+		return;
+	}
+
+	DistributedExecution *execution = workerPool->distributedExecution;
+
+	/*
+	 * Although not ideal, there is a slight difference between the PG15+
+	 * implementation and the one for earlier versions.
+	 *
+	 * Recreating the WaitEventSet even once is prohibitively expensive (almost
+	 * ~7% overhead for select-only pgbench). For all versions, the aim is to
+	 * create the WaitEventSet only once after any new connections are added
+	 * to the execution. That is the main reason behind the implementation
+	 * differences.
+	 *
+	 * For pre-PG15 versions, we leave the waitEventSet recreation to the main
+	 * execution loop. For PG15+, we do it right here.
+	 *
+	 * We require this difference because PG15+ has a new type of WaitEvent
+	 * (WL_SOCKET_CLOSED). We can provide this new event at this point and
+	 * check RemoteSocketClosedForAnySession(). For earlier versions, we have
+	 * to defer the waitEventSet rebuild, as there is no other event to wait
+	 * for at this point. We could have forced a rebuild, but that would mean
+	 * trying to create a waitEventSet without any actual events, which has
+	 * other implications, such as having to avoid certain optimizations of
+	 * WaitEventSet creation.
+	 *
+	 * Instead, we prefer this slight divergence, which in effect changes
+	 * nothing other than doing things at different points in time.
+	 */
+#if PG_VERSION_NUM >= PG_VERSION_15
+
+	/* we added new connections, rebuild the waitEventSet */
+	RebuildWaitEventSetForSessions(execution);
+
+	/*
+	 * If there are any closed sockets, mark the connection as lost so that
+	 * we can re-connect.
+	 */
+	RemoteSocketClosedForAnySession(execution);
+
+	/*
+	 * For RemoteSocketClosedForAnySession() purposes, we explicitly skip
+	 * the latch because we want all cancellations to be caught on the main
+	 * execution loop, not here. We skip cancellations in
+	 * RemoteSocketClosedForAnySession() mostly for simplicity; handling
+	 * cancellations on the main execution loop makes it much easier to break
+	 * out of the execution.
+	 */
+	AddLatchWaitEventToExecution(execution);
+#else
+	execution->rebuildWaitEventSet = true;
+#endif
+
+	WorkerSession *session = NULL;
+	foreach_ptr(session, newSessionsList)
+	{
 		/* immediately run the state machine to handle potential failure */
 		ConnectionStateMachine(session);
 	}
@@ -5538,15 +5563,31 @@ BuildWaitEventSet(List *sessionList)
 		AddSessionToWaitEventSet(session, waitEventSet);
 	}
 
-	CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
-							  NULL);
-	CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
-							  NULL);
-
 	return waitEventSet;
 }
 
 
+/*
+ * FreeExecutionWaitEvents is a helper function that takes a
+ * DistributedExecution and frees its events and waitEventSet.
+ */
+static void
+FreeExecutionWaitEvents(DistributedExecution *execution)
+{
+	if (execution->events != NULL)
+	{
+		pfree(execution->events);
+		execution->events = NULL;
+	}
+
+	if (execution->waitEventSet != NULL)
+	{
+		FreeWaitEventSet(execution->waitEventSet);
+		execution->waitEventSet = NULL;
+	}
+}
+
+
 /*
  * AddSessionToWaitEventSet is a helper function which adds the session to
  * the waitEventSet. The function does certain checks before adding the session
diff --git a/src/test/regress/expected/detect_conn_close.out b/src/test/regress/expected/detect_conn_close.out
index 1bc264f65..ad758f32e 100644
--- a/src/test/regress/expected/detect_conn_close.out
+++ b/src/test/regress/expected/detect_conn_close.out
@@ -129,10 +129,6 @@ BEGIN;
   SELECT count(*) FROM socket_test_table;
 ERROR:  connection to the remote node localhost:xxxxx failed with the following error: connection not open
-WARNING:  terminating connection due to administrator command
-CONTEXT:  while executing command on localhost:xxxxx
-WARNING:  terminating connection due to administrator command
-CONTEXT:  while executing command on localhost:xxxxx
 ROLLBACK;
 -- repartition joins also can recover
 SET citus.enable_repartition_joins TO on;
diff --git a/src/test/regress/expected/multi_poolinfo_usage.out b/src/test/regress/expected/multi_poolinfo_usage.out
index 05ebdc7cd..c5e97ec95 100644
--- a/src/test/regress/expected/multi_poolinfo_usage.out
+++ b/src/test/regress/expected/multi_poolinfo_usage.out
@@ -43,6 +43,7 @@ INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'),
 DO $$
 BEGIN
 	BEGIN
+		SET LOCAL client_min_messages TO ERROR;
 		SELECT COUNT(*) FROM lotsa_connections;
 	EXCEPTION WHEN OTHERS THEN
 		IF SQLERRM LIKE 'connection to the remote node%%' THEN
diff --git a/src/test/regress/expected/node_conninfo_reload.out b/src/test/regress/expected/node_conninfo_reload.out
index d7b853226..d2e33d950 100644
--- a/src/test/regress/expected/node_conninfo_reload.out
+++ b/src/test/regress/expected/node_conninfo_reload.out
@@ -179,8 +179,10 @@ select count(*) from test where a = 0;
 COMMIT;
 -- Should fail now, when transaction is finished
+SET client_min_messages TO ERROR;
 select count(*) from test where a = 0;
 ERROR:  connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+RESET client_min_messages;
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
@@ -297,8 +299,10 @@ select count(*) from test;
 COMMIT;
 -- Should fail now, when transaction is finished
+SET client_min_messages TO ERROR;
 select count(*) from test;
 ERROR:  connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+RESET client_min_messages;
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
@@ -353,8 +357,10 @@ select count(*)/0 from test;
 ERROR:  division by zero
 ROLLBACK;
 -- Should fail now, when transaction is finished
+SET client_min_messages TO ERROR;
 select count(*) from test;
 ERROR:  connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist"
+RESET client_min_messages;
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
diff --git a/src/test/regress/expected/single_node_enterprise.out b/src/test/regress/expected/single_node_enterprise.out
index e6155ba4e..6f828830e 100644
--- a/src/test/regress/expected/single_node_enterprise.out
+++ b/src/test/regress/expected/single_node_enterprise.out
@@ -136,6 +136,7 @@ BEGIN
 	BEGIN
 		-- we want to force remote execution
 		SET LOCAL citus.enable_local_execution TO false;
+		SET LOCAL client_min_messages TO ERROR;
 		SELECT COUNT(*) FROM test;
 	EXCEPTION WHEN OTHERS THEN
 		IF SQLERRM LIKE 'connection to the remote node%%' THEN
diff --git a/src/test/regress/sql/multi_poolinfo_usage.sql b/src/test/regress/sql/multi_poolinfo_usage.sql
index 1261c9965..da039cfca 100644
--- a/src/test/regress/sql/multi_poolinfo_usage.sql
+++ b/src/test/regress/sql/multi_poolinfo_usage.sql
@@ -29,6 +29,7 @@ INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'),
 DO $$
 BEGIN
 	BEGIN
+		SET LOCAL client_min_messages TO ERROR;
 		SELECT COUNT(*) FROM lotsa_connections;
 	EXCEPTION WHEN OTHERS THEN
 		IF SQLERRM LIKE 'connection to the remote node%%' THEN
diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql
index 3790f2c98..42ba8c9b1 100644
--- a/src/test/regress/sql/node_conninfo_reload.sql
+++ b/src/test/regress/sql/node_conninfo_reload.sql
@@ -68,7 +68,9 @@ show citus.node_conninfo;
 select count(*) from test where a = 0;
 COMMIT;
 -- Should fail now, when transaction is finished
+SET client_min_messages TO ERROR;
 select count(*) from test where a = 0;
+RESET client_min_messages;
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
 select pg_reload_conf();
@@ -115,7 +117,9 @@ select count(*) from test;
 COMMIT;
 -- Should fail now, when transaction is finished
+SET client_min_messages TO ERROR;
 select count(*) from test;
+RESET client_min_messages;
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
@@ -138,7 +142,9 @@ select count(*)/0 from test;
 ROLLBACK;
 -- Should fail now, when transaction is finished
+SET client_min_messages TO ERROR;
 select count(*) from test;
+RESET client_min_messages;
 -- Reset it again
 ALTER SYSTEM RESET citus.node_conninfo;
diff --git a/src/test/regress/sql/single_node_enterprise.sql b/src/test/regress/sql/single_node_enterprise.sql
index 76615b852..249a13cd2 100644
--- a/src/test/regress/sql/single_node_enterprise.sql
+++ b/src/test/regress/sql/single_node_enterprise.sql
@@ -116,6 +116,7 @@ BEGIN
 	BEGIN
 		-- we want to force remote execution
 		SET LOCAL citus.enable_local_execution TO false;
+		SET LOCAL client_min_messages TO ERROR;
 		SELECT COUNT(*) FROM test;
 	EXCEPTION WHEN OTHERS THEN
 		IF SQLERRM LIKE 'connection to the remote node%%' THEN