Introduce association between placements and connections.

pull/775/head
Andres Freund 2016-10-07 15:29:47 -07:00
parent 6ffa0976cf
commit 14f73e78b0
4 changed files with 487 additions and 0 deletions

View File

@ -32,6 +32,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/task_tracker.h"
#include "distributed/transaction_management.h"
@ -156,6 +157,7 @@ _PG_init(void)
/* initialize coordinated transaction management */
InitializeTransactionManagement();
InitializeConnectionManagement();
InitPlacementConnectionManagement();
/* initialize transaction callbacks */
RegisterRouterExecutorXactCallbacks();

View File

@ -21,6 +21,7 @@
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
@ -120,6 +121,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
ResetPlacementConnectionManagement();
AtEOXact_Connections(true);
}
@ -146,6 +148,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* close connections etc. */
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
{
ResetPlacementConnectionManagement();
AtEOXact_Connections(false);
}
@ -163,6 +166,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
case XACT_EVENT_PRE_COMMIT:
{
bool using2PC = MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC;
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
@ -178,6 +183,22 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
break;
}
/*
* TODO: It's probably a good idea to force constraints and
* such to 'immediate' here. Deferred triggers might try to
* send stuff to the remote side, which'd not be good. Doing
* so remotely would also catch a class of errors where
* committing fails, which can lead to divergence when not
* using 2PC.
*/
/*
* Check whether the coordinated transaction is in a state we want
* to persist, or whether we want to error out. This handles the
* case that iteratively executed commands marked all placements
* as invalid.
*/
CheckForFailedPlacements(true, using2PC);
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
@ -194,6 +215,12 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
CoordinatedRemoteTransactionsCommit();
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
}
/*
* Check again whether shards/placement successfully
* committed. This handles failure at COMMIT/PREPARE time.
*/
CheckForFailedPlacements(false, using2PC);
}
break;

View File

@ -0,0 +1,361 @@
/*-------------------------------------------------------------------------
*
* placement_connection.c
* Per-Placement connection & transaction handling
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "distributed/connection_management.h"
#include "distributed/placement_connection.h"
#include "distributed/metadata_cache.h"
#include "distributed/hash_helpers.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
HTAB *ConnectionPlacementHash = NULL;
HTAB *ConnectionShardHash = NULL;
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
/*
* GetPlacementConnection establishes a connection for a placement.
*
* See StartPlacementConnection for details.
*/
MultiConnection *
GetPlacementConnection(uint32 flags, ShardPlacement *placement)
{
MultiConnection *connection = StartPlacementConnection(flags, placement);
FinishConnectionEstablishment(connection);
return connection;
}
/*
* StartPlacementConnection() initiates a connection to a remote node,
* associated with the placement and transaction.
*
* The connection is established as the current user & database.
*
* See StartNodeUserDatabaseConnection for details.
*
* Flags have the corresponding meaning from StartNodeUserDatabaseConnection,
* except that two additional flags have an effect:
* - FOR_DML - signal that connection is going to be used for DML (modifications)
* - FOR_DDL - signal that connection is going to be used for DDL
*
* Only one connection associated with the placement may have FOR_DML or
* FOR_DDL set. This restriction prevents deadlocks and wrong results due to
* in-progress transactions.
*/
MultiConnection *
StartPlacementConnection(uint32 flags, ShardPlacement *placement)
{
ConnectionPlacementHashKey key;
ConnectionPlacementHashEntry *placementEntry = NULL;
MemoryContext oldContext = NULL;
bool found = false;
ConnectionReference *returnConnectionReference = NULL;
ListCell *referenceCell = NULL;
key.placementid = placement->placementId;
/* FIXME: not implemented */
Assert(flags & NEW_CONNECTION);
/*
* Lookup relevant hash entry. We always enter. If only a cached
* connection is desired, and there's none, we'll simply leave the
* connection list empty.
*/
placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
if (!found)
{
placementEntry->connectionReferences = NIL;
placementEntry->failed = false;
}
/*
* Check whether any of the connections already associated with the
* placement can be reused, or violates FOR_DML/FOR_DDL constraints.
*/
foreach(referenceCell, placementEntry->connectionReferences)
{
ConnectionReference *connectionReference = NULL;
bool useConnection = false;
MultiConnection *connection = NULL;
connectionReference = (ConnectionReference *) lfirst(referenceCell);
connection = connectionReference->connection;
/* use the connection, unless in a state that's not useful for us */
if (connection->claimedExclusively ||
!((flags & CACHED_CONNECTION)) ||
returnConnectionReference != NULL)
{
useConnection = false;
}
else
{
useConnection = true;
}
/*
* If not using the connection, verify that FOR_DML/DDL flags are
* compatible.
*/
if (useConnection)
{
returnConnectionReference = connectionReference;
}
else if (connectionReference->hadDDL)
{
/* XXX: errcode & errmsg */
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish new placement connection when other "
"placement executed DDL")));
}
else if (connectionReference->hadDML)
{
/* XXX: errcode & errmsg */
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish new placement connection when other "
"placement executed DML")));
}
}
/* no connection available, done if a new connection isn't desirable */
if (!(flags & NEW_CONNECTION))
{
return NULL;
}
/*
* Either no caching desired, or no connection present. Start connection
* establishment.
*/
if (returnConnectionReference == NULL)
{
MultiConnection *connection = StartNodeConnection(flags, placement->nodeName,
placement->nodePort);
oldContext = MemoryContextSwitchTo(ConnectionContext);
returnConnectionReference =
(ConnectionReference *) palloc(sizeof(ConnectionReference));
returnConnectionReference->connection = connection;
returnConnectionReference->hadDDL = false;
returnConnectionReference->hadDML = false;
placementEntry->connectionReferences =
lappend(placementEntry->connectionReferences, returnConnectionReference);
MemoryContextSwitchTo(oldContext);
AssociatePlacementWithShard(placementEntry, placement);
}
if (flags & FOR_DDL)
{
returnConnectionReference->hadDDL = true;
}
if (flags & FOR_DML)
{
returnConnectionReference->hadDML = true;
}
if (flags & CRITICAL_CONNECTION)
{
RemoteTransaction *transaction =
&returnConnectionReference->connection->remoteTransaction;
transaction->criticalTransaction = true;
}
return returnConnectionReference->connection;
}
void
InitPlacementConnectionManagement(void)
{
HASHCTL info;
uint32 hashFlags = 0;
/* create (placementid) -> [connection] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ConnectionPlacementHashKey);
info.entrysize = sizeof(ConnectionPlacementHashEntry);
info.hash = tag_hash;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
64, &info, hashFlags);
/* create (shardId) -> [ConnectionShardHashEntry] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ConnectionShardHashKey);
info.entrysize = sizeof(ConnectionShardHashEntry);
info.hash = tag_hash;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
ConnectionShardHash = hash_create("citus connection cache (shardid)",
64, &info, hashFlags);
}
/*
* Disassociate connections from placements and shards. This will be called at
* the end of XACT_EVENT_COMMIT and XACT_EVENT_ABORT.
*/
void
ResetPlacementConnectionManagement(void)
{
/* Simply delete all entries*/
hash_delete_all(ConnectionPlacementHash);
hash_delete_all(ConnectionShardHash);
}
/*
* Check which placements have to be marked as invalid, and/or whether
* sufficiently many placements have failed to abort the entire coordinated
* transaction.
*
* This will usually be called twice. Once before the remote commit is done,
* and once after. This is so we can abort before executing remote commits,
* and so we can handle remote transactions that failed during commit.
*
* When preCommit or using2PC is true, failures on transactions marked as
* critical will abort the entire coordinated transaction. Otherwise we can't
* anymore, because some remote transactions might have already committed.
*/
void
CheckForFailedPlacements(bool preCommit, bool using2PC)
{
HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL;
hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
{
ListCell *placementCell = NULL;
int failures = 0;
int successes = 0;
foreach(placementCell, shardEntry->placementConnections)
{
ConnectionPlacementHashEntry *placementEntry =
(ConnectionPlacementHashEntry *) lfirst(placementCell);
ListCell *referenceCell = NULL;
foreach(referenceCell, placementEntry->connectionReferences)
{
ConnectionReference *reference =
(ConnectionReference *) lfirst(referenceCell);
MultiConnection *connection = reference->connection;
/*
* If neither DDL nor DML were executed, there's no need for
* invalidation.
*/
if (!reference->hadDDL && !reference->hadDML)
{
continue;
}
if (connection->remoteTransaction.transactionFailed)
{
placementEntry->failed = true;
/*
* Raise an error if failure was on a required connection,
* unless we're post-commit and not using 2PC. In that
* case escalating failures here could leave inconsistent
* shards in place, which are not marked as invalid.
*
* XXX: should we warn?
*/
if (preCommit || using2PC)
{
/* to raise ERROR if a required connection */
MarkRemoteTransactionFailed(connection, true);
}
}
}
if (placementEntry->failed)
{
failures++;
}
else
{
successes++;
}
}
if (failures > 0 && successes == 0)
{
/*
* FIXME: arguably we should only error out here if we're
* pre-commit or using 2PC. Otherwise we can end up with a state
* where parts of the transaction is committed and others aren't,
* without correspondingly marking things as invalid (which we
* can't, as we would have already committed).
*/
/* FIXME: better message */
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
}
foreach(placementCell, shardEntry->placementConnections)
{
ConnectionPlacementHashEntry *placementEntry =
(ConnectionPlacementHashEntry *) lfirst(placementCell);
if (placementEntry->failed)
{
UpdateShardPlacementState(placementEntry->key.placementid, FILE_INACTIVE);
}
}
}
}
/* Record shard->placement relation */
static void
AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement)
{
ConnectionShardHashKey shardKey;
ConnectionShardHashEntry *shardEntry = NULL;
bool found = false;
MemoryContext oldContext = NULL;
shardKey.shardId = placement->shardId;
shardEntry = hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &found);
if (!found)
{
shardEntry->placementConnections = NIL;
}
oldContext = MemoryContextSwitchTo(ConnectionContext);
shardEntry->placementConnections =
list_append_unique_ptr(shardEntry->placementConnections, placementEntry);
MemoryContextSwitchTo(oldContext);
}

View File

@ -0,0 +1,97 @@
/*-------------------------------------------------------------------------
* placement_connection.h
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PLACEMENT_CONNECTION_H
#define PLACEMENT_CONNECTION_H
#include "distributed/connection_management.h"
#include "utils/hsearch.h"
/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;
/*
* Hash table mapping placements to a list of connections.
*
* This stores a list of connections for each placement, because multiple
* connections to the same placement may exist at the same time. E.g. a
* real-time executor query may reference the same placement in several
* sub-tasks.
*
* We keep track about a connection having executed DML or DDL, since we can
* only ever allow a single transaction to do either to prevent deadlocks and
* consistency violations (e.g. read-your-own-writes).
*/
/* hash key */
typedef struct ConnectionPlacementHashKey
{
uint32 placementid;
} ConnectionPlacementHashKey;
/* information about a connection reference to a table */
typedef struct ConnectionReference
{
MultiConnection *connection;
bool hadDML;
bool hadDDL;
} ConnectionReference;
/* hash entry */
typedef struct ConnectionPlacementHashEntry
{
ConnectionPlacementHashKey key;
bool failed;
List *connectionReferences;
} ConnectionPlacementHashEntry;
/* hash table */
extern HTAB *ConnectionPlacementHash;
/*
* Hash table mapping shard ids to placements.
*
* This is used to track whether placements of a shard have to be marked
* invalid after a failure, or whether a coordinated transaction has to be
* aborted, to avoid all placements of a shard to be marked invalid.
*/
/* hash key */
typedef struct ConnectionShardHashKey
{
uint64 shardId;
} ConnectionShardHashKey;
/* hash entry */
typedef struct ConnectionShardHashEntry
{
ConnectionShardHashKey *key;
List *placementConnections;
} ConnectionShardHashEntry;
/* hash table itself */
extern HTAB *ConnectionShardHash;
/* Higher level connection handling API. */
extern MultiConnection * GetPlacementConnection(uint32 flags,
struct ShardPlacement *placement);
extern MultiConnection * StartPlacementConnection(uint32 flags,
struct ShardPlacement *placement);
extern void CheckForFailedPlacements(bool preCommit, bool using2PC);
extern void InitPlacementConnectionManagement(void);
extern void ResetPlacementConnectionManagement(void);
#endif /* PLACEMENT_CONNECTION_H */