From d1ac5c5e003cc2f1bf0c1d6ef1fe40b9658d8ca5 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 8 Apr 2020 16:09:08 +0200 Subject: [PATCH] Get rid of PG_TRY/PG_CATCH --- .../connection/connection_management.c | 60 ++++++------------- .../distributed/connection_management.h | 2 + 2 files changed, 21 insertions(+), 41 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 5dc221e36..a9a701988 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -48,7 +48,8 @@ MemoryContext ConnectionContext = NULL; static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); -static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); +static void StartConnectionEstablishment(MultiConnection *connectionn, + ConnectionHashKey *key); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -354,43 +355,19 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, * Either no caching desired, or no pre-established, non-claimed, * connection present. Initiate connection establishment. */ - volatile MultiConnection *connection = NULL; - PG_TRY(); - { - connection = StartConnectionEstablishment(&key); - } - PG_CATCH(); - { - /* - * Something went wrong, make sure to decrement - * here otherwise we'd leak the increment we have done for this - * connection attempt. - */ - if (connection->pgConn == NULL) - { - DecrementSharedConnectionCounter(hostname, port); - } - else - { - CitusPQFinish((MultiConnection *) connection); - } + MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, + sizeof(MultiConnection)); - PG_RE_THROW(); - } - PG_END_TRY(); + /* we've already incremented the counter above */ + connection->sharedCounterIncremented = true; + dlist_push_tail(entry->connections, &connection->connectionNode); - /* - * Cast volatile MultiConnection pointer to MultiConnection pointer - * for ease of use. - */ - MultiConnection *newConnection = (MultiConnection *) connection; + StartConnectionEstablishment(connection, &key); - dlist_push_tail(entry->connections, &newConnection->connectionNode); + ResetShardPlacementAssociation(connection); - ResetShardPlacementAssociation(newConnection); - - return newConnection; + return connection; } @@ -969,7 +946,8 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) /* - * CitusPQFinish is a wrapper around + * CitusPQFinish is a wrapper around PQfinish and does book keeping on shared connection + * counters. */ static void CitusPQFinish(MultiConnection *connection) @@ -978,8 +956,13 @@ CitusPQFinish(MultiConnection *connection) { PQfinish(connection->pgConn); connection->pgConn = NULL; + } + /* behave idempotently, there is no gurantee that CitusPQFinish() is called once */ + if (connection->sharedCounterIncremented) + { DecrementSharedConnectionCounter(connection->hostname, connection->port); + connection->sharedCounterIncremented = false; } } @@ -1060,8 +1043,8 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) * Asynchronously establish connection to a remote node, but don't wait for * that to finish. DNS lookups etc. are performed synchronously though. */ -static MultiConnection * -StartConnectionEstablishment(ConnectionHashKey *key) +static void +StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key) { bool found = false; static uint64 connectionId = 1; @@ -1083,9 +1066,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) entry->isValid = true; } - MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, - sizeof(MultiConnection)); - strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); @@ -1105,8 +1085,6 @@ StartConnectionEstablishment(ConnectionHashKey *key) PQsetnonblocking(connection->pgConn, true); SetCitusNoticeProcessor(connection); - - return connection; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 2255063b7..abb6119b4 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -129,6 +129,8 @@ typedef struct MultiConnection /* number of bytes sent to PQputCopyData() since last flush */ uint64 copyBytesWrittenSinceLastFlush; + + bool sharedCounterIncremented; } MultiConnection;