pull/1005/merge
Marco Slot 2016-12-01 15:42:27 +00:00 committed by GitHub
commit 9a73820f3d
6 changed files with 165 additions and 242 deletions

View File

@ -111,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip
Tuplestorestate *tupleStore);
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
bool isModificationQuery);
static void PurgeConnectionForPlacement(ShardPlacement *placement);
static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
static void RemoveXactConnection(PGconn *connection);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
@ -794,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
{
PurgeConnectionForPlacement(taskPlacement);
PurgeConnectionForPlacement(connection, taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue;
}
@ -852,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
}
else
{
PurgeConnectionForPlacement(taskPlacement);
PurgeConnectionForPlacement(connection, taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
@ -1234,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
* for the transaction in addition to purging the connection cache's entry.
*/
static void
PurgeConnectionForPlacement(ShardPlacement *placement)
PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement)
{
NodeConnectionKey nodeKey;
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
PurgeConnectionByKey(&nodeKey);
PurgeConnection(connection);
/*
* The following is logically identical to RemoveXactConnection, but since
@ -1256,6 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
{
NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false;
NodeConnectionKey nodeKey;
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
Assert(IsTransactionBlock());

View File

@ -69,7 +69,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
AfterXactConnectionHandling(true);
AfterXactResetConnections(true);
}
Assert(!subXactAbortAttempted);
@ -83,7 +83,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
AfterXactConnectionHandling(false);
AfterXactResetConnections(false);
}
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;

View File

@ -51,19 +51,17 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr
PGconn *
GetOrEstablishConnection(char *nodeName, int32 nodePort)
{
int connectionFlags = NEW_CONNECTION | CACHED_CONNECTION | SESSION_LIFESPAN;
PGconn *connection = NULL;
MultiConnection *mconnection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
MultiConnection *multiConnection = GetNodeConnection(nodeName, nodePort, 0);
if (PQstatus(mconnection->conn) == CONNECTION_OK)
if (PQstatus(multiConnection->conn) == CONNECTION_OK)
{
connection = mconnection->conn;
connection = multiConnection->conn;
}
else
{
ReportConnectionError(mconnection, WARNING);
CloseConnection(mconnection);
ReportConnectionError(multiConnection, WARNING);
CloseConnectionByPGconn(multiConnection->conn);
connection = NULL;
}
@ -79,11 +77,7 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort)
void
PurgeConnection(PGconn *connection)
{
NodeConnectionKey nodeConnectionKey;
BuildKeyForConnection(connection, &nodeConnectionKey);
PurgeConnectionByKey(&nodeConnectionKey);
CloseConnectionByPGconn(connection);
}
@ -130,27 +124,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey)
}
void
PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
{
int connectionFlags = CACHED_CONNECTION;
MultiConnection *connection;
connection =
StartNodeUserDatabaseConnection(
connectionFlags,
nodeConnectionKey->nodeName,
nodeConnectionKey->nodePort,
nodeConnectionKey->nodeUser,
NULL);
if (connection)
{
CloseConnection(connection);
}
}
/*
* WarnRemoteError retrieves error fields from a remote result and produces an
* error report at the WARNING level after amending the error with a CONTEXT

View File

@ -34,7 +34,8 @@ MemoryContext ConnectionContext = NULL;
static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static void AfterXactResetHostConnections(ConnectionHashEntry *entry, bool isCommit);
/*
@ -71,32 +72,6 @@ InitializeConnectionManagement(void)
}
/*
* Perform connection management activity after the end of a transaction. Both
* COMMIT and ABORT paths are handled here.
*
* This is called by Citus' global transaction callback.
*/
void
AfterXactConnectionHandling(bool isCommit)
{
HASH_SEQ_STATUS status;
ConnectionHashEntry *entry;
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
AfterXactHostConnectionHandling(entry, isCommit);
/*
* NB: We leave the hash entry in place, even if there's no individual
* connections in it anymore. There seems no benefit in deleting it,
* and it'll save a bit of work in the next transaction.
*/
}
}
/*
* GetNodeConnection() establishes a connection to remote node, using default
* user and database.
@ -104,9 +79,9 @@ AfterXactConnectionHandling(bool isCommit)
* See StartNodeUserDatabaseConnection for details.
*/
MultiConnection *
GetNodeConnection(uint32 flags, const char *hostname, int32 port)
GetNodeConnection(const char *hostname, int32 port, uint32 flags)
{
return GetNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
return GetNodeUserDatabaseConnection(hostname, port, NULL, NULL, flags);
}
@ -117,9 +92,9 @@ GetNodeConnection(uint32 flags, const char *hostname, int32 port)
* See StartNodeUserDatabaseConnection for details.
*/
MultiConnection *
StartNodeConnection(uint32 flags, const char *hostname, int32 port)
StartNodeConnection(const char *hostname, int32 port, uint32 flags)
{
return StartNodeUserDatabaseConnection(flags, hostname, port, NULL, NULL);
return StartNodeUserDatabaseConnection(hostname, port, NULL, NULL, flags);
}
@ -129,12 +104,12 @@ StartNodeConnection(uint32 flags, const char *hostname, int32 port)
* See StartNodeUserDatabaseConnection for details.
*/
MultiConnection *
GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
char *user, const char *database)
GetNodeUserDatabaseConnection(const char *hostname, int32 port, const
char *user, const char *database, uint32 flags)
{
MultiConnection *connection;
connection = StartNodeUserDatabaseConnection(flags, hostname, port, user, database);
connection = StartNodeUserDatabaseConnection(hostname, port, user, database, flags);
FinishConnectionEstablishment(connection);
@ -145,30 +120,18 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, co
/*
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
*
* If user or database are NULL, the current session's defaults are used. The
* following flags influence connection establishment behaviour:
* - NEW_CONNECTION - it is permitted to establish a new connection
* - CACHED_CONNECTION - it is permitted to re-use an established connection
* - SESSION_LIFESPAN - the connection should persist after transaction end
* - FOR_DML - only meaningful for placement associated connections
* - FOR_DDL - only meaningful for placement associated connections
* - CRITICAL_CONNECTION - transaction failures on this connection fail the entire
* coordinated transaction
*
* The returned connection has only been initiated, not fully
* established. That's useful to allow parallel connection establishment. If
* that's not desired use the Get* variant.
*/
MultiConnection *
StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const
char *user, const char *database)
StartNodeUserDatabaseConnection(const char *hostname, int32 port, const
char *user, const char *database, uint32 flags)
{
ConnectionHashKey key;
ConnectionHashEntry *entry = NULL;
MultiConnection *connection;
MemoryContext oldContext;
bool found;
dlist_iter iter;
MultiConnection *connection = NULL;
bool found = false;
/* do some minimal input checks */
strlcpy(key.hostname, hostname, MAX_NODE_LENGTH);
@ -197,17 +160,9 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
strlcpy(key.database, get_database_name(MyDatabaseId), NAMEDATALEN);
}
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
{
CurrentCoordinatedTransactionState = COORD_TRANS_IDLE;
}
/*
* Lookup relevant hash entry. We always enter. If only a cached
* connection is desired, and there's none, we'll simply leave the
* connection list empty.
* Lookup relevant hash entry or enter a new one.
*/
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
{
@ -216,68 +171,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
dlist_init(entry->connections);
}
/* if desired, check whether there's a usable connection */
if (flags & CACHED_CONNECTION)
connection = FindAvailableConnection(entry->connections, flags);
if (connection == NULL)
{
/* check connection cache for a connection that's not already in use */
dlist_foreach(iter, entry->connections)
{
connection = dlist_container(MultiConnection, node, iter.cur);
/* don't return claimed connections */
if (connection->claimedExclusively)
{
continue;
}
/*
* If we're not allowed to open new connections right now, and the
* current connection hasn't yet been used in this transaction, we
* can't use it.
*/
if (!connection->activeInTransaction &&
XactModificationLevel > XACT_MODIFICATION_DATA)
{
continue;
}
if (flags & SESSION_LIFESPAN)
{
connection->sessionLifespan = true;
}
connection->activeInTransaction = true;
/*
* One could argue for erroring out when the connection is in a
* failed state. But that'd be a bad idea for two reasons:
*
* 1) Generally starting a connection might fail, after calling
* this function, so calling code needs to handle that anyway.
* 2) This might be used in code that transparently handles
* connection failure.
*/
return connection;
}
}
/* no connection available, done if a new connection isn't desirable */
if (!(flags & NEW_CONNECTION))
{
return NULL;
}
/*
* Check whether we're right now allowed to open new connections.
*
* FIXME: This should be removed soon, once all connections go through
* this API.
*/
if (XactModificationLevel > XACT_MODIFICATION_DATA)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
MemoryContext oldContext = NULL;
/*
* Either no caching desired, or no pre-established, non-claimed,
@ -289,39 +186,83 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
dlist_push_tail(entry->connections, &connection->node);
MemoryContextSwitchTo(oldContext);
if (flags & SESSION_LIFESPAN)
{
connection->sessionLifespan = true;
}
if (flags & IN_TRANSACTION)
{
connection->activeInTransaction = true;
}
if (flags & CLAIM_EXCLUSIVELY)
{
ClaimConnectionExclusively(connection);
}
return connection;
}
/*
* FindAvailableConnection finds an available connection from the given list
* of connections. A connection is available if it has not been claimed
* exclusively and, if the IN_TRANSACTION flag is not set, is not in a
* transaction.
*/
static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags)
{
dlist_iter iter;
/* check connection cache for a connection that's not already in use */
dlist_foreach(iter, connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, node, iter.cur);
/* don't return claimed connections */
if (connection->claimedExclusively)
{
continue;
}
/*
* Don't return connections that are active in the coordinated transaction if
* not explicitly requested.
*/
if (!(flags & IN_TRANSACTION) && connection->activeInTransaction)
{
continue;
}
return connection;
}
return NULL;
}
/*
* Close a previously established connection.
*/
void
CloseConnection(MultiConnection *connection)
CloseConnectionByPGconn(PGconn *pqConn)
{
ConnectionHashKey key;
bool found;
HASH_SEQ_STATUS status;
ConnectionHashEntry *entry;
/* close connection */
PQfinish(connection->conn);
connection->conn = NULL;
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
dlist_head *connections = entry->connections;
dlist_iter iter;
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
key.port = connection->port;
strlcpy(key.user, connection->user, NAMEDATALEN);
strlcpy(key.database, connection->database, NAMEDATALEN);
/* check connection cache for a connection that's not already in use */
dlist_foreach(iter, connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, node, iter.cur);
hash_search(ConnectionHash, &key, HASH_FIND, &found);
if (found)
if (connection->conn == pqConn)
{
/* unlink from list */
dlist_delete(&connection->node);
@ -329,10 +270,7 @@ CloseConnection(MultiConnection *connection)
/* we leave the per-host entry alive */
pfree(connection);
}
else
{
/* XXX: we could error out instead */
ereport(WARNING, (errmsg("closing untracked connection")));
}
}
}
@ -560,13 +498,40 @@ StartConnectionEstablishment(ConnectionHashKey *key)
/*
* Close all remote connections if necessary anymore (i.e. not session
* lifetime), or if in a failed state.
* Perform connection management activity after the end of a transaction. Both
* COMMIT and ABORT paths are handled here.
*
* This is called by Citus' global transaction callback.
*/
void
AfterXactResetConnections(bool isCommit)
{
HASH_SEQ_STATUS status;
ConnectionHashEntry *entry;
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
AfterXactResetHostConnections(entry, isCommit);
/*
* NB: We leave the hash entry in place, even if there's no individual
* connections in it anymore. There seems no benefit in deleting it,
* and it'll save a bit of work in the next transaction.
*/
}
}
/*
* Close all remote connections if necessary anymore (i.e. not cached),
* or if in a failed state.
*/
static void
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
AfterXactResetHostConnections(ConnectionHashEntry *entry, bool isCommit)
{
dlist_mutable_iter iter;
bool cachedConnection = false;
dlist_foreach_modify(iter, entry->connections)
{
@ -585,11 +550,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
}
/*
* Only let a connection life longer than a single transaction if
* instructed to do so by the caller. We also skip doing so if
* it's in a state that wouldn't allow us to run queries again.
* Close connection if there was an error or we already cached
* a connection for this node.
*/
if (!connection->sessionLifespan ||
if (cachedConnection ||
PQstatus(connection->conn) != CONNECTION_OK ||
PQtransactionStatus(connection->conn) != PQTRANS_IDLE)
{
@ -603,10 +567,12 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
}
else
{
/* reset per-transaction state */
/* reset connection state */
UnclaimConnection(connection);
connection->activeInTransaction = false;
UnclaimConnection(connection);
/* close remaining connections */
cachedConnection = true;
}
}
}

View File

@ -56,7 +56,6 @@ typedef struct NodeConnectionEntry
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection);
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
extern void PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern void WarnRemoteError(PGconn *connection, PGresult *result);
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);

View File

@ -30,23 +30,11 @@ struct MemoryContextData;
*/
enum MultiConnectionMode
{
/* allow establishment of new connections */
NEW_CONNECTION = 1 << 0,
/* connection should be part of the coordinated transaction */
IN_TRANSACTION = 1 << 1,
/* allow use of pre-established connections */
CACHED_CONNECTION = 1 << 1,
/* mark returned connection having session lifespan */
SESSION_LIFESPAN = 1 << 2,
/* the connection will be used for DML */
FOR_DML = 1 << 3,
/* the connection will be used for DDL */
FOR_DDL = 1 << 4,
/* failures on this connection will fail entire coordinated transaction */
CRITICAL_CONNECTION = 1 << 5
/* connection should be claimed exclusively for the caller */
CLAIM_EXCLUSIVELY = 1 << 2,
};
@ -64,13 +52,10 @@ typedef struct MultiConnection
/* underlying libpq connection */
struct pg_conn *conn;
/* is the connection intended to be kept after transaction end */
bool sessionLifespan;
/* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively;
/* has the connection been used in the current coordinated transaction? */
/* is the connection currently part of the coordinated transaction */
bool activeInTransaction;
/* time connection establishment was started, for timeout */
@ -112,24 +97,25 @@ extern HTAB *ConnectionHash;
extern struct MemoryContextData *ConnectionContext;
extern void AfterXactConnectionHandling(bool isCommit);
extern void AfterXactResetConnections(bool isCommit);
extern void InitializeConnectionManagement(void);
/* Low-level connection establishment APIs */
extern MultiConnection * GetNodeConnection(uint32 flags, const char *hostname,
int32 port);
extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
int32 port);
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user, const
char *database);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname,
extern MultiConnection * GetNodeConnection(const char *hostname, int32 port,
uint32 flags);
extern MultiConnection * StartNodeConnection(const char *hostname, int32 port,
uint32 flags);
extern MultiConnection * GetNodeUserDatabaseConnection(const char *hostname, int32 port,
const char *user,
const char *database,
uint32 flags);
extern MultiConnection * StartNodeUserDatabaseConnection(const char *hostname,
int32 port,
const char *user,
const char *database);
extern void CloseConnection(MultiConnection *connection);
const char *database,
uint32 flags);
extern void CloseConnectionByPGconn(struct pg_conn *pqConn);
/* dealing with a connection */
extern void FinishConnectionEstablishment(MultiConnection *connection);