When force_max_query_parallelization is enabled, do not wait

preventConflictingFlags
Onder Kalaci 2020-04-01 13:53:23 +02:00
parent 7f578c9c68
commit e09811cfe0
7 changed files with 96 additions and 16 deletions

View File

@ -319,14 +319,28 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
}
}
/*
* We can afford to skip establishing an optional connection. For
* non-optional connections, we first retry for some time. If we still
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
if (flags & OPTIONAL_CONNECTION)
if (flags & NEVER_WAIT_FOR_CONNECTION)
{
/*
* The caller doesn't want the connection manager to wait
* until a connection slot is avaliable on the remote node.
* In the end, we might fail to establish connection to the
* remote node as it might not have any space in
* max_connections for this connection establishment.
*
* Still, we keep track of the connnection counter.
*/
IncrementSharedConnectionCounter(hostname, port);
}
else if (flags & OPTIONAL_CONNECTION)
{
/*
* We can afford to skip establishing an optional connection. For
* non-optional connections, we first retry for some time. If we still
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
if (!TryToIncrementSharedConnectionCounter(hostname, port))
{
return NULL;

View File

@ -315,6 +315,52 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
}
/*
* IncrementSharedConnectionCounter increments the shared counter
* for the given hostname and port.
*/
void
IncrementSharedConnectionCounter(const char *hostname, int port)
{
SharedConnStatsHashKey connKey;
if (GetMaxSharedPoolSize() == -1)
{
/* connection throttling disabled */
return;
}
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
/* this worker node is removed or updated */
if (!entryFound)
{
UnLockConnectionSharedMemory();
return;
}
connectionEntry->connectionCount += 1;
UnLockConnectionSharedMemory();
}
/*
* DecrementSharedConnectionCounter decrements the shared counter
* for the given hostname and port.

View File

@ -2393,6 +2393,15 @@ ManageWorkerPool(WorkerPool *workerPool)
*/
connectionFlags |= OPTIONAL_CONNECTION;
}
else if (UseConnectionPerPlacement())
{
/*
* The executor can finish the execution with a single connection,
* remaining are optional. If the executor can get more connections,
* it can increase the parallelism.
*/
connectionFlags |= NEVER_WAIT_FOR_CONNECTION;
}
/* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,

View File

@ -60,7 +60,14 @@ enum MultiConnectionMode
* per node. In that case, the connection manager may decide not to allow the
* connection.
*/
OPTIONAL_CONNECTION = 1 << 5
OPTIONAL_CONNECTION = 1 << 5,
/*
* Via connection throttling, the connection establishments may be suspended
* until a connection slot is empty to the remote host. When this flag is passed,
* the connection manager skips waiting.
*/
NEVER_WAIT_FOR_CONNECTION = 1 << 6
};

View File

@ -23,5 +23,6 @@ extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
#endif /* SHARED_CONNECTION_STATS_H */

View File

@ -192,8 +192,8 @@ ORDER BY
0
(2 rows)
-- now, decrease the shared pool size, and prevent
-- establishing all the required connections
-- now, decrease the shared pool size, and still force
-- one connection per placement
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
pg_reload_conf
@ -211,8 +211,12 @@ BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.connection_retry_timeout TO 2000;
SET LOCAL citus.force_max_query_parallelization TO ON;
-- TODO: This query got stuck
-- SELECT count(*) FROM test;
SELECT count(*) FROM test;
count
---------------------------------------------------------------------
101
(1 row)
COMMIT;
-- pg_sleep forces almost 1 connection per placement
-- now, some of the optional connections would be skipped,

View File

@ -117,8 +117,8 @@ WHERE
ORDER BY
hostname, port;
-- now, decrease the shared pool size, and prevent
-- establishing all the required connections
-- now, decrease the shared pool size, and still force
-- one connection per placement
ALTER SYSTEM SET citus.max_shared_pool_size TO 5;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
@ -127,8 +127,7 @@ BEGIN;
SET LOCAL citus.node_connection_timeout TO 1000;
SET LOCAL citus.connection_retry_timeout TO 2000;
SET LOCAL citus.force_max_query_parallelization TO ON;
-- TODO: This query got stuck
-- SELECT count(*) FROM test;
SELECT count(*) FROM test;
COMMIT;
-- pg_sleep forces almost 1 connection per placement