diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index d51d047a4..48084569d 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -956,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn "commands"))); } - XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; - shardIntervalList = TaskShardIntervalList(taskList); /* ensure that there are no concurrent modifications on the same shards */ @@ -966,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn /* open connection to all relevant placements, if not already open */ OpenTransactionsToAllShardPlacements(shardIntervalList, userName); + XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD; + /* iterate over placements in rounds, to ensure in-order execution */ while (tasksPending) { diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index db252d8c6..f2a082651 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -2,7 +2,7 @@ * * connection_cache.c * - * This file contains functions to implement a connection hash. + * Legacy connection caching layer. Will be removed entirely. * * Copyright (c) 2014-2016, Citus Data, Inc. * @@ -19,8 +19,10 @@ #include #include "commands/dbcommands.h" +#include "distributed/connection_management.h" #include "distributed/connection_cache.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" #include "utils/elog.h" @@ -29,15 +31,8 @@ #include "utils/memutils.h" #include "utils/palloc.h" -/* - * NodeConnectionHash is the connection hash itself. It begins uninitialized. - * The first call to GetOrEstablishConnection triggers hash creation. - */ -static HTAB *NodeConnectionHash = NULL; - /* local function forward declarations */ -static HTAB * CreateNodeConnectionHash(void); static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError); @@ -56,56 +51,20 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort) { + int connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | SESSION_LIFESPAN; PGconn *connection = NULL; - NodeConnectionKey nodeConnectionKey; - NodeConnectionEntry *nodeConnectionEntry = NULL; - bool entryFound = false; - bool needNewConnection = true; - char *userName = CurrentUserName(); + MultiConnection *mconnection = + GetNodeConnection(connectionFlags, nodeName, nodePort); - /* check input */ - if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH) + if (PQstatus(mconnection->conn) == CONNECTION_OK) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("hostname exceeds the maximum length of %d", - MAX_NODE_LENGTH))); + connection = mconnection->conn; } - - /* if first call, initialize the connection hash */ - if (NodeConnectionHash == NULL) + else { - NodeConnectionHash = CreateNodeConnectionHash(); - } - - memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey)); - strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1); - nodeConnectionKey.nodePort = nodePort; - strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN); - - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_FIND, &entryFound); - if (entryFound) - { - connection = nodeConnectionEntry->connection; - if (PQstatus(connection) == CONNECTION_OK) - { - needNewConnection = false; - } - else - { - PurgeConnection(connection); - } - } - - if (needNewConnection) - { - connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser); - if (connection != NULL) - { - nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey, - HASH_ENTER, &entryFound); - nodeConnectionEntry->connection = connection; - } + ReportConnectionError(mconnection, WARNING); + CloseConnection(mconnection); + connection = NULL; } return connection; @@ -123,6 +82,7 @@ PurgeConnection(PGconn *connection) NodeConnectionKey nodeConnectionKey; BuildKeyForConnection(connection, &nodeConnectionKey); + PurgeConnectionByKey(&nodeConnectionKey); } @@ -170,26 +130,24 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey) } -PGconn * +void PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey) { - bool entryFound = false; - NodeConnectionEntry *nodeConnectionEntry = NULL; - PGconn *connection = NULL; + int connectionFlags = CACHED_CONNECTION; + MultiConnection *connection; - if (NodeConnectionHash != NULL) + connection = + StartNodeUserDatabaseConnection( + connectionFlags, + nodeConnectionKey->nodeName, + nodeConnectionKey->nodePort, + nodeConnectionKey->nodeUser, + NULL); + + if (connection) { - nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey, - HASH_REMOVE, &entryFound); + CloseConnection(connection); } - - if (entryFound) - { - connection = nodeConnectionEntry->connection; - PQfinish(nodeConnectionEntry->connection); - } - - return connection; } @@ -288,30 +246,6 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError) } -/* - * CreateNodeConnectionHash returns a newly created hash table suitable for - * storing unlimited connections indexed by node name and port. - */ -static HTAB * -CreateNodeConnectionHash(void) -{ - HTAB *nodeConnectionHash = NULL; - HASHCTL info; - int hashFlags = 0; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeConnectionKey); - info.entrysize = sizeof(NodeConnectionEntry); - info.hash = tag_hash; - info.hcxt = CacheMemoryContext; - hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); - - nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags); - - return nodeConnectionHash; -} - - /* * ConnectToNode opens a connection to a remote PostgreSQL server. The function * configures the connection's fallback application name to 'citus' and sets @@ -320,13 +254,16 @@ CreateNodeConnectionHash(void) * * We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up * and return NULL. + * + * XXX: We unfortunately can't easily layer this over connection_managment.c + * as callers close connections themselves using PQfinish(). */ PGconn * ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) { + const char *dbname = get_database_name(MyDatabaseId); PGconn *connection = NULL; const char *clientEncoding = GetDatabaseEncodingName(); - const char *dbname = get_database_name(MyDatabaseId); int attemptIndex = 0; const char *keywordArray[] = { diff --git a/src/backend/distributed/utils/connection_management.c b/src/backend/distributed/utils/connection_management.c index 852d747dc..98d60b469 100644 --- a/src/backend/distributed/utils/connection_management.c +++ b/src/backend/distributed/utils/connection_management.c @@ -170,7 +170,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, bool found; dlist_iter iter; + /* do some minimal input checks */ strlcpy(key.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + key.port = port; if (user) { diff --git a/src/include/distributed/connection_cache.h b/src/include/distributed/connection_cache.h index 3d9fa1260..073b4290f 100644 --- a/src/include/distributed/connection_cache.h +++ b/src/include/distributed/connection_cache.h @@ -56,7 +56,7 @@ typedef struct NodeConnectionEntry extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort); extern void PurgeConnection(PGconn *connection); extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey); -extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); +extern void PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey); extern void WarnRemoteError(PGconn *connection, PGresult *result); extern void ReraiseRemoteError(PGconn *connection, PGresult *result); extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser); diff --git a/src/test/regress/expected/multi_connection_cache.out b/src/test/regress/expected/multi_connection_cache.out index 79cf59dd2..9ac26c029 100644 --- a/src/test/regress/expected/multi_connection_cache.out +++ b/src/test/regress/expected/multi_connection_cache.out @@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer) \set VERBOSITY terse -- connect to non-existent host SELECT initialize_remote_temp_table('dummy-host-name', 12345); -WARNING: connection failed to dummy-host-name:12345 +WARNING: connection error: dummy-host-name:12345 initialize_remote_temp_table ------------------------------ f