diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f8745d466..a36423881 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -329,13 +329,13 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) PQclear(queryResult); /* close the connection */ - PQfinish(masterConnection); + CloseConnectionByPGconn(masterConnection); masterConnection = NULL; } PG_CATCH(); { /* close the connection */ - PQfinish(masterConnection); + CloseConnectionByPGconn(masterConnection); masterConnection = NULL; PG_RE_THROW(); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d51d047a4..f2294fca0 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -111,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip Tuplestorestate *tupleStore); static PGconn * GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery); -static void PurgeConnectionForPlacement(ShardPlacement *placement); +static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement); static void RemoveXactConnection(PGconn *connection); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, @@ -277,7 +277,7 @@ InitTransactionStateForTask(Task *task) if (PQresultStatus(result) != PGRES_COMMAND_OK) { WarnRemoteError(connection, result); - PurgeConnection(connection); + CloseConnectionByPGconn(connection); connection = NULL; } @@ -794,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); continue; } @@ -852,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task, } else { - PurgeConnectionForPlacement(taskPlacement); + PurgeConnectionForPlacement(connection, taskPlacement); failedPlacementList = lappend(failedPlacementList, taskPlacement); @@ -956,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn "commands"))); } - XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; - shardIntervalList = TaskShardIntervalList(taskList); /* ensure that there are no concurrent modifications on the same shards */ @@ -966,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn /* open connection to all relevant placements, if not already open */ OpenTransactionsToAllShardPlacements(shardIntervalList, userName); + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + /* iterate over placements in rounds, to ensure in-order execution */ while (tasksPending) { @@ -1234,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery) * for the transaction in addition to purging the connection cache's entry. */ static void -PurgeConnectionForPlacement(ShardPlacement *placement) +PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement) { - NodeConnectionKey nodeKey; - char *currentUser = CurrentUserName(); - - MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); - strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); - nodeKey.nodePort = placement->nodePort; - strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); - - PurgeConnectionByKey(&nodeKey); + CloseConnectionByPGconn(connection); /* * The following is logically identical to RemoveXactConnection, but since @@ -1256,7 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement) { NodeConnectionEntry *participantEntry = NULL; bool entryFound = false; + NodeConnectionKey nodeKey; + char *currentUser = CurrentUserName(); + MemSet(&nodeKey, 0, sizeof(NodeConnectionKey)); + strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1); + nodeKey.nodePort = placement->nodePort; + strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN); Assert(IsTransactionBlock()); /* the participant hash doesn't use the user field */ @@ -1862,7 +1860,7 @@ ExecuteTransactionEnd(bool commit) else { WarnRemoteError(connection, result); - PurgeConnection(participant->connection); + CloseConnectionByPGconn(participant->connection); participant->connection = NULL; } diff --git a/src/backend/distributed/master/master_citus_tools.c b/src/backend/distributed/master/master_citus_tools.c index d7d571e97..f8632d6a2 100644 --- a/src/backend/distributed/master/master_citus_tools.c +++ b/src/backend/distributed/master/master_citus_tools.c @@ -19,6 +19,7 @@ #include "access/htup_details.h" #include "catalog/pg_type.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" @@ -269,7 +270,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor { StoreErrorMessage(connection, queryResultString); statusArray[commandIndex] = false; - PQfinish(connection); + CloseConnectionByPGconn(connection); connectionArray[commandIndex] = NULL; finishedCount++; } @@ -298,7 +299,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor finishedCount++; statusArray[commandIndex] = success; connectionArray[commandIndex] = NULL; - PQfinish(connection); + CloseConnectionByPGconn(connection); } } @@ -509,7 +510,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, PQclear(queryResult); /* close the connection */ - PQfinish(nodeConnection); + CloseConnectionByPGconn(nodeConnection); nodeConnection = NULL; } PG_CATCH(); @@ -517,7 +518,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString, StoreErrorMessage(nodeConnection, queryResultString); /* close the connection */ - PQfinish(nodeConnection); + CloseConnectionByPGconn(nodeConnection); nodeConnection = NULL; } PG_END_TRY(); diff --git a/src/backend/distributed/test/connection_cache.c b/src/backend/distributed/test/connection_cache.c index 8c46d0dd5..e88204c5a 100644 --- a/src/backend/distributed/test/connection_cache.c +++ b/src/backend/distributed/test/connection_cache.c @@ -21,6 +21,7 @@ #include "catalog/pg_type.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/test_helper_functions.h" /* IWYU pragma: keep */ #include "utils/elog.h" @@ -124,7 +125,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } - PurgeConnection(connection); + CloseConnectionByPGconn(connection); PG_RETURN_BOOL(true); } @@ -148,7 +149,7 @@ connect_and_purge_connection(PG_FUNCTION_ARGS) PG_RETURN_BOOL(false); } - PurgeConnection(connection); + CloseConnectionByPGconn(connection); PG_RETURN_BOOL(true); } diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index c94b77abc..9f3d15c0f 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -16,6 +16,7 @@ #include "distributed/colocation_utils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" @@ -361,6 +362,6 @@ CloseConnections(List *connectionList) (TransactionConnection *) lfirst(connectionCell); PGconn *connection = transactionConnection->connection; - PQfinish(connection); + CloseConnectionByPGconn(connection); } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index ba5cebdb4..faca72802 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -233,12 +233,12 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char PQgetResult(workerConnection); /* we no longer need this connection */ - PQfinish(workerConnection); + CloseConnectionByPGconn(workerConnection); } PG_CATCH(); { /* close the connection */ - PQfinish(workerConnection); + CloseConnectionByPGconn(workerConnection); PG_RE_THROW(); } @@ -280,7 +280,7 @@ RemoveWorkerTransaction(char *nodeName, int32 nodePort) PGconn *connection = transactionConnection->connection; /* closing the connection will rollback all uncommited transactions */ - PQfinish(connection); + CloseConnectionByPGconn(connection); workerConnectionList = list_delete(workerConnectionList, transactionConnection); } diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index db252d8c6..7db3c35e4 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -2,7 +2,7 @@ * * connection_cache.c * - * This file contains functions to implement a connection hash. + * Legacy connection caching layer. Will be removed entirely. * * Copyright (c) 2014-2016, Citus Data, Inc. * @@ -19,8 +19,10 @@ #include #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/connection_cache.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" #include "utils/elog.h" @@ -29,15 +31,8 @@ #include "utils/memutils.h" #include "utils/palloc.h" -/* - * NodeConnectionHash is the connection hash itself. It begins uninitialized. - * The first call to GetOrEstablishConnection triggers hash creation. - */ -static HTAB *NodeConnectionHash = NULL; - /* local function forward declarations */ -static HTAB * CreateNodeConnectionHash(void); static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); @@ -56,77 +51,26 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort) { + int connectionFlags = SESSION_LIFESPAN; PGconn *connection = NULL; - NodeConnectionKey nodeConnectionKey; - NodeConnectionEntry *nodeConnectionEntry = NULL; - bool entryFound = false; - bool needNewConnection = true; - char *userName = CurrentUserName(); + MultiConnection *multiConnection = + GetNodeConnection(connectionFlags, nodeName, nodePort); - /* check input */ - if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH) + if (PQstatus(multiConnection->pgConn) == CONNECTION_OK) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); + connection = multiConnection->pgConn; } - - /* if first call, initialize the connection hash */ - if (NodeConnectionHash == NULL) + else { - NodeConnectionHash = CreateNodeConnectionHash(); - } - - memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1); - nodeConnectionKey.nodePort = nodePort; - strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN); - - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_FIND, &entryFound); - if (entryFound) - { - connection = nodeConnectionEntry->connection; - if (PQstatus(connection) == CONNECTION_OK) - { - needNewConnection = false; - } - else - { - PurgeConnection(connection); - } - } - - if (needNewConnection) - { - connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser); - if (connection != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_ENTER, &entryFound); - nodeConnectionEntry->connection = connection; - } + ReportConnectionError(multiConnection, WARNING); + CloseConnection(multiConnection); + connection = NULL; } return connection; } -/* - * PurgeConnection removes the given connection from the connection hash and - * closes it using PQfinish. If our hash does not contain the given connection, - * this method simply prints a warning and exits. - */ -void -PurgeConnection(PGconn *connection) -{ - NodeConnectionKey nodeConnectionKey; - - BuildKeyForConnection(connection, &nodeConnectionKey); - PurgeConnectionByKey(&nodeConnectionKey); -} - - /* * Utility method to simplify populating a connection cache key with relevant * fields from a provided connection. @@ -170,29 +114,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) } -PGconn * -PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) -{ - bool entryFound = false; - NodeConnectionEntry *nodeConnectionEntry = NULL; - PGconn *connection = NULL; - - if (NodeConnectionHash != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, - HASH_REMOVE, &entryFound); - } - - if (entryFound) - { - connection = nodeConnectionEntry->connection; - PQfinish(nodeConnectionEntry->connection); - } - - return connection; -} - - /* * WarnRemoteError retrieves error fields from a remote result and produces an * error report at the WARNING level after amending the error with a CONTEXT @@ -261,13 +182,11 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } /* - * If requested, actually raise an error. This necessitates purging the - * connection so it doesn't remain in the hash in an invalid state. + * If requested, actually raise an error. */ if (raiseError) { errorLevel = ERROR; - PurgeConnection(connection); } if (sqlState == ERRCODE_CONNECTION_FAILURE) @@ -288,79 +207,34 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } -/* - * CreateNodeConnectionHash returns a newly created hash table suitable for - * storing unlimited connections indexed by node name and port. - */ -static HTAB * -CreateNodeConnectionHash(void) -{ - HTAB *nodeConnectionHash = NULL; - HASHCTL info; - int hashFlags = 0; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hash = tag_hash; - info.hcxt = CacheMemoryContext; - hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags); - - return nodeConnectionHash; -} - - /* * ConnectToNode opens a connection to a remote PostgreSQL server. The function * configures the connection's fallback application name to 'citus' and sets * the remote encoding to match the local one. All parameters are required to * be non NULL. * - * We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up - * and return NULL. + * This is only a thin layer over connection_management.[ch], and will be + * removed soon. */ PGconn * ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) { + /* don't want already established connections */ + int connectionFlags = FORCE_NEW_CONNECTION; PGconn *connection = NULL; - const char *clientEncoding = GetDatabaseEncodingName(); - const char *dbname = get_database_name(MyDatabaseId); - int attemptIndex = 0; + MultiConnection *multiConnection = + GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, nodeUser, + NULL); - const char *keywordArray[] = { - "host", "port", "fallback_application_name", - "client_encoding", "connect_timeout", "dbname", "user", NULL - }; - char nodePortString[12]; - const char *valueArray[] = { - nodeName, nodePortString, "citus", clientEncoding, - CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, nodeUser, NULL - }; - - sprintf(nodePortString, "%d", nodePort); - - Assert(sizeof(keywordArray) == sizeof(valueArray)); - - for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++) + if (PQstatus(multiConnection->pgConn) == CONNECTION_OK) { - connection = PQconnectdbParams(keywordArray, valueArray, false); - if (PQstatus(connection) == CONNECTION_OK) - { - break; - } - else - { - /* warn if still erroring on final attempt */ - if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1) - { - WarnRemoteError(connection, NULL); - } - - PQfinish(connection); - connection = NULL; - } + connection = multiConnection->pgConn; + } + else + { + ReportConnectionError(multiConnection, WARNING); + CloseConnection(multiConnection); + connection = NULL; } return connection; diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 3d9fa1260..363042a94 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -16,21 +16,8 @@ #include "c.h" #include "libpq-fe.h" -#include "nodes/pg_list.h" #include "utils/hsearch.h" - -/* maximum duration to wait for connection */ -#define CLIENT_CONNECT_TIMEOUT_SECONDS "5" - -/* maximum (textual) lengths of hostname and port */ -#define MAX_NODE_LENGTH 255 -#define MAX_PORT_LENGTH 10 - -/* times to attempt connection (or reconnection) */ -#define MAX_CONNECT_ATTEMPTS 2 - -/* SQL statement for testing */ -#define TEST_SQL "DO $$ BEGIN RAISE EXCEPTION 'Raised remotely!'; END $$" +#include "distributed/connection_management.h" /* * NodeConnectionKey acts as the key to index into the (process-local) hash @@ -54,9 +41,7 @@ typedef struct NodeConnectionEntry /* function declarations for obtaining and using a connection */ extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); -extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); -extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/test/regress/expected/multi_connection_cache.out b/src/test/regress/expected/multi_connection_cache.out index 79cf59dd2..9ac26c029 100644 --- a/src/test/regress/expected/multi_connection_cache.out +++ b/src/test/regress/expected/multi_connection_cache.out @@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer) \set VERBOSITY terse -- connect to non-existent host SELECT initialize_remote_temp_table('dummy-host-name', 12345); -WARNING: connection failed to dummy-host-name:12345 +WARNING: connection error: dummy-host-name:12345 initialize_remote_temp_table ------------------------------ f