From e09811cfe096bdb823fa1bf4f9983891cc62a97e Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 1 Apr 2020 13:53:23 +0200 Subject: [PATCH] When force_max_query_parallelization is enabled, do not wait --- .../connection/connection_management.c | 28 ++++++++--- .../connection/shared_connection_stats.c | 46 +++++++++++++++++++ .../distributed/executor/adaptive_executor.c | 9 ++++ .../distributed/connection_management.h | 9 +++- .../distributed/shared_connection_stats.h | 1 + .../expected/shared_connection_stats.out | 12 +++-- .../regress/sql/shared_connection_stats.sql | 7 ++- 7 files changed, 96 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 080511a7b..05d59590c 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -319,14 +319,28 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } } - /* - * We can afford to skip establishing an optional connection. For - * non-optional connections, we first retry for some time. If we still - * cannot reserve the right to establish a connection, we prefer to - * error out. - */ - if (flags & OPTIONAL_CONNECTION) + + if (flags & NEVER_WAIT_FOR_CONNECTION) { + /* + * The caller doesn't want the connection manager to wait + * until a connection slot is available on the remote node. + * In the end, we might fail to establish connection to the + * remote node as it might not have any space in + * max_connections for this connection establishment. + * + * Still, we keep track of the connection counter. + */ + IncrementSharedConnectionCounter(hostname, port); + } + else if (flags & OPTIONAL_CONNECTION) + { + /* + * We can afford to skip establishing an optional connection. For + * non-optional connections, we first retry for some time. 
If we still + * cannot reserve the right to establish a connection, we prefer to + * error out. + */ if (!TryToIncrementSharedConnectionCounter(hostname, port)) { return NULL; diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index d1936f209..e9050c657 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -315,6 +315,52 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) } +/* + * IncrementSharedConnectionCounter increments the shared connection counter + * for the given hostname and port without checking the pool size limit. + */ +void +IncrementSharedConnectionCounter(const char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + if (GetMaxSharedPoolSize() == -1) + { + /* connection throttling disabled */ + return; + } + + /* strlcpy() keeps at most MAX_NODE_LENGTH - 1 bytes; never truncate silently */ + if (strlen(hostname) >= MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(LW_EXCLUSIVE); + + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + + /* no entry: the worker node is removed or updated, nothing to count */ + if (!entryFound) + { + UnLockConnectionSharedMemory(); + return; + } + + connectionEntry->connectionCount += 1; + + UnLockConnectionSharedMemory(); +} + + /* * DecrementSharedConnectionCounter decrements the shared counter * for the given hostname and port. 
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 45328dab4..217445368 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2393,6 +2393,15 @@ ManageWorkerPool(WorkerPool *workerPool) */ connectionFlags |= OPTIONAL_CONNECTION; } + else if (UseConnectionPerPlacement()) + { + /* + * One connection per placement is requested (e.g., via + * citus.force_max_query_parallelization), so never wait for a slot in + * the shared pool; the connection is still counted by the manager. + */ + connectionFlags |= NEVER_WAIT_FOR_CONNECTION; + } /* open a new connection to the worker */ MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 09975fc03..dc32b7e9d 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -60,7 +60,14 @@ enum MultiConnectionMode * per node. In that case, the connection manager may decide not to allow the * connection. */ - OPTIONAL_CONNECTION = 1 << 5 + OPTIONAL_CONNECTION = 1 << 5, + + /* + * With connection throttling, establishing a connection may be suspended + * until a slot to the remote host becomes available. When this flag is passed, + * the connection manager skips waiting. 
+ */ + NEVER_WAIT_FOR_CONNECTION = 1 << 6 }; diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index abb13dfe6..717797053 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -23,5 +23,6 @@ extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); +extern void IncrementSharedConnectionCounter(const char *hostname, int port); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/test/regress/expected/shared_connection_stats.out b/src/test/regress/expected/shared_connection_stats.out index 77d14e99f..450afda09 100644 --- a/src/test/regress/expected/shared_connection_stats.out +++ b/src/test/regress/expected/shared_connection_stats.out @@ -192,8 +192,8 @@ ORDER BY 0 (2 rows) --- now, decrease the shared pool size, and prevent --- establishing all the required connections +-- now, decrease the shared pool size, and still force +-- one connection per placement ALTER SYSTEM SET citus.max_shared_pool_size TO 5; SELECT pg_reload_conf(); pg_reload_conf @@ -211,8 +211,12 @@ BEGIN; SET LOCAL citus.node_connection_timeout TO 1000; SET LOCAL citus.connection_retry_timeout TO 2000; SET LOCAL citus.force_max_query_parallelization TO ON; --- TODO: This query got stuck --- SELECT count(*) FROM test; + SELECT count(*) FROM test; + count +--------------------------------------------------------------------- + 101 +(1 row) + COMMIT; -- pg_sleep forces almost 1 connection per placement -- now, some of the optional connections would be skipped, diff --git a/src/test/regress/sql/shared_connection_stats.sql b/src/test/regress/sql/shared_connection_stats.sql index 1d0d7873a..6b98c5578 100644 --- 
a/src/test/regress/sql/shared_connection_stats.sql +++ b/src/test/regress/sql/shared_connection_stats.sql @@ -117,8 +117,8 @@ WHERE ORDER BY hostname, port; --- now, decrease the shared pool size, and prevent --- establishing all the required connections +-- now, decrease the shared pool size, and still force +-- one connection per placement ALTER SYSTEM SET citus.max_shared_pool_size TO 5; SELECT pg_reload_conf(); SELECT pg_sleep(0.1); @@ -127,8 +127,7 @@ BEGIN; SET LOCAL citus.node_connection_timeout TO 1000; SET LOCAL citus.connection_retry_timeout TO 2000; SET LOCAL citus.force_max_query_parallelization TO ON; --- TODO: This query got stuck --- SELECT count(*) FROM test; + SELECT count(*) FROM test; COMMIT; -- pg_sleep forces almost 1 connection per placement