diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 80fba5e30..8369878b7 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -670,7 +670,6 @@ static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int nodePort); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection); -static bool RemoteSocketClosedForNewSession(WorkerSession *session); static void ManageWorkerPool(WorkerPool *workerPool); static bool ShouldWaitForSlowStart(WorkerPool *workerPool); static int CalculateNewConnectionCount(WorkerPool *workerPool); @@ -687,6 +686,7 @@ static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static WaitEventSet * BuildWaitEventSet(List *sessionList); +static void FreeExecutionWaitEvents(DistributedExecution *execution); static void AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); @@ -727,11 +727,13 @@ static bool ProcessSessionsWithFailedWaitEventSetOperations( DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); static void RebuildWaitEventSet(DistributedExecution *execution); +static void RebuildWaitEventSetForSessions(DistributedExecution *execution); +static void AddLatchWaitEventToExecution(DistributedExecution *execution); static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventCount, bool *cancellationReceived); #if PG_VERSION_NUM >= PG_VERSION_15 - -static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); +static void RemoteSocketClosedForAnySession(DistributedExecution *execution); +static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); #endif static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime); @@ -2439,21 +2441,15 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) session->commandsSent = 0; session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED; +#if PG_VERSION_NUM >= PG_VERSION_15 + + /* always detect closed sockets */ + UpdateConnectionWaitFlags(session, WL_SOCKET_CLOSED); +#endif + dlist_init(&session->pendingTaskQueue); dlist_init(&session->readyTaskQueue); - /* - * Before using this connection in the distributed execution, we check - * whether the remote connection is closed/lost. This is common - * when we have a cached connection and remote server restarted - * (due to failover or restart etc.). We do this because we can - * retry connection a single time. - */ - if (RemoteSocketClosedForNewSession(session)) - { - connection->connectionState = MULTI_CONNECTION_LOST; - } - if (connection->connectionState == MULTI_CONNECTION_CONNECTED) { /* keep track of how many connections are ready */ @@ -2493,64 +2489,26 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) * the events, even ignores cancellation events. Future callers of this * function should consider its limitations. */ -static bool -RemoteSocketClosedForNewSession(WorkerSession *session) -{ - bool socketClosed = false; - #if PG_VERSION_NUM >= PG_VERSION_15 - +static void +RemoteSocketClosedForAnySession(DistributedExecution *execution) +{ if (!WaitEventSetCanReportClosed()) { /* we cannot detect for this OS */ - return socketClosed; + return; } - MultiConnection *connection = session->connection; long timeout = 0;/* don't wait */ - int eventSetSize = 2; - WaitEvent *events = palloc0(eventSetSize * sizeof(WaitEvent)); - /* - * Only wait for WL_SOCKET_CLOSED and postmaster death, do not even check - * for cancellations. Everything else are going to be checked soon in the - * main event processing. At this point, our only goal is to understand - * whether the remote socket is closed or not. - */ - int originalWaitFlags = connection->waitFlags; - connection->waitFlags = WL_SOCKET_CLOSED; - WaitEventSet *waitEventSet = - CreateWaitEventSet(CurrentMemoryContext, eventSetSize); + int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events, + execution->eventSetSize, WAIT_EVENT_CLIENT_READ); + ProcessWaitEventsForSocketClosed(execution->events, eventCount); +} - AddSessionToWaitEventSet(session, waitEventSet); - - /* always good to wait for postmaster death */ - CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, - NULL); - - int eventCount = WaitEventSetWait(waitEventSet, timeout, events, - eventSetSize, WAIT_EVENT_CLIENT_READ); - socketClosed = - ProcessWaitEventsForSocketClosed(events, eventCount); - - /* we can at most receive a single event, which is WL_SOCKET_CLOSED */ - Assert(eventCount <= 1); - - FreeWaitEventSet(waitEventSet); - pfree(events); - - /* - * We only searched for WL_SOCKET_CLOSED, and we processed the - * event already. Now, set back to the original flags. - */ - UpdateConnectionWaitFlags(session, originalWaitFlags); - session->latestUnconsumedWaitEvents = 0; #endif - return socketClosed; -} - /* * ShouldRunTasksSequentially returns true if each of the individual tasks @@ -2727,17 +2685,7 @@ RunDistributedExecution(DistributedExecution *execution) &cancellationReceived); } - if (execution->events != NULL) - { - pfree(execution->events); - execution->events = NULL; - } - - if (execution->waitEventSet != NULL) - { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; - } + FreeExecutionWaitEvents(execution); CleanUpSessions(execution); } @@ -2749,11 +2697,7 @@ RunDistributedExecution(DistributedExecution *execution) */ UnclaimAllSessionConnections(execution->sessionList); - if (execution->waitEventSet != NULL) - { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; - } + FreeExecutionWaitEvents(execution); PG_RE_THROW(); } @@ -2841,26 +2785,46 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution) static void RebuildWaitEventSet(DistributedExecution *execution) { - if (execution->events != NULL) - { - /* - * The execution might take a while, so explicitly free at this point - * because we don't need anymore. - */ - pfree(execution->events); - execution->events = NULL; - } + RebuildWaitEventSetForSessions(execution); - if (execution->waitEventSet != NULL) - { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; - } + AddLatchWaitEventToExecution(execution); +} + + +/* + * AddLatchWaitEventToExecution is a helper function that adds the latch + * wait event to the execution->waitEventSet. Note that this function assumes + * that execution->waitEventSet has already allocated enough slot for latch + * event. + */ +static void +AddLatchWaitEventToExecution(DistributedExecution *execution) +{ + CitusAddWaitEventSetToSet(execution->waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); +} + + +/* + * RebuildWaitEventSetForSessions re-creates the waitEventSet for the + * sessions involved in the distributed execution. + * + * Most of the time you need RebuildWaitEventSet() which also includes + * adds the Latch wait event to the set. + */ +static void +RebuildWaitEventSetForSessions(DistributedExecution *execution) +{ + FreeExecutionWaitEvents(execution); execution->waitEventSet = BuildWaitEventSet(execution->sessionList); + execution->eventSetSize = GetEventSetSize(execution->sessionList); execution->events = palloc0(execution->eventSetSize * sizeof(WaitEvent)); + CitusAddWaitEventSetToSet(execution->waitEventSet, WL_POSTMASTER_DEATH, + PGINVALID_SOCKET, NULL, NULL); + execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; } @@ -2922,7 +2886,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC * If WL_SOCKET_CLOSED is found, the function sets the underlying connection's * state as MULTI_CONNECTION_LOST. */ -static bool +static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount) { int eventIndex = 0; @@ -2942,11 +2906,10 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount) if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) { - return true; + /* let the ConnectionStateMachine handle the rest */ + session->connection->connectionState = MULTI_CONNECTION_LOST; } } - - return false; } @@ -3030,7 +2993,6 @@ ManageWorkerPool(WorkerPool *workerPool) } INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); - execution->rebuildWaitEventSet = true; } @@ -3353,6 +3315,7 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, workerPool->nodeName, workerPool->nodePort))); + List *newSessionsList = NIL; for (int connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) { /* experimental: just to see the perf benefits of caching connections */ @@ -3411,7 +3374,69 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, /* create a session for the connection */ WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); + newSessionsList = lappend(newSessionsList, session); + } + if (list_length(newSessionsList) == 0) + { + /* nothing to do as no new connections happened */ + return; + } + + DistributedExecution *execution = workerPool->distributedExecution; + + + /* + * Although not ideal, there is a slight difference in the implementations + * of PG15+ and others. + * + * Recreating the WaitEventSet even once is prohibitively expensive (almost + * ~7% overhead for select-only pgbench). For all versions, the aim is to + * be able to create the WaitEventSet only once after any new connections + * are added to the execution. That is the main reason behind the implementation + * differences. + * + * For pre-PG15 versions, we leave the waitEventSet recreation to the main + * execution loop. For PG15+, we do it right here. + * + * We require this difference because for PG15+, there is a new type of + * WaitEvent (WL_SOCKET_CLOSED). We can provide this new event at this point, + * and check RemoteSocketClosedForAnySession(). For earlier versions, we have + * to defer the rebuildWaitEventSet as there is no other event to waitFor + * at this point. We could have forced to re-build, but that would mean we try to + * create waitEventSet without any actual events. That has some other implications + * such that we have to avoid certain optimizations of WaitEventSet creation. + * + * Instead, we prefer this slight difference, which in effect has almost no + * difference, but doing things in different points in time. + */ +#if PG_VERSION_NUM >= PG_VERSION_15 + + /* we added new connections, rebuild the waitEventSet */ + RebuildWaitEventSetForSessions(execution); + + /* + * If there are any closed sockets, mark connection lost such that + * we can re-connect. + */ + RemoteSocketClosedForAnySession(execution); + + /* + * For RemoteSocketClosedForAnySession() purposes, we explicitly skip + * the latch because we want to handle all cancellations to be caught + * on the main execution loop, not here. We mostly skip cancellations + * on RemoteSocketClosedForAnySession() for simplicity. Handling + * cancellations on the main execution loop much easier to break out + * of the execution. + */ + AddLatchWaitEventToExecution(execution); +#else + execution->rebuildWaitEventSet = true; +#endif + + WorkerSession *session = NULL; + foreach_ptr(session, newSessionsList) + { /* immediately run the state machine to handle potential failure */ ConnectionStateMachine(session); } @@ -5538,15 +5563,31 @@ BuildWaitEventSet(List *sessionList) AddSessionToWaitEventSet(session, waitEventSet); } - CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, - NULL); - CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, - NULL); - return waitEventSet; } +/* + * FreeExecutionWaitEvents is a helper function that gets + * a DistributedExecution and frees events and waitEventSet. + */ +static void +FreeExecutionWaitEvents(DistributedExecution *execution) +{ + if (execution->events != NULL) + { + pfree(execution->events); + execution->events = NULL; + } + + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } +} + + /* * AddSessionToWaitEventSet is a helper function which adds the session to * the waitEventSet. The function does certain checks before adding the session diff --git a/src/test/regress/expected/detect_conn_close.out b/src/test/regress/expected/detect_conn_close.out index 1bc264f65..ad758f32e 100644 --- a/src/test/regress/expected/detect_conn_close.out +++ b/src/test/regress/expected/detect_conn_close.out @@ -129,10 +129,6 @@ BEGIN; SELECT count(*) FROM socket_test_table; ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -WARNING: terminating connection due to administrator command -CONTEXT: while executing command on localhost:xxxxx -WARNING: terminating connection due to administrator command -CONTEXT: while executing command on localhost:xxxxx ROLLBACK; -- repartition joins also can recover SET citus.enable_repartition_joins TO on; diff --git a/src/test/regress/expected/multi_poolinfo_usage.out b/src/test/regress/expected/multi_poolinfo_usage.out index 05ebdc7cd..c5e97ec95 100644 --- a/src/test/regress/expected/multi_poolinfo_usage.out +++ b/src/test/regress/expected/multi_poolinfo_usage.out @@ -43,6 +43,7 @@ INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'), DO $$ BEGIN BEGIN + SET LOCAL client_min_messages TO ERROR; SELECT COUNT(*) FROM lotsa_connections; EXCEPTION WHEN OTHERS THEN IF SQLERRM LIKE 'connection to the remote node%%' THEN diff --git a/src/test/regress/expected/node_conninfo_reload.out b/src/test/regress/expected/node_conninfo_reload.out index d7b853226..d2e33d950 100644 --- a/src/test/regress/expected/node_conninfo_reload.out +++ b/src/test/regress/expected/node_conninfo_reload.out @@ -179,8 +179,10 @@ select count(*) from test where a = 0; COMMIT; -- Should fail now, when transaction is finished +SET client_min_messages TO ERROR; select count(*) from test where a = 0; ERROR: connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist" +RESET client_min_messages; -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); @@ -297,8 +299,10 @@ select count(*) from test; COMMIT; -- Should fail now, when transaction is finished +SET client_min_messages TO ERROR; select count(*) from test; ERROR: connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist" +RESET client_min_messages; -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); @@ -353,8 +357,10 @@ select count(*)/0 from test; ERROR: division by zero ROLLBACK; -- Should fail now, when transaction is finished +SET client_min_messages TO ERROR; select count(*) from test; ERROR: connection to the remote node localhost:xxxxx failed with the following error: invalid sslmode value: "doesnotexist" +RESET client_min_messages; -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); diff --git a/src/test/regress/expected/single_node_enterprise.out b/src/test/regress/expected/single_node_enterprise.out index e6155ba4e..6f828830e 100644 --- a/src/test/regress/expected/single_node_enterprise.out +++ b/src/test/regress/expected/single_node_enterprise.out @@ -136,6 +136,7 @@ BEGIN BEGIN -- we want to force remote execution SET LOCAL citus.enable_local_execution TO false; + SET LOCAL client_min_messages TO ERROR; SELECT COUNT(*) FROM test; EXCEPTION WHEN OTHERS THEN IF SQLERRM LIKE 'connection to the remote node%%' THEN diff --git a/src/test/regress/sql/multi_poolinfo_usage.sql b/src/test/regress/sql/multi_poolinfo_usage.sql index 1261c9965..da039cfca 100644 --- a/src/test/regress/sql/multi_poolinfo_usage.sql +++ b/src/test/regress/sql/multi_poolinfo_usage.sql @@ -29,6 +29,7 @@ INSERT INTO pg_dist_poolinfo VALUES (:worker_1_id, 'host=failhost'), DO $$ BEGIN BEGIN + SET LOCAL client_min_messages TO ERROR; SELECT COUNT(*) FROM lotsa_connections; EXCEPTION WHEN OTHERS THEN IF SQLERRM LIKE 'connection to the remote node%%' THEN diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql index 3790f2c98..42ba8c9b1 100644 --- a/src/test/regress/sql/node_conninfo_reload.sql +++ b/src/test/regress/sql/node_conninfo_reload.sql @@ -68,7 +68,9 @@ show citus.node_conninfo; select count(*) from test where a = 0; COMMIT; -- Should fail now, when transaction is finished +SET client_min_messages TO ERROR; select count(*) from test where a = 0; +RESET client_min_messages; -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; select pg_reload_conf(); @@ -115,7 +117,9 @@ select count(*) from test; COMMIT; -- Should fail now, when transaction is finished +SET client_min_messages TO ERROR; select count(*) from test; +RESET client_min_messages; -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; @@ -138,7 +142,9 @@ select count(*)/0 from test; ROLLBACK; -- Should fail now, when transaction is finished +SET client_min_messages TO ERROR; select count(*) from test; +RESET client_min_messages; -- Reset it again ALTER SYSTEM RESET citus.node_conninfo; diff --git a/src/test/regress/sql/single_node_enterprise.sql b/src/test/regress/sql/single_node_enterprise.sql index 76615b852..249a13cd2 100644 --- a/src/test/regress/sql/single_node_enterprise.sql +++ b/src/test/regress/sql/single_node_enterprise.sql @@ -116,6 +116,7 @@ BEGIN BEGIN -- we want to force remote execution SET LOCAL citus.enable_local_execution TO false; + SET LOCAL client_min_messages TO ERROR; SELECT COUNT(*) FROM test; EXCEPTION WHEN OTHERS THEN IF SQLERRM LIKE 'connection to the remote node%%' THEN