Naive implementation for Optional connection concept

preventConflictingFlags
Onder Kalaci 2020-03-25 14:30:24 +01:00
parent 66e7b822aa
commit a385c44f15
5 changed files with 104 additions and 5 deletions

View File

@ -34,6 +34,7 @@
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "portability/instr_time.h" #include "portability/instr_time.h"
#include "storage/ipc.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -318,12 +319,28 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
} }
} }
/*
* We can afford to skip establishing an optional connection. For
* non-optional connections, we first retry for some time. If we still
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
if (flags & OPTIONAL_CONNECTION)
{
if (!TryToIncrementSharedConnectionCounter(hostname, port))
{
return NULL;
}
}
else
{
WaitOrErrorForSharedConnection(hostname, port);
}
/* /*
* Either no caching desired, or no pre-established, non-claimed, * Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment. * connection present. Initiate connection establishment.
*/ */
TryToIncrementSharedConnectionCounter(hostname, port);
connection = StartConnectionEstablishment(&key); connection = StartConnectionEstablishment(&key);
dlist_push_tail(entry->connections, &connection->connectionNode); dlist_push_tail(entry->connections, &connection->connectionNode);

View File

@ -21,6 +21,7 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/cancel_utils.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
@ -168,8 +169,64 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
/* /*
* Tries to increment the shared connection counter for the given nodeId and * WaitOrErrorForSharedConnection tries to increment the shared connection
* the current database in SharedConnStatsHash. * counter for the given hostname/port and the current database in
* SharedConnStatsHash.
*
* The function implements a retry mechanism. If the function cannot increment
* the counter withing the specificed amount of the time, it throws an error.
*/
void
WaitOrErrorForSharedConnection(const char *hostname, int port)
{
int counter = 0;
while (!TryToIncrementSharedConnectionCounter(hostname, port))
{
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
double timeoutMsec = 100.0;
CHECK_FOR_INTERRUPTS();
int rc = WaitLatch(MyLatch, latchFlags, (long) timeoutMsec, PG_WAIT_EXTENSION);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
else if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
if (IsHoldOffCancellationReceived())
{
/* in case the interrupts are hold, we still want to cancel */
ereport(ERROR, (errmsg("canceling statement due to user request")));
}
}
else if (rc & WL_TIMEOUT)
{
++counter;
if (counter == 10)
{
ereport(ERROR, (errmsg("citus.max_shared_pool_size connections are "
"already established to the node %s:%d,"
"so cannot establish any more connections",
hostname, port),
errhint("consider increasing "
"citus.max_shared_pool_size")));
}
}
}
}
/*
* TryToIncrementSharedConnectionCounter tries to increment the shared
* connection counter for the given nodeId and the current database in
* SharedConnStatsHash.
* *
* The function first checks whether the number of connections is less than * The function first checks whether the number of connections is less than
* citus.max_shared_pool_size. If so, the function increments the counter * citus.max_shared_pool_size. If so, the function increments the counter

View File

@ -2386,11 +2386,27 @@ ManageWorkerPool(WorkerPool *workerPool)
connectionFlags |= OUTSIDE_TRANSACTION; connectionFlags |= OUTSIDE_TRANSACTION;
} }
if (list_length(workerPool->sessionList) > 0 && !UseConnectionPerPlacement())
{
/*
* The executor can finish the execution with a single connection,
* remaining are optional. If the executor can get more connections,
* it can increase the parallelism.
*/
connectionFlags |= OPTIONAL_CONNECTION;
}
/* open a new connection to the worker */ /* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
workerPool->nodeName, workerPool->nodeName,
workerPool->nodePort, workerPool->nodePort,
NULL, NULL); NULL, NULL);
if (!connection)
{
/* connection can only be NULL for optional connections */
Assert((connectionFlags & connectionFlags));
continue;
}
/* /*
* Assign the initial state in the connection state machine. The connection * Assign the initial state in the connection state machine. The connection

View File

@ -52,7 +52,15 @@ enum MultiConnectionMode
/* open a connection per (co-located set of) placement(s) */ /* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3, CONNECTION_PER_PLACEMENT = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4 OUTSIDE_TRANSACTION = 1 << 4,
/*
* Some connections are optionally required such as when adaptive executor is
* executing a multi-shard command and requires the second (or further) connections
* per node. In that case, the connection manager may decide not to allow the
* connection.
*/
OPTIONAL_CONNECTION = 1 << 5
}; };

View File

@ -15,5 +15,6 @@ extern int MaxTrackedWorkerNodes;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
#endif /* SHARED_CONNECTION_STATS_H */ #endif /* SHARED_CONNECTION_STATS_H */