diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 27782ee9e..0b66ff815 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -34,6 +34,7 @@ #include "distributed/version_compat.h" #include "mb/pg_wchar.h" #include "portability/instr_time.h" +#include "storage/ipc.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -318,12 +319,28 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } } + /* + * We can afford to skip establishing an optional connection. For + * non-optional connections, we first retry for some time. If we still + * cannot reserve the right to establish a connection, we prefer to + * error out. + */ + if (flags & OPTIONAL_CONNECTION) + { + if (!TryToIncrementSharedConnectionCounter(hostname, port)) + { + return NULL; + } + } + else + { + WaitOrErrorForSharedConnection(hostname, port); + } + /* * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. */ - TryToIncrementSharedConnectionCounter(hostname, port); - connection = StartConnectionEstablishment(&key); dlist_push_tail(entry->connections, &connection->connectionNode); diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 63e7b285d..4ca623637 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -21,6 +21,7 @@ #include "access/htup_details.h" #include "catalog/pg_authid.h" #include "commands/dbcommands.h" +#include "distributed/cancel_utils.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" @@ -168,8 +169,64 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) /* - * Tries to increment the shared connection counter for the given nodeId and - * the current database in SharedConnStatsHash. + * WaitOrErrorForSharedConnection tries to increment the shared connection + * counter for the given hostname/port and the current database in + * SharedConnStatsHash. + * + * The function implements a retry mechanism. If the function cannot increment + * the counter withing the specificed amount of the time, it throws an error. + */ +void +WaitOrErrorForSharedConnection(const char *hostname, int port) +{ + int counter = 0; + + while (!TryToIncrementSharedConnectionCounter(hostname, port)) + { + int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; + double timeoutMsec = 100.0; + + CHECK_FOR_INTERRUPTS(); + + int rc = WaitLatch(MyLatch, latchFlags, (long) timeoutMsec, PG_WAIT_EXTENSION); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + else if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + + if (IsHoldOffCancellationReceived()) + { + /* in case the interrupts are hold, we still want to cancel */ + ereport(ERROR, (errmsg("canceling statement due to user request"))); + } + } + else if (rc & WL_TIMEOUT) + { + ++counter; + if (counter == 10) + { + ereport(ERROR, (errmsg("citus.max_shared_pool_size connections are " + "already established to the node %s:%d," + "so cannot establish any more connections", + hostname, port), + errhint("consider increasing " + "citus.max_shared_pool_size"))); + } + } + } +} + + +/* + * TryToIncrementSharedConnectionCounter tries to increment the shared + * connection counter for the given nodeId and the current database in + * SharedConnStatsHash. * * The function first checks whether the number of connections is less than * citus.max_shared_pool_size. If so, the function increments the counter diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 330f2a410..8430dc61d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2386,11 +2386,27 @@ ManageWorkerPool(WorkerPool *workerPool) connectionFlags |= OUTSIDE_TRANSACTION; } + if (list_length(workerPool->sessionList) > 0 && !UseConnectionPerPlacement()) + { + /* + * The executor can finish the execution with a single connection, + * remaining are optional. If the executor can get more connections, + * it can increase the parallelism. + */ + connectionFlags |= OPTIONAL_CONNECTION; + } + /* open a new connection to the worker */ MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, workerPool->nodeName, workerPool->nodePort, NULL, NULL); + if (!connection) + { + /* connection can only be NULL for optional connections */ + Assert((connectionFlags & connectionFlags)); + continue; + } /* * Assign the initial state in the connection state machine. The connection diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 015546158..09975fc03 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -52,7 +52,15 @@ enum MultiConnectionMode /* open a connection per (co-located set of) placement(s) */ CONNECTION_PER_PLACEMENT = 1 << 3, - OUTSIDE_TRANSACTION = 1 << 4 + OUTSIDE_TRANSACTION = 1 << 4, + + /* + * Some connections are optionally required such as when adaptive executor is + * executing a multi-shard command and requires the second (or further) connections + * per node. In that case, the connection manager may decide not to allow the + * connection. + */ + OPTIONAL_CONNECTION = 1 << 5 }; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index f3dd45833..b82927ad6 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -15,5 +15,6 @@ extern int MaxTrackedWorkerNodes; extern void InitializeSharedConnectionStats(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); +extern void WaitOrErrorForSharedConnection(const char *hostname, int port); #endif /* SHARED_CONNECTION_STATS_H */