Get rid of PG_TRY/PG_CATCH

preventConflictingFlags
Onder Kalaci 2020-04-08 16:09:08 +02:00
parent 2a58d89b90
commit d1ac5c5e00
2 changed files with 21 additions and 41 deletions

View File

@ -48,7 +48,8 @@ MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize); static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -354,43 +355,19 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* Either no caching desired, or no pre-established, non-claimed, * Either no caching desired, or no pre-established, non-claimed,
* connection present. Initiate connection establishment. * connection present. Initiate connection establishment.
*/ */
volatile MultiConnection *connection = NULL; MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
PG_TRY(); sizeof(MultiConnection));
{
connection = StartConnectionEstablishment(&key);
}
PG_CATCH();
{
/*
* Something went wrong, make sure to decrement
* here otherwise we'd leak the increment we have done for this
* connection attempt.
*/
if (connection->pgConn == NULL)
{
DecrementSharedConnectionCounter(hostname, port);
}
else
{
CitusPQFinish((MultiConnection *) connection);
}
PG_RE_THROW(); /* we've already incremented the counter above */
} connection->sharedCounterIncremented = true;
PG_END_TRY();
dlist_push_tail(entry->connections, &connection->connectionNode);
/* StartConnectionEstablishment(connection, &key);
* Cast volatile MultiConnection pointer to MultiConnection pointer
* for ease of use.
*/
MultiConnection *newConnection = (MultiConnection *) connection;
dlist_push_tail(entry->connections, &newConnection->connectionNode); ResetShardPlacementAssociation(connection);
ResetShardPlacementAssociation(newConnection); return connection;
return newConnection;
} }
@ -969,7 +946,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
/* /*
* CitusPQFinish is a wrapper around * CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection
* counters.
*/ */
static void static void
CitusPQFinish(MultiConnection *connection) CitusPQFinish(MultiConnection *connection)
@ -978,8 +956,13 @@ CitusPQFinish(MultiConnection *connection)
{ {
PQfinish(connection->pgConn); PQfinish(connection->pgConn);
connection->pgConn = NULL; connection->pgConn = NULL;
}
/* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
if (connection->sharedCounterIncremented)
{
DecrementSharedConnectionCounter(connection->hostname, connection->port); DecrementSharedConnectionCounter(connection->hostname, connection->port);
connection->sharedCounterIncremented = false;
} }
} }
@ -1060,8 +1043,8 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
* Asynchronously establish connection to a remote node, but don't wait for * Asynchronously establish connection to a remote node, but don't wait for
* that to finish. DNS lookups etc. are performed synchronously though. * that to finish. DNS lookups etc. are performed synchronously though.
*/ */
static MultiConnection * static void
StartConnectionEstablishment(ConnectionHashKey *key) StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
{ {
bool found = false; bool found = false;
static uint64 connectionId = 1; static uint64 connectionId = 1;
@ -1083,9 +1066,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
entry->isValid = true; entry->isValid = true;
} }
MultiConnection *connection = MemoryContextAllocZero(ConnectionContext,
sizeof(MultiConnection));
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
connection->port = key->port; connection->port = key->port;
strlcpy(connection->database, key->database, NAMEDATALEN); strlcpy(connection->database, key->database, NAMEDATALEN);
@ -1105,8 +1085,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
PQsetnonblocking(connection->pgConn, true); PQsetnonblocking(connection->pgConn, true);
SetCitusNoticeProcessor(connection); SetCitusNoticeProcessor(connection);
return connection;
} }

View File

@ -129,6 +129,8 @@ typedef struct MultiConnection
/* number of bytes sent to PQputCopyData() since last flush */ /* number of bytes sent to PQputCopyData() since last flush */
uint64 copyBytesWrittenSinceLastFlush; uint64 copyBytesWrittenSinceLastFlush;
bool sharedCounterIncremented;
} MultiConnection; } MultiConnection;