Centralized shard/placement connection and state management.

Currently there are several places in Citus that map placements to
connections and that manage placement health. Centralize this
knowledge.  Because of the centralized knowledge about which
connection has previously been used for which shard/placement, this
also provides the basis for relaxing restrictions around combining
various forms of DDL/DML.

Connections for a placement can now be acquired using
GetPlacementConnection(). If the connection is used for DDL or DML, the
FOR_DDL or FOR_DML flag should be passed, respectively.  If an individual
remote transaction fails (but the transaction on the master succeeds)
and FOR_DDL/DML have been specified, the placement is marked as
invalid, unless that'd mark all placements for a shard as invalid.
pull/1079/head
Andres Freund 2017-01-02 03:58:22 -08:00
parent aca3770364
commit bfa742d794
6 changed files with 734 additions and 2 deletions

View File

@ -23,6 +23,7 @@
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h"
#include "mb/pg_wchar.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@ -234,6 +235,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
connection = StartConnectionEstablishment(&key);
dlist_push_tail(entry->connections, &connection->connectionNode);
ResetShardPlacementAssociation(connection);
if (flags & SESSION_LIFESPAN)
{
@ -333,6 +335,7 @@ CloseNodeConnections(char *nodeName, int nodePort)
/* same for transaction state */
CloseRemoteTransaction(connection);
CloseShardPlacementAssociation(connection);
/* we leave the per-host entry alive */
pfree(connection);
@ -366,8 +369,9 @@ CloseConnection(MultiConnection *connection)
/* unlink from list of open connections */
dlist_delete(&connection->connectionNode);
/* same for transaction state */
/* same for transaction state and shard/placement machinery */
CloseRemoteTransaction(connection);
CloseShardPlacementAssociation(connection);
/* we leave the per-host entry alive */
pfree(connection);
@ -692,6 +696,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{
/* reset per-transaction state */
ResetRemoteTransaction(connection);
ResetShardPlacementAssociation(connection);
UnclaimConnection(connection);
}

View File

@ -0,0 +1,660 @@
/*-------------------------------------------------------------------------
*
* placement_connection.c
* Per placement connection handling.
*
* Copyright (c) 2016-2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/metadata_cache.h"
#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
/*
* A connection reference is used to register that a connection has been used
* to read or modify a shard placement as a particular user.
*/
typedef struct ConnectionReference
{
/* identity of the shard placement this reference was created for */
uint64 shardId;
uint64 placementId;
/*
 * The user used to read/modify the placement. Connections that were
 * established for a different role cannot be reused, since they would not
 * have the right permissions.
 */
const char *userName;
/*
 * The connection itself. CloseShardPlacementAssociation() sets this to NULL
 * when the connection is closed before the end of the transaction.
 */
MultiConnection *connection;
/*
 * Information about what the connection is used for. There can only be
 * one connection executing DDL/DML for a placement, to avoid deadlock
 * issues/read-your-own-writes violations. The difference between DDL and
 * DML is currently only used to emit more precise error messages.
 */
bool hadDML;
bool hadDDL;
/* membership in ConnectionPlacementHashEntry->connectionReferences */
dlist_node placementNode;
/* membership in MultiConnection->referencedPlacements */
dlist_node connectionNode;
} ConnectionReference;
/*
* Hash table mapping placements to a list of connections.
*
* This stores a list of connections for each placement, because multiple
* connections to the same placement may exist at the same time. E.g. a
* real-time executor query may reference the same placement in several
* sub-tasks.
*
* We keep track of whether a connection has executed DML or DDL, since we can
* only ever allow a single transaction to do either, to prevent deadlocks and
* consistency violations (e.g. read-your-own-writes).
*/
/* hash key: a placement is identified by its placementId alone */
typedef struct ConnectionPlacementHashKey
{
uint64 placementId;
} ConnectionPlacementHashKey;
/* hash entry: per-placement state stored in ConnectionPlacementHash */
typedef struct ConnectionPlacementHashEntry
{
/* lookup key (placementId) */
ConnectionPlacementHashKey key;
/* did any remote transactions fail? Set by CheckShardPlacements(). */
bool failed;
/* list of ConnectionReference structs, linked through their placementNode */
dlist_head connectionReferences;
/* if non-NULL, the connection reference requested with FOR_DML/FOR_DDL */
ConnectionReference *modifyingConnection;
/* membership in ConnectionShardHashEntry->placementConnections */
dlist_node shardNode;
} ConnectionPlacementHashEntry;
/*
 * hash table: placementId -> ConnectionPlacementHashEntry. Created by
 * InitPlacementConnectionManagement() and emptied by
 * ResetPlacementConnectionManagement().
 */
static HTAB *ConnectionPlacementHash;
/*
* Hash table mapping shard ids to placements.
*
* This is used to track whether placements of a shard have to be marked
* invalid after a failure, or whether a coordinated transaction has to be
* aborted, to avoid all placements of a shard being marked invalid.
*/
/* hash key: a shard is identified by its shardId alone */
typedef struct ConnectionShardHashKey
{
uint64 shardId;
} ConnectionShardHashKey;
/* hash entry: per-shard state stored in ConnectionShardHash */
typedef struct ConnectionShardHashEntry
{
/* lookup key (shardId) */
ConnectionShardHashKey key;
/* list of ConnectionPlacementHashEntry structs, linked through their shardNode */
dlist_head placementConnections;
} ConnectionShardHashEntry;
/*
 * hash table: shardId -> ConnectionShardHashEntry. Created by
 * InitPlacementConnectionManagement() and emptied by
 * ResetPlacementConnectionManagement().
 */
static HTAB *ConnectionShardHash;
/* forward declarations of the file-local helpers defined below */

/* finds a reusable connection reference for a placement, or reports a conflict */
static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName,
ConnectionPlacementHashEntry *
placementEntry);
/* decides whether one specific connection reference may be reused */
static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *connectionReference);
/* links a placement entry into the list kept by its shard's hash entry */
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
/* per-shard part of CheckForFailedPlacements() */
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit,
bool using2PC);
/*
 * GetPlacementConnection returns a connection for the given placement after
 * connection establishment has been completed.
 *
 * It is StartPlacementConnection() followed by
 * FinishConnectionEstablishment(); flags, placement and userName are passed
 * through unchanged, so see StartPlacementConnection for their meaning.
 */
MultiConnection *
GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
{
    MultiConnection *placementConnection = NULL;

    /* pick a reusable connection or begin establishing a new one */
    placementConnection = StartPlacementConnection(flags, placement, userName);

    /* complete establishment before handing the connection to the caller */
    FinishConnectionEstablishment(placementConnection);

    return placementConnection;
}
/*
 * StartPlacementConnection initiates a connection to a remote node,
 * associated with the placement and transaction.
 *
 * The connection is established for the current database, as userName. If
 * userName is NULL the current user is used instead.
 *
 * See StartNodeUserDatabaseConnection for details.
 *
 * Flags have the corresponding meaning from StartNodeUserDatabaseConnection,
 * except that two additional flags have an effect:
 * - FOR_DML - signal that connection is going to be used for DML (modifications)
 * - FOR_DDL - signal that connection is going to be used for DDL
 *
 * Only one connection associated with the placement may have FOR_DML or
 * FOR_DDL set. This restriction prevents deadlocks and wrong results due to
 * in-progress transactions. Violations are reported as errors by
 * CheckExistingConnections().
 *
 * Returns the connection of an existing, reusable reference for the
 * placement if there is one, otherwise a newly started connection.
 */
MultiConnection *
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
{
    ConnectionPlacementHashKey key;
    ConnectionPlacementHashEntry *placementEntry = NULL;
    ConnectionReference *returnConnectionReference = NULL;
    char *freeUserName = NULL;
    bool found = false;

    if (userName == NULL)
    {
        /* remember the allocation so it can be released before returning */
        userName = freeUserName = CurrentUserName();
    }

    key.placementId = placement->placementId;

    /* lookup relevant hash entry, creating it on first use of the placement */
    placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
    if (!found)
    {
        dlist_init(&placementEntry->connectionReferences);
        placementEntry->failed = false;
        placementEntry->modifyingConnection = NULL;
    }

    /*
     * Check whether any of the connections already associated with the
     * placement can be reused, or violates FOR_DML/FOR_DDL constraints.
     */
    returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry);

    /*
     * Either no caching desired, or no connection present. Start connection
     * establishment. Allocations are performed in transaction context, so we
     * don't have to care about freeing in case of an early disconnect.
     */
    if (returnConnectionReference == NULL)
    {
        /*
         * Connect as the requested user. The reference below records
         * userName and is later matched against it for reuse, so the
         * connection has to actually be opened for that role rather than
         * for whatever the current user happens to be. A NULL database is
         * passed so the current database is used, as documented above.
         */
        MultiConnection *connection =
            StartNodeUserDatabaseConnection(flags, placement->nodeName,
                                            placement->nodePort, userName, NULL);

        returnConnectionReference = (ConnectionReference *)
                                    MemoryContextAlloc(TopTransactionContext,
                                                       sizeof(ConnectionReference));
        returnConnectionReference->connection = connection;
        returnConnectionReference->hadDDL = false;
        returnConnectionReference->hadDML = false;
        returnConnectionReference->shardId = placement->shardId;
        returnConnectionReference->placementId = placement->placementId;
        returnConnectionReference->userName =
            MemoryContextStrdup(TopTransactionContext, userName);

        dlist_push_tail(&placementEntry->connectionReferences,
                        &returnConnectionReference->placementNode);

        /* record association with shard, for invalidation */
        AssociatePlacementWithShard(placementEntry, placement);

        /* record association with connection, to handle connection closure */
        dlist_push_tail(&connection->referencedPlacements,
                        &returnConnectionReference->connectionNode);
    }

    /* register the reference as the placement's modifying connection */
    if (flags & FOR_DDL)
    {
        placementEntry->modifyingConnection = returnConnectionReference;
        returnConnectionReference->hadDDL = true;
    }
    if (flags & FOR_DML)
    {
        placementEntry->modifyingConnection = returnConnectionReference;
        returnConnectionReference->hadDML = true;
    }

    if (freeUserName)
    {
        pfree(freeUserName);
    }

    return returnConnectionReference->connection;
}
/*
 * CheckExistingConnections looks for a connection reference of the placement
 * that can be handed out again. Returns that reference, or NULL if the caller
 * has to establish a new connection.
 *
 * Whether a reference is usable is decided by CanUseExistingConnection(). If
 * no existing connection is usable, but opening a new one would conflict
 * (e.g. a connection already exists but isn't usable, and the desired
 * connection needs to execute DML/DDL), an error is thrown.
 */
static ConnectionReference *
CheckExistingConnections(uint32 flags, const char *userName,
                         ConnectionPlacementHashEntry *placementEntry)
{
    ConnectionReference *modifier = placementEntry->modifyingConnection;
    dlist_iter referenceIter;

    /*
     * A connection that has executed DML/DDL takes precedence: further
     * DML/DDL may only go over that very connection. Looking at it first
     * also detects conflicts caused by a second connection that wants to
     * execute DML/DDL.
     */
    if (modifier != NULL)
    {
        if (CanUseExistingConnection(flags, userName, modifier))
        {
            return modifier;
        }

        if (modifier->hadDDL)
        {
            /* would deadlock otherwise */
            ereport(ERROR,
                    (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                     errmsg("cannot establish new placement connection when DDL has been "
                            "executed on existing placement connection")));
        }
        else if (modifier->hadDML)
        {
            /* we'd not see the other connection's contents */
            ereport(ERROR,
                    (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                     errmsg("cannot establish new placement connection when DML has been "
                            "executed on existing placement connection")));
        }
        else
        {
            /* a modifying connection must have executed DML or DDL */
            Assert(false);
        }
    }

    /* walk all known connections of the placement, looking for a reusable one */
    dlist_foreach(referenceIter, &placementEntry->connectionReferences)
    {
        ConnectionReference *candidate =
            dlist_container(ConnectionReference, placementNode, referenceIter.cur);

        if (CanUseExistingConnection(flags, userName, candidate))
        {
            return candidate;
        }

        /*
         * The candidate stays around unused; make sure the requested
         * FOR_DML/FOR_DDL usage is compatible with that.
         */
        if ((flags & FOR_DDL) != 0)
        {
            /* would deadlock otherwise */
            ereport(ERROR,
                    (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                     errmsg("cannot establish new placement connection for DDL when "
                            "other connections exist")));
        }
        else if ((flags & FOR_DML) != 0)
        {
            /*
             * We could allow this case (as presumably the SELECT is done),
             * but it'd be a bit prone to programming errors, so we disallow
             * it for now.
             */
            ereport(ERROR,
                    (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                     errmsg("cannot establish new placement connection for DML when "
                            "other connections exist")));
        }
    }

    /* nothing reusable and no conflict: caller establishes a new connection */
    return NULL;
}
/*
 * CanUseExistingConnection is a helper for CheckExistingConnections() that
 * decides whether the connection behind connectionReference may be reused
 * for a request made with the given flags and userName.
 *
 * Returns false if the connection was already closed, is claimed
 * exclusively, if the caller passed FORCE_NEW_CONNECTION, or if the
 * reference was created for a different user; true otherwise.
 */
static bool
CanUseExistingConnection(uint32 flags, const char *userName,
                         ConnectionReference *connectionReference)
{
    MultiConnection *candidate = connectionReference->connection;

    /* an already closed connection is obviously not usable */
    if (candidate == NULL)
    {
        return false;
    }

    /* somebody else is using it */
    if (candidate->claimedExclusively)
    {
        return false;
    }

    /* caller explicitly asked for no connection reuse */
    if ((flags & FORCE_NEW_CONNECTION) != 0)
    {
        return false;
    }

    /* only reusable when it was established for the very same user */
    return strcmp(connectionReference->userName, userName) == 0;
}
/*
 * AssociatePlacementWithShard records shard->placement relation in
 * ConnectionShardHash.
 *
 * placementEntry is the hash entry of the placement and placement supplies
 * the shardId/placementId. The shard's hash entry is created if it does not
 * exist yet; the placement entry is appended to its list unless an entry
 * with the same placementId is already on it.
 *
 * That association is later used, in CheckForFailedPlacements, to invalidate
 * shard placements if necessary.
 */
static void
AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
                            ShardPlacement *placement)
{
    ConnectionShardHashKey shardKey;
    ConnectionShardHashEntry *shardEntry = NULL;
    bool found = false;
    dlist_iter placementIter;

    shardKey.shardId = placement->shardId;
    shardEntry = hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &found);
    if (!found)
    {
        dlist_init(&shardEntry->placementConnections);
    }

    /*
     * Check if placement is already associated with shard (happens if there's
     * multiple connections for a placement). There'll usually only be few
     * placements per shard, so the price of iterating isn't large.
     */
    dlist_foreach(placementIter, &shardEntry->placementConnections)
    {
        /*
         * Deliberately not named placementEntry: that would shadow the
         * parameter which is still needed after the loop.
         */
        ConnectionPlacementHashEntry *existingEntry =
            dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);

        if (existingEntry->key.placementId == placement->placementId)
        {
            return;
        }
    }

    /* otherwise add */
    dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode);
}
/*
 * CloseShardPlacementAssociation is invoked when a connection gets closed
 * before the transaction has ended. Every ConnectionReference hanging off the
 * connection has its connection pointer cleared.
 *
 * ConnectionPlacementHashEntry->modifyingConnection is intentionally left
 * alone; resetting it would be more complicated than it seems worth. The
 * consequence is a spurious error if a connection that executed DML/DDL is
 * closed earlier in a transaction.
 *
 * This should only be called by connection_management.c.
 */
void
CloseShardPlacementAssociation(struct MultiConnection *connection)
{
    dlist_iter referenceIter;

    dlist_foreach(referenceIter, &connection->referencedPlacements)
    {
        ConnectionReference *closedReference =
            dlist_container(ConnectionReference, connectionNode, referenceIter.cur);

        /* mark the reference as no longer backed by a connection */
        closedReference->connection = NULL;
    }
}
/*
* ResetShardPlacementAssociation resets the association of connections to
* shard placements at the end of a transaction.
*
* This should only be called by connection_management.c.
*/
void
ResetShardPlacementAssociation(struct MultiConnection *connection)
{
dlist_init(&connection->referencedPlacements);
}
/*
 * ResetPlacementConnectionManagement() drops all placement and shard
 * bookkeeping, disassociating connections from placements and shards. It is
 * meant to run at the end of XACT_EVENT_COMMIT and XACT_EVENT_ABORT
 * processing.
 *
 * Only the hash entries are removed here. The ConnectionReference structs and
 * the data they point to live in TopTransactionContext and go away together
 * with that context.
 */
void
ResetPlacementConnectionManagement(void)
{
    /* forget every placement ... */
    hash_delete_all(ConnectionPlacementHash);

    /* ... and every shard that was tracked */
    hash_delete_all(ConnectionShardHash);
}
/*
 * CheckForFailedPlacements walks over all shards tracked in
 * ConnectionShardHash and lets CheckShardPlacements() decide, per shard,
 * which placements have to be marked as invalid and whether the coordinated
 * transaction has to be aborted.
 *
 * This will usually be called twice: once before the remote commit is done,
 * and once after. That way we can abort before executing remote commits, and
 * can also handle remote transactions that failed during commit.
 *
 * preCommit and using2PC are handed to CheckShardPlacements(), which only
 * raises an ERROR for a shard when one of them is true; otherwise some remote
 * transactions might have committed already and we cannot roll back.
 *
 * Independent of those flags, an ERROR is raised here if shards were checked
 * but not a single one of them passed.
 */
void
CheckForFailedPlacements(bool preCommit, bool using2PC)
{
    HASH_SEQ_STATUS shardScan;
    ConnectionShardHashEntry *currentShard = NULL;
    int checkedShards = 0;
    int healthyShards = 0;

    hash_seq_init(&shardScan, ConnectionShardHash);
    for (;;)
    {
        currentShard = (ConnectionShardHashEntry *) hash_seq_search(&shardScan);
        if (currentShard == NULL)
        {
            break;
        }

        checkedShards++;

        if (CheckShardPlacements(currentShard, preCommit, using2PC))
        {
            healthyShards++;
        }
    }

    /*
     * If no shards could be modified at all, error out. Doesn't matter if
     * we're post-commit - there's nothing to invalidate.
     */
    if (checkedShards > 0 && healthyShards == 0)
    {
        ereport(ERROR, (errmsg("could not commit transaction on any active node")));
    }
}
/*
 * CheckShardPlacements is a helper function for CheckForFailedPlacements that
 * performs the per-shard work.
 *
 * A placement counts as failed if any connection reference that executed DDL
 * or DML on it was closed, or had its remote transaction fail. If at least
 * one placement failed and none succeeded, the problem is reported (as ERROR
 * when preCommit or using2PC is set, as WARNING otherwise) and false is
 * returned. Otherwise all failed placements are marked FILE_INACTIVE and true
 * is returned.
 */
static bool
CheckShardPlacements(ConnectionShardHashEntry *shardEntry,
                     bool preCommit, bool using2PC)
{
    int failures = 0;
    int successes = 0;
    dlist_iter placementIter;

    dlist_foreach(placementIter, &shardEntry->placementConnections)
    {
        ConnectionPlacementHashEntry *placementEntry =
            dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
        dlist_iter referenceIter;

        dlist_foreach(referenceIter, &placementEntry->connectionReferences)
        {
            ConnectionReference *reference =
                dlist_container(ConnectionReference, placementNode, referenceIter.cur);
            MultiConnection *connection = reference->connection;

            /*
             * If neither DDL nor DML were executed, there's no need for
             * invalidation.
             */
            if (!reference->hadDDL && !reference->hadDML)
            {
                continue;
            }

            /*
             * Failed if connection was closed, or remote transaction was
             * unsuccessful.
             */
            if (!connection || connection->remoteTransaction.transactionFailed)
            {
                placementEntry->failed = true;
            }
        }

        /* placements without any DDL/DML references count as successes */
        if (placementEntry->failed)
        {
            failures++;
        }
        else
        {
            successes++;
        }
    }

    if (failures > 0 && successes == 0)
    {
        int elevel = 0;

        /*
         * Only error out if we're pre-commit or using 2PC. Can't error
         * otherwise as we can end up with a state where some shard
         * modifications have already committed successfully. If no
         * modifications at all succeed, CheckForFailedPlacements() will error
         * out. This sucks.
         */
        if (preCommit || using2PC)
        {
            elevel = ERROR;
        }
        else
        {
            elevel = WARNING;
        }

        /* shardId is a uint64, so it is printed with the unsigned format */
        ereport(elevel,
                (errmsg("could not commit transaction for shard " UINT64_FORMAT
                        " on any active node",
                        shardEntry->key.shardId)));
        return false;
    }

    /* mark all failed placements invalid */
    dlist_foreach(placementIter, &shardEntry->placementConnections)
    {
        ConnectionPlacementHashEntry *placementEntry =
            dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);

        if (placementEntry->failed)
        {
            UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
        }
    }

    return true;
}
/*
 * InitPlacementConnectionManagement sets up the infrastructure of this file
 * at server start: it creates the two hash tables, ConnectionPlacementHash
 * and ConnectionShardHash, both allocated in ConnectionContext.
 */
void
InitPlacementConnectionManagement(void)
{
    HASHCTL placementHashInfo;
    HASHCTL shardHashInfo;
    uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    /* (placementId) -> [ConnectionReference] hash */
    memset(&placementHashInfo, 0, sizeof(placementHashInfo));
    placementHashInfo.keysize = sizeof(ConnectionPlacementHashKey);
    placementHashInfo.entrysize = sizeof(ConnectionPlacementHashEntry);
    placementHashInfo.hash = tag_hash;
    placementHashInfo.hcxt = ConnectionContext;

    ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
                                          64, &placementHashInfo, hashFlags);

    /* (shardId) -> [ConnectionShardHashEntry] hash */
    memset(&shardHashInfo, 0, sizeof(shardHashInfo));
    shardHashInfo.keysize = sizeof(ConnectionShardHashKey);
    shardHashInfo.entrysize = sizeof(ConnectionShardHashEntry);
    shardHashInfo.hash = tag_hash;
    shardHashInfo.hcxt = ConnectionContext;

    ConnectionShardHash = hash_create("citus connection cache (shardid)",
                                      64, &shardHashInfo, hashFlags);
}

View File

@ -33,6 +33,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/task_tracker.h"
#include "distributed/transaction_management.h"
@ -160,6 +161,7 @@ _PG_init(void)
/* initialize coordinated transaction management */
InitializeTransactionManagement();
InitializeConnectionManagement();
InitPlacementConnectionManagement();
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)

View File

@ -24,6 +24,7 @@
#include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
@ -160,6 +161,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
ResetPlacementConnectionManagement();
AfterXactConnectionHandling(true);
}
@ -196,6 +198,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
ResetPlacementConnectionManagement();
AfterXactConnectionHandling(false);
}
@ -233,6 +236,21 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
break;
}
/*
* TODO: It'd probably be a good idea to force constraints and
* such to 'immediate' here. Deferred triggers might try to send
* stuff to the remote side, which'd not be good. Doing so
* remotely would also catch a class of errors where committing
* fails, which can lead to divergence when not using 2PC.
*/
/*
* Check whether the coordinated transaction is in a state we want
* to persist, or whether we want to error out. This handles the
* case that iteratively executed commands marked all placements
* as invalid.
*/
CheckForFailedPlacements(true, CurrentTransactionUse2PC);
if (CurrentTransactionUse2PC)
{
@ -257,6 +275,13 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* (e.g. if the remote commit failed).
*/
RouterExecutorPreCommitCheck();
/*
*
* Check again whether shards/placement successfully
* committed. This handles failure at COMMIT/PREPARE time.
*/
CheckForFailedPlacements(false, CurrentTransactionUse2PC);
}
break;

View File

@ -33,7 +33,11 @@ enum MultiConnectionMode
FORCE_NEW_CONNECTION = 1 << 0,
/* mark returned connection as having session lifespan */
SESSION_LIFESPAN = 1 << 1
SESSION_LIFESPAN = 1 << 1,
FOR_DDL = 1 << 2,
FOR_DML = 1 << 3
};
@ -68,6 +72,9 @@ typedef struct MultiConnection
/* membership in list of in-progress transactions */
dlist_node transactionNode;
/* list of all placements referenced by this connection */
dlist_head referencedPlacements;
} MultiConnection;

View File

@ -0,0 +1,33 @@
/*-------------------------------------------------------------------------
* placement_connection.h
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PLACEMENT_CONNECTION_H
#define PLACEMENT_CONNECTION_H
#include "distributed/connection_management.h"
/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;
/*
 * Returns a connection for the placement after finishing connection
 * establishment. Pass FOR_DDL/FOR_DML in flags when the connection will be
 * used for DDL/DML; a NULL userName means the current user.
 */
extern MultiConnection * GetPlacementConnection(uint32 flags,
struct ShardPlacement *placement,
const char *userName);
/*
 * Same as GetPlacementConnection(), but does not itself finish connection
 * establishment.
 */
extern MultiConnection * StartPlacementConnection(uint32 flags,
struct ShardPlacement *placement,
const char *userName);
/* drops all placement/shard bookkeeping; called at transaction end */
extern void ResetPlacementConnectionManagement(void);
/*
 * Marks failed placements invalid, or errors out if all placements of a
 * shard failed and preCommit or using2PC is set.
 */
extern void CheckForFailedPlacements(bool preCommit, bool using2PC);
/* the next two should only be called by connection_management.c */
extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
extern void ResetShardPlacementAssociation(struct MultiConnection *connection);
/* creates the hash tables used by this module; called once at server start */
extern void InitPlacementConnectionManagement(void);
#endif /* PLACEMENT_CONNECTION_H */