Use connection_management.c from within connection_cache.c.

This is a temporary step towards removing connection_cache.c.
pull/1005/head
Andres Freund 2016-12-01 02:06:11 -08:00
parent 168b652da6
commit 1ca88dbac7
5 changed files with 42 additions and 97 deletions

View File

@ -956,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
"commands")));
}
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */
@ -966,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
/* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
/* iterate over placements in rounds, to ensure in-order execution */
while (tasksPending)
{

View File

@ -2,7 +2,7 @@
*
* connection_cache.c
*
* This file contains functions to implement a connection hash.
* Legacy connection caching layer. Will be removed entirely.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
@ -19,8 +19,10 @@
#include <string.h>
#include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/connection_cache.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "mb/pg_wchar.h"
#include "utils/builtins.h"
#include "utils/elog.h"
@ -29,15 +31,8 @@
#include "utils/memutils.h"
#include "utils/palloc.h"
/*
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
* The first call to GetOrEstablishConnection triggers hash creation.
*/
static HTAB *NodeConnectionHash = NULL;
/* local function forward declarations */
static HTAB * CreateNodeConnectionHash(void);
static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
@ -56,56 +51,20 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr
PGconn *
GetOrEstablishConnection(char *nodeName, int32 nodePort)
{
int connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | SESSION_LIFESPAN;
PGconn *connection = NULL;
NodeConnectionKey nodeConnectionKey;
NodeConnectionEntry *nodeConnectionEntry = NULL;
bool entryFound = false;
bool needNewConnection = true;
char *userName = CurrentUserName();
MultiConnection *mconnection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
/* check input */
if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH)
if (PQstatus(mconnection->conn) == CONNECTION_OK)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
connection = mconnection->conn;
}
/* if first call, initialize the connection hash */
if (NodeConnectionHash == NULL)
else
{
NodeConnectionHash = CreateNodeConnectionHash();
}
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1);
nodeConnectionKey.nodePort = nodePort;
strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_FIND, &entryFound);
if (entryFound)
{
connection = nodeConnectionEntry->connection;
if (PQstatus(connection) == CONNECTION_OK)
{
needNewConnection = false;
}
else
{
PurgeConnection(connection);
}
}
if (needNewConnection)
{
connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser);
if (connection != NULL)
{
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_ENTER, &entryFound);
nodeConnectionEntry->connection = connection;
}
ReportConnectionError(mconnection, WARNING);
CloseConnection(mconnection);
connection = NULL;
}
return connection;
@ -123,6 +82,7 @@ PurgeConnection(PGconn *connection)
NodeConnectionKey nodeConnectionKey;
BuildKeyForConnection(connection, &nodeConnectionKey);
PurgeConnectionByKey(&nodeConnectionKey);
}
@ -170,26 +130,24 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey)
}
PGconn *
void
PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
{
bool entryFound = false;
NodeConnectionEntry *nodeConnectionEntry = NULL;
PGconn *connection = NULL;
int connectionFlags = CACHED_CONNECTION;
MultiConnection *connection;
if (NodeConnectionHash != NULL)
connection =
StartNodeUserDatabaseConnection(
connectionFlags,
nodeConnectionKey->nodeName,
nodeConnectionKey->nodePort,
nodeConnectionKey->nodeUser,
NULL);
if (connection)
{
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey,
HASH_REMOVE, &entryFound);
CloseConnection(connection);
}
if (entryFound)
{
connection = nodeConnectionEntry->connection;
PQfinish(nodeConnectionEntry->connection);
}
return connection;
}
@ -288,30 +246,6 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
}
/*
* CreateNodeConnectionHash returns a newly created hash table suitable for
* storing unlimited connections indexed by node name and port.
*/
static HTAB *
CreateNodeConnectionHash(void)
{
HTAB *nodeConnectionHash = NULL;
HASHCTL info;
int hashFlags = 0;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(NodeConnectionKey);
info.entrysize = sizeof(NodeConnectionEntry);
info.hash = tag_hash;
info.hcxt = CacheMemoryContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags);
return nodeConnectionHash;
}
/*
* ConnectToNode opens a connection to a remote PostgreSQL server. The function
* configures the connection's fallback application name to 'citus' and sets
@ -320,13 +254,16 @@ CreateNodeConnectionHash(void)
*
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
* and return NULL.
*
* XXX: We unfortunately can't easily layer this over connection_managment.c
* as callers close connections themselves using PQfinish().
*/
PGconn *
ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
{
const char *dbname = get_database_name(MyDatabaseId);
PGconn *connection = NULL;
const char *clientEncoding = GetDatabaseEncodingName();
const char *dbname = get_database_name(MyDatabaseId);
int attemptIndex = 0;
const char *keywordArray[] = {

View File

@ -170,7 +170,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
bool found;
dlist_iter iter;
/* do some minimal input checks */
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
key.port = port;
if (user)
{

View File

@ -56,7 +56,7 @@ typedef struct NodeConnectionEntry
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection);
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern void PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern void WarnRemoteError(PGconn *connection, PGresult *result);
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);

View File

@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer)
\set VERBOSITY terse
-- connect to non-existent host
SELECT initialize_remote_temp_table('dummy-host-name', 12345);
WARNING: connection failed to dummy-host-name:12345
WARNING: connection error: dummy-host-name:12345
initialize_remote_temp_table
------------------------------
f