From 14f73e78b09f25fe42c88b41d99b62803fc3191a Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 7 Oct 2016 15:29:47 -0700 Subject: [PATCH] Introduce association between placements and connections. --- src/backend/distributed/shared_library_init.c | 2 + .../transaction/transaction_management.c | 27 ++ .../distributed/utils/placement_connection.c | 361 ++++++++++++++++++ .../distributed/placement_connection.h | 97 +++++ 4 files changed, 487 insertions(+) create mode 100644 src/backend/distributed/utils/placement_connection.c create mode 100644 src/include/distributed/placement_connection.h diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 6be2ba898..48a634c88 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -32,6 +32,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" +#include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/task_tracker.h" #include "distributed/transaction_management.h" @@ -156,6 +157,7 @@ _PG_init(void) /* initialize coordinated transaction management */ InitializeTransactionManagement(); InitializeConnectionManagement(); + InitPlacementConnectionManagement(); /* initialize transaction callbacks */ RegisterRouterExecutorXactCallbacks(); diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 269aebd57..36d030fa3 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/transaction_management.h" +#include "distributed/placement_connection.h" #include "utils/hsearch.h" @@ -120,6 +121,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { + ResetPlacementConnectionManagement(); AtEOXact_Connections(true); } @@ -146,6 +148,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* close connections etc. */ if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE) { + ResetPlacementConnectionManagement(); AtEOXact_Connections(false); } @@ -163,6 +166,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_PRE_COMMIT: { + bool using2PC = MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC; + if (subXactAbortAttempted) { subXactAbortAttempted = false; @@ -178,6 +183,22 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) break; } + /* + * TODO: It's probably a good idea to force constraints and + * such to 'immediate' here. Deferred triggers might try to + * send stuff to the remote side, which'd not be good. Doing + * so remotely would also catch a class of errors where + * committing fails, which can lead to divergence when not + * using 2PC. + */ + + /* + * Check whether the coordinated transaction is in a state we want + * to persist, or whether we want to error out. This handles the + * case that iteratively executed commands marked all placements + * as invalid. + */ + CheckForFailedPlacements(true, using2PC); if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { @@ -194,6 +215,12 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CoordinatedRemoteTransactionsCommit(); CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED; } + + /* + * Check again whether shards/placement successfully + * committed. This handles failure at COMMIT/PREPARE time. + */ + CheckForFailedPlacements(false, using2PC); } break; diff --git a/src/backend/distributed/utils/placement_connection.c b/src/backend/distributed/utils/placement_connection.c new file mode 100644 index 000000000..3ef16a92b --- /dev/null +++ b/src/backend/distributed/utils/placement_connection.c @@ -0,0 +1,361 @@ +/*------------------------------------------------------------------------- + * + * placement_connection.c + * Per-Placement connection & transaction handling + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "libpq-fe.h" + +#include "miscadmin.h" + +#include "access/hash.h" +#include "distributed/connection_management.h" +#include "distributed/placement_connection.h" +#include "distributed/metadata_cache.h" +#include "distributed/hash_helpers.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" + + +HTAB *ConnectionPlacementHash = NULL; +HTAB *ConnectionShardHash = NULL; + + +static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, + ShardPlacement *placement); + + +/* + * GetPlacementConnection establishes a connection for a placement. + * + * See StartPlacementConnection for details. + */ +MultiConnection * +GetPlacementConnection(uint32 flags, ShardPlacement *placement) +{ + MultiConnection *connection = StartPlacementConnection(flags, placement); + + FinishConnectionEstablishment(connection); + return connection; +} + + +/* + * StartPlacementConnection() initiates a connection to a remote node, + * associated with the placement and transaction. + * + * The connection is established as the current user & database. + * + * See StartNodeUserDatabaseConnection for details. + * + * Flags have the corresponding meaning from StartNodeUserDatabaseConnection, + * except that two additional flags have an effect: + * - FOR_DML - signal that connection is going to be used for DML (modifications) + * - FOR_DDL - signal that connection is going to be used for DDL + * + * Only one connection associated with the placement may have FOR_DML or + * FOR_DDL set. This restriction prevents deadlocks and wrong results due to + * in-progress transactions. + */ +MultiConnection * +StartPlacementConnection(uint32 flags, ShardPlacement *placement) +{ + ConnectionPlacementHashKey key; + ConnectionPlacementHashEntry *placementEntry = NULL; + MemoryContext oldContext = NULL; + bool found = false; + ConnectionReference *returnConnectionReference = NULL; + ListCell *referenceCell = NULL; + + key.placementid = placement->placementId; + + /* FIXME: not implemented */ + Assert(flags & NEW_CONNECTION); + + /* + * Lookup relevant hash entry. We always enter. If only a cached + * connection is desired, and there's none, we'll simply leave the + * connection list empty. + */ + + placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found); + if (!found) + { + placementEntry->connectionReferences = NIL; + placementEntry->failed = false; + } + + + /* + * Check whether any of the connections already associated with the + * placement can be reused, or violates FOR_DML/FOR_DDL constraints. + */ + foreach(referenceCell, placementEntry->connectionReferences) + { + ConnectionReference *connectionReference = NULL; + bool useConnection = false; + MultiConnection *connection = NULL; + + connectionReference = (ConnectionReference *) lfirst(referenceCell); + connection = connectionReference->connection; + + /* use the connection, unless in a state that's not useful for us */ + if (connection->claimedExclusively || + !((flags & CACHED_CONNECTION)) || + returnConnectionReference != NULL) + { + useConnection = false; + } + else + { + useConnection = true; + } + + /* + * If not using the connection, verify that FOR_DML/DDL flags are + * compatible. + */ + if (useConnection) + { + returnConnectionReference = connectionReference; + } + else if (connectionReference->hadDDL) + { + /* XXX: errcode & errmsg */ + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when other " + "placement executed DDL"))); + } + else if (connectionReference->hadDML) + { + /* XXX: errcode & errmsg */ + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot establish new placement connection when other " + "placement executed DML"))); + } + } + + /* no connection available, done if a new connection isn't desirable */ + if (!(flags & NEW_CONNECTION)) + { + return NULL; + } + + + /* + * Either no caching desired, or no connection present. Start connection + * establishment. + */ + if (returnConnectionReference == NULL) + { + MultiConnection *connection = StartNodeConnection(flags, placement->nodeName, + placement->nodePort); + + oldContext = MemoryContextSwitchTo(ConnectionContext); + returnConnectionReference = + (ConnectionReference *) palloc(sizeof(ConnectionReference)); + returnConnectionReference->connection = connection; + returnConnectionReference->hadDDL = false; + returnConnectionReference->hadDML = false; + placementEntry->connectionReferences = + lappend(placementEntry->connectionReferences, returnConnectionReference); + MemoryContextSwitchTo(oldContext); + + AssociatePlacementWithShard(placementEntry, placement); + } + + if (flags & FOR_DDL) + { + returnConnectionReference->hadDDL = true; + } + if (flags & FOR_DML) + { + returnConnectionReference->hadDML = true; + } + if (flags & CRITICAL_CONNECTION) + { + RemoteTransaction *transaction = + &returnConnectionReference->connection->remoteTransaction; + transaction->criticalTransaction = true; + } + + return returnConnectionReference->connection; +} + + +void +InitPlacementConnectionManagement(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + /* create (placementid) -> [connection] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionPlacementHashKey); + info.entrysize = sizeof(ConnectionPlacementHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + ConnectionPlacementHash = hash_create("citus connection cache (placementid)", + 64, &info, hashFlags); + + /* create (shardId) -> [ConnectionShardHashEntry] hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ConnectionShardHashKey); + info.entrysize = sizeof(ConnectionShardHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + ConnectionShardHash = hash_create("citus connection cache (shardid)", + 64, &info, hashFlags); +} + + +/* + * Disassociate connections from placements and shards. This will be called at + * the end of XACT_EVENT_COMMIT and XACT_EVENT_ABORT. + */ +void +ResetPlacementConnectionManagement(void) +{ + /* Simply delete all entries*/ + hash_delete_all(ConnectionPlacementHash); + hash_delete_all(ConnectionShardHash); +} + + +/* + * Check which placements have to be marked as invalid, and/or whether + * sufficiently many placements have failed to abort the entire coordinated + * transaction. + * + * This will usually be called twice. Once before the remote commit is done, + * and once after. This is so we can abort before executing remote commits, + * and so we can handle remote transactions that failed during commit. + * + * When preCommit or using2PC is true, failures on transactions marked as + * critical will abort the entire coordinated transaction. Otherwise we can't + * anymore, because some remote transactions might have already committed. + */ +void +CheckForFailedPlacements(bool preCommit, bool using2PC) +{ + HASH_SEQ_STATUS status; + ConnectionShardHashEntry *shardEntry = NULL; + + hash_seq_init(&status, ConnectionShardHash); + while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0) + { + ListCell *placementCell = NULL; + int failures = 0; + int successes = 0; + + foreach(placementCell, shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + (ConnectionPlacementHashEntry *) lfirst(placementCell); + ListCell *referenceCell = NULL; + + foreach(referenceCell, placementEntry->connectionReferences) + { + ConnectionReference *reference = + (ConnectionReference *) lfirst(referenceCell); + MultiConnection *connection = reference->connection; + + /* + * If neither DDL nor DML were executed, there's no need for + * invalidation. + */ + if (!reference->hadDDL && !reference->hadDML) + { + continue; + } + + if (connection->remoteTransaction.transactionFailed) + { + placementEntry->failed = true; + + /* + * Raise an error if failure was on a required connection, + * unless we're post-commit and not using 2PC. In that + * case escalating failures here could leave inconsistent + * shards in place, which are not marked as invalid. + * + * XXX: should we warn? + */ + if (preCommit || using2PC) + { + /* to raise ERROR if a required connection */ + MarkRemoteTransactionFailed(connection, true); + } + } + } + + if (placementEntry->failed) + { + failures++; + } + else + { + successes++; + } + } + + if (failures > 0 && successes == 0) + { + /* + * FIXME: arguably we should only error out here if we're + * pre-commit or using 2PC. Otherwise we can end up with a state + * where parts of the transaction is committed and others aren't, + * without correspondingly marking things as invalid (which we + * can't, as we would have already committed). + */ + + /* FIXME: better message */ + ereport(ERROR, (errmsg("could not commit transaction on any active nodes"))); + } + + foreach(placementCell, shardEntry->placementConnections) + { + ConnectionPlacementHashEntry *placementEntry = + (ConnectionPlacementHashEntry *) lfirst(placementCell); + + if (placementEntry->failed) + { + UpdateShardPlacementState(placementEntry->key.placementid, FILE_INACTIVE); + } + } + } +} + + +/* Record shard->placement relation */ +static void +AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry, + ShardPlacement *placement) +{ + ConnectionShardHashKey shardKey; + ConnectionShardHashEntry *shardEntry = NULL; + bool found = false; + MemoryContext oldContext = NULL; + shardKey.shardId = placement->shardId; + shardEntry = hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &found); + if (!found) + { + shardEntry->placementConnections = NIL; + } + + oldContext = MemoryContextSwitchTo(ConnectionContext); + shardEntry->placementConnections = + list_append_unique_ptr(shardEntry->placementConnections, placementEntry); + MemoryContextSwitchTo(oldContext); +} diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h new file mode 100644 index 000000000..eeb47b31f --- /dev/null +++ b/src/include/distributed/placement_connection.h @@ -0,0 +1,97 @@ +/*------------------------------------------------------------------------- + * placement_connection.h + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PLACEMENT_CONNECTION_H +#define PLACEMENT_CONNECTION_H + + +#include "distributed/connection_management.h" +#include "utils/hsearch.h" + + +/* forward declare, to avoid dependency on ShardPlacement definition */ +struct ShardPlacement; + +/* + * Hash table mapping placements to a list of connections. + * + * This stores a list of connections for each placement, because multiple + * connections to the same placement may exist at the same time. E.g. a + * real-time executor query may reference the same placement in several + * sub-tasks. + * + * We keep track about a connection having executed DML or DDL, since we can + * only ever allow a single transaction to do either to prevent deadlocks and + * consistency violations (e.g. read-your-own-writes). + */ + +/* hash key */ +typedef struct ConnectionPlacementHashKey +{ + uint32 placementid; +} ConnectionPlacementHashKey; + +/* information about a connection reference to a table */ +typedef struct ConnectionReference +{ + MultiConnection *connection; + bool hadDML; + bool hadDDL; +} ConnectionReference; + +/* hash entry */ +typedef struct ConnectionPlacementHashEntry +{ + ConnectionPlacementHashKey key; + + bool failed; + + List *connectionReferences; +} ConnectionPlacementHashEntry; + +/* hash table */ +extern HTAB *ConnectionPlacementHash; + + +/* + * Hash table mapping shard ids to placements. + * + * This is used to track whether placements of a shard have to be marked + * invalid after a failure, or whether a coordinated transaction has to be + * aborted, to avoid all placements of a shard to be marked invalid. + */ + +/* hash key */ +typedef struct ConnectionShardHashKey +{ + uint64 shardId; +} ConnectionShardHashKey; + +/* hash entry */ +typedef struct ConnectionShardHashEntry +{ + ConnectionShardHashKey *key; + List *placementConnections; +} ConnectionShardHashEntry; + +/* hash table itself */ +extern HTAB *ConnectionShardHash; + + +/* Higher level connection handling API. */ +extern MultiConnection * GetPlacementConnection(uint32 flags, + struct ShardPlacement *placement); +extern MultiConnection * StartPlacementConnection(uint32 flags, + struct ShardPlacement *placement); + +extern void CheckForFailedPlacements(bool preCommit, bool using2PC); + +extern void InitPlacementConnectionManagement(void); +extern void ResetPlacementConnectionManagement(void); + +#endif /* PLACEMENT_CONNECTION_H */