pull/6259/head
Onder Kalaci 2022-08-30 18:51:05 +03:00
parent 17f3c51a07
commit eca9fd358d
4 changed files with 341 additions and 69 deletions

View File

@ -58,9 +58,13 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn, static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key); ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags); static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
static List * EnsureAndGetHealthyConnections(dlist_head *connections, bool static List * EnsureTxStateAndGetHealthyConnections(dlist_head *connections,
checkTransactionState); bool checkTransactionState);
static bool RemoteSocketClosed(MultiConnection *connection); static bool RemoteSocketClosed(MultiConnection *connection);
#if PG_VERSION_NUM >= PG_VERSION_15
static bool ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount,
bool *socketClosed);
#endif
static void ErrorIfMultipleMetadataConnectionExists(List *connections); static void ErrorIfMultipleMetadataConnectionExists(List *connections);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
@ -338,7 +342,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
{ {
bool checkTransactionState = !(flags & OUTSIDE_TRANSACTION); bool checkTransactionState = !(flags & OUTSIDE_TRANSACTION);
List *healthyConnections = List *healthyConnections =
EnsureAndGetHealthyConnections(entry->connections, checkTransactionState); EnsureTxStateAndGetHealthyConnections(entry->connections,
checkTransactionState);
/* check connection cache for a connection that's not already in use */ /* check connection cache for a connection that's not already in use */
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
@ -530,18 +535,19 @@ FindAvailableConnection(List *connections, uint32 flags)
/* /*
* EnsureAndGetHealthyConnections is a helper function that goes over the * EnsureTxStateAndGetHealthyConnections is a helper function that goes
* input connections, and: * over the input connections, and:
* - Errors for the connections that are not healthy but still part of a * - Errors for the connections whose remote transaction is broken
* remote transaction when checkTransactionState=true * when checkTransactionState=true
* - Skips connections that are not healthy, marks them to be closed * - Skips connections that are not healthy, marks those to be closed
* at the end of the transaction such that they do not linger around * at the end of the transaction such that they do not linger around
* *
* And, finally returns the list (unless already errored out) of connections * And, finally returns the list (unless already errored out) of connections
* that can be reused. * that can are healthy and can be reused.
*/ */
static List * static List *
EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionState) EnsureTxStateAndGetHealthyConnections(dlist_head *connections,
bool checkTransactionState)
{ {
List *healthyConnections = NIL; List *healthyConnections = NIL;
@ -550,16 +556,6 @@ EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionSta
{ {
MultiConnection *connection = MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur); dlist_container(MultiConnection, connectionNode, iter.cur);
bool healthyConnection = true;
if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 ||
PQstatus(connection->pgConn) != CONNECTION_OK ||
connection->connectionState != MULTI_CONNECTION_CONNECTED ||
RemoteSocketClosed(connection))
{
/* connection is not healthy */
healthyConnection = false;
}
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;
bool inRemoteTrasaction = bool inRemoteTrasaction =
@ -574,24 +570,23 @@ EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionSta
{ {
transaction->transactionFailed = true; transaction->transactionFailed = true;
} }
if (transaction->transactionFailed &&
(transaction->transactionCritical ||
!dlist_is_empty(&connection->referencedPlacements)))
{
/*
* If a connection is executing a critical transaction or accessed any
* placements, we should not continue the execution.
*/
ReportConnectionError(connection, ERROR);
}
} }
if (healthyConnection) if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 ||
{ PQstatus(connection->pgConn) == CONNECTION_BAD ||
healthyConnections = lappend(healthyConnections, connection); connection->connectionState == MULTI_CONNECTION_FAILED ||
} RemoteSocketClosed(connection))
else if (checkTransactionState && inRemoteTrasaction &&
transaction->transactionFailed &&
(transaction->transactionCritical ||
!dlist_is_empty(&connection->referencedPlacements)))
{
/*
* If a connection is executing a critical transaction or accessed any
* placements, we should not continue the execution.
*/
ReportConnectionError(connection, ERROR);
}
else
{ {
/* /*
* If a connection is not usable and we do not need to throw an error, * If a connection is not usable and we do not need to throw an error,
@ -599,6 +594,11 @@ EnsureAndGetHealthyConnections(dlist_head *connections, bool checkTransactionSta
*/ */
connection->forceCloseAtTransactionEnd = true; connection->forceCloseAtTransactionEnd = true;
} }
else
{
/* connection is healthy */
healthyConnections = lappend(healthyConnections, connection);
}
} }
return healthyConnections; return healthyConnections;
@ -623,9 +623,7 @@ RemoteSocketClosed(MultiConnection *connection)
return false; return false;
} }
int eventCount = 0;
WaitEvent events[3]; WaitEvent events[3];
WaitEventSet *waitEventSet = WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, 3); CreateWaitEventSet(CurrentMemoryContext, 3);
int sock = PQsocket(connection->pgConn); int sock = PQsocket(connection->pgConn);
@ -656,13 +654,45 @@ RemoteSocketClosed(MultiConnection *connection)
CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
NULL); NULL);
long timeout = 0; /* do not wait at all */ bool retry = false;
int eventIndex = 0; do {
long timeout = 0; /* do not wait at all */
int eventCount =
WaitEventSetWait(waitEventSet, timeout, events,
3, WAIT_EVENT_CLIENT_READ);
retry: retry = ProcessWaitEventsForSocketClose(events, eventCount, &socketClosed);
eventCount = if (socketClosed)
WaitEventSetWait(waitEventSet, timeout, events, {
3, WAIT_EVENT_CLIENT_READ); /* socket is closed, no further information needed */
break;
}
} while (retry);
FreeWaitEventSet(waitEventSet);
#endif
return socketClosed;
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* ProcessWaitEventsForSocketClose goes over the events and mainly looks for
* WL_SOCKET_CLOSED event. When found, socketClosed is set to true.
*
* The function also returns true if we are required to re-check the events. We
* might need to re-check if latch is set. A latch event might be preventing
* other events from being reported. Poll and check again.
*/
static bool
ProcessWaitEventsForSocketClose(WaitEvent *events, int eventCount, bool *socketClosed)
{
*socketClosed = false;
int eventIndex = 0;
for (; eventIndex < eventCount; eventIndex++) for (; eventIndex < eventCount; eventIndex++)
{ {
WaitEvent *event = &events[eventIndex]; WaitEvent *event = &events[eventIndex];
@ -674,33 +704,27 @@ retry:
if (event->events & WL_SOCKET_CLOSED) if (event->events & WL_SOCKET_CLOSED)
{ {
socketClosed = true; *socketClosed = true;
/* we found what we are searching for */ /* we found what we are searching for */
break; return false;
} }
if (event->events & WL_LATCH_SET) if (event->events & WL_LATCH_SET)
{ {
/*
* A latch event might be preventing other events from being
* reported. Reset it and poll again. No need to restore it
* because no code should expect latches to survive across
* CHECK_FOR_INTERRUPTS().
*/
ResetLatch(MyLatch); ResetLatch(MyLatch);
goto retry; /* the caller should retry */
return true;
} }
} }
FreeWaitEventSet(waitEventSet); return false;
#endif
return socketClosed;
} }
#endif
/* /*
* ErrorIfMultipleMetadataConnectionExists throws an error if the * ErrorIfMultipleMetadataConnectionExists throws an error if the
* input connection dlist contains more than one metadata connections. * input connection dlist contains more than one metadata connections.
@ -790,7 +814,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
bool checkTransactionState = true; bool checkTransactionState = true;
List *healthyConnections = List *healthyConnections =
EnsureAndGetHealthyConnections(entry->connections, checkTransactionState); EnsureTxStateAndGetHealthyConnections(entry->connections, checkTransactionState);
int flags = 0; int flags = 0;
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags); MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
@ -1026,7 +1050,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
*waitCount = 0; *waitCount = 0;
} }
WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize); WaitEventSet *waitEventSet = CreateWaitEventSet(CurrentMemoryContext,
eventSetSize);
EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet), EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet),
waitEventSet); waitEventSet);
@ -1034,7 +1059,8 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
* Put the wait events for the signal latch and postmaster death at the end such that * Put the wait events for the signal latch and postmaster death at the end such that
* event index + pendingConnectionsStartIndex = the connection index in the array. * event index + pendingConnectionsStartIndex = the connection index in the array.
*/ */
AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
NULL);
AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
numEventsAdded += 2; numEventsAdded += 2;
@ -1066,8 +1092,9 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
errmsg("connection establishment for node %s:%d failed", errmsg("connection establishment for node %s:%d failed",
connectionState->connection->hostname, connectionState->connection->hostname,
connectionState->connection->port), connectionState->connection->port),
errhint("Check both the local and remote server logs for the " errhint(
"connection establishment errors."))); "Check both the local and remote server logs for the "
"connection establishment errors.")));
} }
numEventsAdded++; numEventsAdded++;
@ -1225,12 +1252,14 @@ FinishConnectionListEstablishment(List *multiConnectionList)
if (!success) if (!success)
{ {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d " errmsg(
"failed", connection->hostname, "connection establishment for node %s:%d "
connection->port), "failed", connection->hostname,
errhint("Check both the local and remote server " connection->port),
"logs for the connection establishment " errhint(
"errors."))); "Check both the local and remote server "
"logs for the connection establishment "
"errors.")));
} }
} }
@ -1515,7 +1544,8 @@ FindOrCreateConnParamsEntry(ConnectionHashKey *key)
} }
/* if not found or not valid, compute them from GUC, runtime, etc. */ /* if not found or not valid, compute them from GUC, runtime, etc. */
GetConnParams(key, &entry->keywords, &entry->values, &entry->runtimeParamStart, GetConnParams(key, &entry->keywords, &entry->values,
&entry->runtimeParamStart,
ConnectionContext); ConnectionContext);
entry->isValid = true; entry->isValid = true;

View File

@ -0,0 +1,148 @@
--
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
--
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
CREATE SCHEMA socket_close;
SET search_path TO socket_close;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE socket_test_table(id bigserial, value text);
SELECT create_distributed_table('socket_test_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
-- first, simulate that we only have one cached connection per node
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- kill all the cached backends on the workers
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
1
1
(2 rows)
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
0
0
(2 rows)
-- even though the cached connections closed, the execution recovers and establishes new connections
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- now, use 16 connections per worker, we can still recover all connections
SET citus.max_adaptive_executor_pool_size TO 16;
SET citus.max_cached_conns_per_worker TO 16;
SET citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
16
16
(2 rows)
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
0
0
(2 rows)
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
101
(1 row)
-- now, get back to sane defaults
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SET citus.force_max_query_parallelization TO OFF;
-- we can recover for modification queries as well
-- single row INSERT
INSERT INTO socket_test_table VALUES (1);
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
result
---------------------------------------------------------------------
1
1
(2 rows)
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
0
0
(2 rows)
INSERT INTO socket_test_table VALUES (1);
-- single row UPDATE
UPDATE socket_test_table SET value = 15 WHERE id = 1;
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
result
---------------------------------------------------------------------
0
1
(2 rows)
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
0
0
(2 rows)
UPDATE socket_test_table SET value = 15 WHERE id = 1;
-- we cannot recover in a transaction block
BEGIN;
SELECT count(*) FROM socket_test_table;
count
---------------------------------------------------------------------
103
(1 row)
-- kill all the cached backends on the workers
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
result
---------------------------------------------------------------------
1
1
(2 rows)
SELECT count(*) FROM socket_test_table;
ERROR: terminating connection due to administrator command
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
SET client_min_messages TO ERROR;
DROP SCHEMA socket_close CASCADE;

View File

@ -0,0 +1,9 @@
--
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
--
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -0,0 +1,85 @@
--
-- PG15+ test as WL_SOCKET_CLOSED exposed for PG15+
--
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
CREATE SCHEMA socket_close;
SET search_path TO socket_close;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE socket_test_table(id bigserial, value text);
SELECT create_distributed_table('socket_test_table', 'id');
INSERT INTO socket_test_table (value) SELECT i::text FROM generate_series(0,100)i;
-- first, simulate that we only have one cached connection per node
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SELECT count(*) FROM socket_test_table;
-- kill all the cached backends on the workers
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
-- even though the cached connections closed, the execution recovers and establishes new connections
SELECT count(*) FROM socket_test_table;
-- now, use 16 connections per worker, we can still recover all connections
SET citus.max_adaptive_executor_pool_size TO 16;
SET citus.max_cached_conns_per_worker TO 16;
SET citus.force_max_query_parallelization TO ON;
SELECT count(*) FROM socket_test_table;
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
SELECT count(*) FROM socket_test_table;
-- now, get back to sane defaults
SET citus.max_adaptive_executor_pool_size TO 1;
SET citus.max_cached_conns_per_worker TO 1;
SET citus.force_max_query_parallelization TO OFF;
-- we can recover for modification queries as well
-- single row INSERT
INSERT INTO socket_test_table VALUES (1);
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
INSERT INTO socket_test_table VALUES (1);
-- single row UPDATE
UPDATE socket_test_table SET value = 15 WHERE id = 1;
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$) ORDER BY result;
-- show that none remains
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pid FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
UPDATE socket_test_table SET value = 15 WHERE id = 1;
-- we cannot recover in a transaction block
BEGIN;
SELECT count(*) FROM socket_test_table;
-- kill all the cached backends on the workers
SELECT result FROM run_command_on_workers($$SELECT count(*) FROM (SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query ilike '%socket_test_table%' AND pid !=pg_backend_pid()) as foo$$);
SELECT count(*) FROM socket_test_table;
ROLLBACK;
SET client_min_messages TO ERROR;
DROP SCHEMA socket_close CASCADE;