Merge pull request #1079 from citusdata/feature/shardstate

Centralized placement -> connection mapping & placement health management
Andres Freund 2017-01-09 13:28:22 -08:00 committed by GitHub
commit d8cdb552e5
10 changed files with 940 additions and 546 deletions


@@ -23,6 +23,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/hash_helpers.h"
+#include "distributed/placement_connection.h"
 #include "mb/pg_wchar.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
@@ -234,6 +235,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 	connection = StartConnectionEstablishment(&key);
 	dlist_push_tail(entry->connections, &connection->connectionNode);
+	ResetShardPlacementAssociation(connection);

 	if (flags & SESSION_LIFESPAN)
 	{
@@ -333,6 +335,7 @@ CloseNodeConnections(char *nodeName, int nodePort)
 		/* same for transaction state */
 		CloseRemoteTransaction(connection);
+		CloseShardPlacementAssociation(connection);

 		/* we leave the per-host entry alive */
 		pfree(connection);
@@ -366,8 +369,9 @@ CloseConnection(MultiConnection *connection)
 	/* unlink from list of open connections */
 	dlist_delete(&connection->connectionNode);

-	/* same for transaction state */
+	/* same for transaction state and shard/placement machinery */
 	CloseRemoteTransaction(connection);
+	CloseShardPlacementAssociation(connection);

 	/* we leave the per-host entry alive */
 	pfree(connection);
@@ -692,6 +696,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
 	{
 		/* reset per-transaction state */
 		ResetRemoteTransaction(connection);
+		ResetShardPlacementAssociation(connection);

 		UnclaimConnection(connection);
 	}


@@ -0,0 +1,660 @@
/*-------------------------------------------------------------------------
*
* placement_connection.c
* Per-placement connection handling.
*
* Copyright (c) 2016-2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/metadata_cache.h"
#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
/*
* A connection reference is used to register that a connection has been used
* to read or modify a shard placement as a particular user.
*/
typedef struct ConnectionReference
{
/* identity information about the connection */
uint64 shardId;
uint64 placementId;
/*
* The user used to read/modify the placement. We cannot reuse a connection
* that was established under a different role, since it would not have the
* right permissions.
*/
const char *userName;
/* the connection */
MultiConnection *connection;
/*
* Information about what the connection is used for. There can only be
* one connection executing DDL/DML for a placement to avoid deadlock
* issues/read-your-own-writes violations. The difference between DDL/DML
* currently is only used to emit more precise error messages.
*/
bool hadDML;
bool hadDDL;
/* membership in ConnectionPlacementHashKey->connectionReferences */
dlist_node placementNode;
/* membership in MultiConnection->referencedPlacements */
dlist_node connectionNode;
} ConnectionReference;
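/*
 * Editor's illustration, not part of this commit: a ConnectionReference is
 * linked into two intrusive lists at once through its two dlist_node
 * members. For example, walking every reference registered on a single
 * connection goes through the connectionNode member; the function name is
 * hypothetical.
 */
static void
PrintReferencedPlacements(MultiConnection *connection)
{
	dlist_iter iter;

	dlist_foreach(iter, &connection->referencedPlacements)
	{
		ConnectionReference *reference =
			dlist_container(ConnectionReference, connectionNode, iter.cur);

		elog(DEBUG1, "placement " UINT64_FORMAT " (DML: %d, DDL: %d)",
			 reference->placementId, (int) reference->hadDML,
			 (int) reference->hadDDL);
	}
}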
/*
* Hash table mapping placements to a list of connections.
*
* This stores a list of connections for each placement, because multiple
* connections to the same placement may exist at the same time. E.g. a
* real-time executor query may reference the same placement in several
* sub-tasks.
*
* We keep track of whether a connection has executed DML or DDL, since we
* can only ever allow a single connection per placement to do either, to
* prevent deadlocks and consistency violations (e.g. read-your-own-writes).
*/
/* hash key */
typedef struct ConnectionPlacementHashKey
{
uint64 placementId;
} ConnectionPlacementHashKey;
/* hash entry */
typedef struct ConnectionPlacementHashEntry
{
ConnectionPlacementHashKey key;
/* did any remote transactions fail? */
bool failed;
/* list of connections to remote nodes */
dlist_head connectionReferences;
/* if non-NULL, connection executing DML/DDL */
ConnectionReference *modifyingConnection;
/* membership in ConnectionShardHashEntry->placementConnections */
dlist_node shardNode;
} ConnectionPlacementHashEntry;
/* hash table */
static HTAB *ConnectionPlacementHash;
/*
* Hash table mapping shard ids to placements.
*
* This is used to track whether placements of a shard have to be marked
* invalid after a failure, or whether the coordinated transaction has to be
* aborted instead, to avoid all placements of a shard being marked invalid.
*/
/* hash key */
typedef struct ConnectionShardHashKey
{
uint64 shardId;
} ConnectionShardHashKey;
/* hash entry */
typedef struct ConnectionShardHashEntry
{
ConnectionShardHashKey key;
dlist_head placementConnections;
} ConnectionShardHashEntry;
/* hash table */
static HTAB *ConnectionShardHash;
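/*
 * Editor's illustration, not part of this commit: the shard hash is keyed by
 * shard id, and its entry's placementConnections list links the
 * ConnectionPlacementHashEntrys belonging to that shard. A lookup goes
 * through hash_search; FindShardEntry is a hypothetical helper.
 */
static ConnectionShardHashEntry *
FindShardEntry(uint64 shardId)
{
	ConnectionShardHashKey key;
	bool found = false;

	key.shardId = shardId;

	return (ConnectionShardHashEntry *) hash_search(ConnectionShardHash, &key,
													HASH_FIND, &found);
}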
static ConnectionReference * CheckExistingConnections(uint32 flags, const char *userName,
ConnectionPlacementHashEntry *placementEntry);
static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *connectionReference);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
static bool CheckShardPlacements(ConnectionShardHashEntry *shardEntry, bool preCommit,
bool using2PC);
/*
* GetPlacementConnection establishes a connection for a placement.
*
* See StartPlacementConnection for details.
*/
MultiConnection *
GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
{
MultiConnection *connection = StartPlacementConnection(flags, placement, userName);
FinishConnectionEstablishment(connection);
return connection;
}
/*
* StartPlacementConnection initiates a connection to a remote node,
* associated with the placement and transaction.
*
* The connection is established for the current database. If userName is NULL
* the current user is used, otherwise the provided one.
*
* See StartNodeUserDatabaseConnection for details.
*
* Flags have the corresponding meaning from StartNodeUserDatabaseConnection,
* except that two additional flags have an effect:
* - FOR_DML - signal that connection is going to be used for DML (modifications)
* - FOR_DDL - signal that connection is going to be used for DDL
*
* Only one connection associated with the placement may have FOR_DML or
* FOR_DDL set. This restriction prevents deadlocks and wrong results due to
* in-progress transactions.
*/
MultiConnection *
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
{
ConnectionPlacementHashKey key;
ConnectionPlacementHashEntry *placementEntry = NULL;
ConnectionReference *returnConnectionReference = NULL;
char *freeUserName = NULL;
bool found = false;
if (userName == NULL)
{
userName = freeUserName = CurrentUserName();
}
key.placementId = placement->placementId;
/* lookup relevant hash entry */
placementEntry = hash_search(ConnectionPlacementHash, &key, HASH_ENTER, &found);
if (!found)
{
dlist_init(&placementEntry->connectionReferences);
placementEntry->failed = false;
placementEntry->modifyingConnection = NULL;
}
/*
* Check whether any of the connections already associated with the placement
* can be reused, or whether a new one would violate the FOR_DML/FOR_DDL
* constraints.
*/
returnConnectionReference = CheckExistingConnections(flags, userName, placementEntry);
/*
* Either no caching desired, or no connection present. Start connection
* establishment. Allocations are performed in transaction context, so we
* don't have to care about freeing in case of an early disconnect.
*/
if (returnConnectionReference == NULL)
{
MultiConnection *connection = StartNodeConnection(flags, placement->nodeName,
placement->nodePort);
returnConnectionReference = (ConnectionReference *)
MemoryContextAlloc(TopTransactionContext,
sizeof(ConnectionReference));
returnConnectionReference->connection = connection;
returnConnectionReference->hadDDL = false;
returnConnectionReference->hadDML = false;
returnConnectionReference->shardId = placement->shardId;
returnConnectionReference->placementId = placement->placementId;
returnConnectionReference->userName =
MemoryContextStrdup(TopTransactionContext, userName);
dlist_push_tail(&placementEntry->connectionReferences,
&returnConnectionReference->placementNode);
/* record association with shard, for invalidation */
AssociatePlacementWithShard(placementEntry, placement);
/* record association with connection, to handle connection closure */
dlist_push_tail(&connection->referencedPlacements,
&returnConnectionReference->connectionNode);
}
if (flags & FOR_DDL)
{
placementEntry->modifyingConnection = returnConnectionReference;
returnConnectionReference->hadDDL = true;
}
if (flags & FOR_DML)
{
placementEntry->modifyingConnection = returnConnectionReference;
returnConnectionReference->hadDML = true;
}
if (freeUserName)
{
pfree(freeUserName);
}
return returnConnectionReference->connection;
}
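/*
 * Editor's usage sketch, not part of this commit: a caller that wants to
 * modify a placement inside the current transaction would acquire its
 * connection roughly like this ("placement" is assumed to be a valid
 * ShardPlacement):
 *
 *     MultiConnection *connection =
 *         GetPlacementConnection(SESSION_LIFESPAN | FOR_DML, placement, NULL);
 *
 * A later call for the same placement in the same transaction returns the
 * same connection, keeping the modification's effects visible.
 */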
/*
* CheckExistingConnections checks whether any of the existing connections
* are usable. If so, one is returned; otherwise NULL is returned.
*
* A connection is usable if it is not in use, the user matches, DDL/DML
* usage matches, and cached connections are allowed. If no existing
* connection matches, but a new connection would conflict (e.g. a connection
* already exists but isn't usable, and the desired connection needs to
* execute DML/DDL), an error is thrown.
*/
static ConnectionReference *
CheckExistingConnections(uint32 flags, const char *userName,
ConnectionPlacementHashEntry *placementEntry)
{
dlist_iter it;
/*
* If there's a connection that has executed DML/DDL, always return it if
* possible. That's because we can only execute DML/DDL over that
* connection. Checking this first also allows us to detect conflicts due
* to opening a second connection that wants to execute DML/DDL.
*/
if (placementEntry->modifyingConnection)
{
ConnectionReference *connectionReference = placementEntry->modifyingConnection;
if (CanUseExistingConnection(flags, userName, connectionReference))
{
return connectionReference;
}
if (connectionReference->hadDDL)
{
/* would deadlock otherwise */
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish new placement connection when DDL has been "
"executed on existing placement connection")));
}
else if (connectionReference->hadDML)
{
/* we'd not see the other connection's contents */
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish new placement connection when DML has been "
"executed on existing placement connection")));
}
else
{
/* the modifying connection should have executed DML/DDL */
Assert(false);
}
}
/*
* Search existing connections for a reusable connection.
*/
dlist_foreach(it, &placementEntry->connectionReferences)
{
ConnectionReference *connectionReference =
dlist_container(ConnectionReference, placementNode, it.cur);
if (CanUseExistingConnection(flags, userName, connectionReference))
{
return connectionReference;
}
/*
* If the connection cannot be reused, verify that the FOR_DML/DDL
* flags are compatible.
*/
if (flags & FOR_DDL)
{
/* would deadlock otherwise */
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish new placement connection for DDL when "
"other connections exist")));
}
else if (flags & FOR_DML)
{
/*
* We could allow this case (as presumably the SELECT is done), but
* it'd be a bit prone to programming errors, so we disallow it for
* now.
*/
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot establish new placement connection for DML when "
"other connections exist")));
}
}
/* establish a new connection */
return NULL;
}
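/*
 * Editor's illustration of the conflict these errors guard against (not
 * part of this commit): if another role already holds a connection to the
 * placement, a FOR_DDL request can neither reuse it (user mismatch) nor
 * safely open a second one:
 *
 *     StartPlacementConnection(SESSION_LIFESPAN, placement, "alice");
 *     StartPlacementConnection(SESSION_LIFESPAN | FOR_DDL, placement, "bob");
 *     => ERROR: cannot establish new placement connection for DDL when
 *        other connections exist
 */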
/*
* CanUseExistingConnection is a helper function for CheckExistingConnections()
* that checks whether an existing connection can be reused.
*/
static bool
CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *connectionReference)
{
MultiConnection *connection = connectionReference->connection;
if (!connection)
{
/* an already-closed connection is obviously not usable */
return false;
}
else if (connection->claimedExclusively)
{
/* already used */
return false;
}
else if (flags & FORCE_NEW_CONNECTION)
{
/* no connection reuse desired */
return false;
}
else if (strcmp(connectionReference->userName, userName) != 0)
{
/* connection for different user, check for conflict */
return false;
}
else
{
return true;
}
}
/*
* AssociatePlacementWithShard records shard->placement relation in
* ConnectionShardHash.
*
* That association is later used, in CheckForFailedPlacements, to invalidate
* shard placements if necessary.
*/
static void
AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement)
{
ConnectionShardHashKey shardKey;
ConnectionShardHashEntry *shardEntry = NULL;
bool found = false;
dlist_iter placementIter;
shardKey.shardId = placement->shardId;
shardEntry = hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &found);
if (!found)
{
dlist_init(&shardEntry->placementConnections);
}
/*
* Check if placement is already associated with shard (which happens if
* there are multiple connections for a placement). There will usually only
* be a few placements per shard, so the price of iterating isn't large.
*/
dlist_foreach(placementIter, &shardEntry->placementConnections)
{
ConnectionPlacementHashEntry *currentPlacementEntry =
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
if (currentPlacementEntry->key.placementId == placement->placementId)
{
return;
}
}
/* otherwise add */
dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode);
}
/*
* CloseShardPlacementAssociation handles a connection being closed before
* transaction end.
*
* This should only be called by connection_management.c.
*/
void
CloseShardPlacementAssociation(struct MultiConnection *connection)
{
dlist_iter placementIter;
/* set connection to NULL for all references to the connection */
dlist_foreach(placementIter, &connection->referencedPlacements)
{
ConnectionReference *reference =
dlist_container(ConnectionReference, connectionNode, placementIter.cur);
reference->connection = NULL;
/*
* Note that we don't reset ConnectionPlacementHashEntry's
* modifyingConnection here; that'd be more complicated than it seems
* worth. It means we'll error out spuriously if a DML/DDL-executing
* connection is closed earlier in a transaction.
*/
}
}
/*
* ResetShardPlacementAssociation resets the association of connections to
* shard placements at the end of a transaction.
*
* This should only be called by connection_management.c.
*/
void
ResetShardPlacementAssociation(struct MultiConnection *connection)
{
dlist_init(&connection->referencedPlacements);
}
/*
* ResetPlacementConnectionManagement() disassociates connections from
* placements and shards. This will be called at the end of XACT_EVENT_COMMIT
* and XACT_EVENT_ABORT.
*/
void
ResetPlacementConnectionManagement(void)
{
/* Simply delete all entries */
hash_delete_all(ConnectionPlacementHash);
hash_delete_all(ConnectionShardHash);
/*
* NB: memory for ConnectionReference structs and subordinate data is
* deleted by virtue of being allocated in TopTransactionContext.
*/
}
/*
* CheckForFailedPlacements checks which placements have to be marked as
* invalid, and whether enough placements have failed that the entire
* coordinated transaction has to be aborted.
*
* This will usually be called twice: once before the remote commit is done,
* and once after. That way we can abort before executing remote commits, and
* we can handle remote transactions that failed during commit.
*
* When preCommit or using2PC is true, failures on transactions marked as
* critical will abort the entire coordinated transaction. Otherwise we can't
* roll back, because some remote transactions might have already committed.
*/
void
CheckForFailedPlacements(bool preCommit, bool using2PC)
{
HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL;
int successes = 0;
int attempts = 0;
hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)
{
attempts++;
if (CheckShardPlacements(shardEntry, preCommit, using2PC))
{
successes++;
}
}
/*
* If no shards could be modified at all, error out. Doesn't matter if
* we're post-commit - there's nothing to invalidate.
*/
if (attempts > 0 && successes == 0)
{
ereport(ERROR, (errmsg("could not commit transaction on any active node")));
}
}
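/*
 * Editor's sketch of the intended call sequence around a coordinated commit
 * (not part of this commit; the exact caller is the transaction machinery):
 *
 *     CheckForFailedPlacements(true, using2PC);    pre-commit: may still abort
 *     ... issue COMMIT / PREPARE TRANSACTION on the remote nodes ...
 *     CheckForFailedPlacements(false, using2PC);   post-commit: only invalidate
 */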
/*
* CheckShardPlacements is a helper function for CheckForFailedPlacements that
* performs the per-shard work.
*/
static bool
CheckShardPlacements(ConnectionShardHashEntry *shardEntry,
bool preCommit, bool using2PC)
{
int failures = 0;
int successes = 0;
dlist_iter placementIter;
dlist_foreach(placementIter, &shardEntry->placementConnections)
{
ConnectionPlacementHashEntry *placementEntry =
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
dlist_iter referenceIter;
dlist_foreach(referenceIter, &placementEntry->connectionReferences)
{
ConnectionReference *reference =
dlist_container(ConnectionReference, placementNode, referenceIter.cur);
MultiConnection *connection = reference->connection;
/*
* If neither DDL nor DML were executed, there's no need for
* invalidation.
*/
if (!reference->hadDDL && !reference->hadDML)
{
continue;
}
/*
* Failed if connection was closed, or remote transaction was
* unsuccessful.
*/
if (!connection || connection->remoteTransaction.transactionFailed)
{
placementEntry->failed = true;
}
}
if (placementEntry->failed)
{
failures++;
}
else
{
successes++;
}
}
if (failures > 0 && successes == 0)
{
int elevel = 0;
/*
* Only error out if we're pre-commit or using 2PC. We can't error out
* otherwise, as we could end up in a state where some shard
* modifications have already committed successfully. If no
* modifications at all succeed, CheckForFailedPlacements() will error
* out. This sucks.
*/
if (preCommit || using2PC)
{
elevel = ERROR;
}
else
{
elevel = WARNING;
}
ereport(elevel,
(errmsg("could not commit transaction for shard " INT64_FORMAT
" on any active node",
shardEntry->key.shardId)));
return false;
}
/* mark all failed placements invalid */
dlist_foreach(placementIter, &shardEntry->placementConnections)
{
ConnectionPlacementHashEntry *placementEntry =
dlist_container(ConnectionPlacementHashEntry, shardNode, placementIter.cur);
if (placementEntry->failed)
{
UpdateShardPlacementState(placementEntry->key.placementId, FILE_INACTIVE);
}
}
return true;
}
/*
* InitPlacementConnectionManagement performs initialization of the
* infrastructure in this file at server start.
*/
void
InitPlacementConnectionManagement(void)
{
HASHCTL info;
uint32 hashFlags = 0;
/* create (placementId) -> [ConnectionReference] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ConnectionPlacementHashKey);
info.entrysize = sizeof(ConnectionPlacementHashEntry);
info.hash = tag_hash;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
ConnectionPlacementHash = hash_create("citus connection cache (placementid)",
64, &info, hashFlags);
/* create (shardId) -> [ConnectionShardHashEntry] hash */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ConnectionShardHashKey);
info.entrysize = sizeof(ConnectionShardHashEntry);
info.hash = tag_hash;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
ConnectionShardHash = hash_create("citus connection cache (shardid)",
64, &info, hashFlags);
}


@@ -41,6 +41,7 @@
 #include "distributed/multi_router_executor.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_shard_transaction.h"
+#include "distributed/placement_connection.h"
 #include "distributed/relay_utility.h"
 #include "distributed/remote_commands.h"
 #include "distributed/remote_transaction.h"
@@ -72,30 +73,14 @@
 /* controls use of locks to enforce safe commutativity */
 bool AllModificationsCommutative = false;

-/*
- * The following static variables are necessary to track the progression of
- * multi-statement transactions managed by the router executor. After the first
- * modification within a transaction, the executor populates a hash with the
- * transaction's initial participants (nodes hit by that initial modification).
- *
- * To keep track of the reverse mapping (from shards to nodes), we have a list
- * of XactShardConnSets, which map a shard identifier to a set of connection
- * hash entries. This list is walked by MarkRemainingInactivePlacements to
- * ensure we mark placements as failed if they reject a COMMIT.
- */
-static HTAB *xactParticipantHash = NULL;
-static List *xactShardConnSetList = NIL;
-
-/* functions needed during start phase */
-static void InitTransactionStateForTask(Task *task);
-static HTAB * CreateXactParticipantHash(void);
-
 /* functions needed during run phase */
 static void ReacquireMetadataLocks(List *taskList);
-static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
-							  bool isModificationQuery, bool expectResults);
-static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList);
+static void ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
+									bool expectResults);
+static void ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task);
+static List * GetModifyConnections(List *taskPlacementList,
+								   bool markCritical,
+								   bool startedInTransaction);
 static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
								  bool isModificationQuery, bool expectResults);
 static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@@ -109,23 +94,15 @@ static bool RequiresConsistentSnapshot(Task *task);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
									   DestReceiver *destination,
									   Tuplestorestate *tupleStore);
-static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
-										  bool isModificationQuery);
-static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
-static void RemoveXactConnection(PGconn *connection);
 static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
											   Oid **parameterTypes,
											   const char ***parameterValues);
-static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
+static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
									 ParamListInfo paramListInfo);
-static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
+static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
							 TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
-static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
-static void RecordShardIdParticipant(uint64 affectedShardId,
-									 NodeConnectionEntry *participantEntry);
-
-/* to verify the health of shards after a transactional modification command */
-static void MarkRemainingInactivePlacements(void);
+static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
+							   int64 *rows);

 /*
@@ -232,52 +209,6 @@ ReacquireMetadataLocks(List *taskList)
 }

-/*
- * InitTransactionStateForTask is called during executor start with the first
- * modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
- * transaction participant hash, opens connections to this task's nodes, and
- * populates the hash with those connections after sending BEGIN commands to
- * each. If a node fails to respond, its connection is set to NULL to prevent
- * further interaction with it during the transaction.
- */
-static void
-InitTransactionStateForTask(Task *task)
-{
-	ListCell *placementCell = NULL;
-
-	BeginOrContinueCoordinatedTransaction();
-
-	xactParticipantHash = CreateXactParticipantHash();
-
-	foreach(placementCell, task->taskPlacementList)
-	{
-		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
-		NodeConnectionKey participantKey;
-		NodeConnectionEntry *participantEntry = NULL;
-		bool entryFound = false;
-		int connectionFlags = SESSION_LIFESPAN;
-		MultiConnection *connection =
-			GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort);
-
-		MemSet(&participantKey, 0, sizeof(participantKey));
-		strlcpy(participantKey.nodeName, placement->nodeName,
-				MAX_NODE_LENGTH + 1);
-		participantKey.nodePort = placement->nodePort;
-
-		participantEntry = hash_search(xactParticipantHash, &participantKey,
-									   HASH_ENTER, &entryFound);
-		Assert(!entryFound);
-
-		/* issue BEGIN if necessary */
-		RemoteTransactionBeginIfNecessary(connection);
-
-		participantEntry->connection = connection;
-	}
-
-	XactModificationLevel = XACT_MODIFICATION_DATA;
-}
-
 /*
  * AcquireExecutorShardLock acquires a lock on the shard for the given task and
  * command type if necessary to avoid divergence between multiple replicas of
@@ -527,33 +458,6 @@ RequiresConsistentSnapshot(Task *task)
 }

-/*
- * CreateXactParticipantHash initializes the map used to store the connections
- * needed to process distributed transactions. Unlike the connection cache, we
- * permit NULL connections here to signify that a participant has seen an error
- * and is no longer receiving commands during a transaction. This hash should
- * be walked at transaction end to send final COMMIT or ABORT commands.
- */
-static HTAB *
-CreateXactParticipantHash(void)
-{
-	HTAB *xactParticipantHash = NULL;
-	HASHCTL info;
-	int hashFlags = 0;
-
-	MemSet(&info, 0, sizeof(info));
-	info.keysize = sizeof(NodeConnectionKey);
-	info.entrysize = sizeof(NodeConnectionEntry);
-	info.hcxt = TopTransactionContext;
-	hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
-
-	xactParticipantHash = hash_create("citus xact participant hash", 32, &info,
-									  hashFlags);
-
-	return xactParticipantHash;
-}
-
 /*
  * RouterExecutorRun actually executes a single task on a worker.
  */
@@ -633,13 +537,14 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	if (list_length(taskList) == 1)
 	{
 		Task *task = (Task *) linitial(taskList);
-		bool resultsOK = false;

-		resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery,
-									  sendTuples);
-		if (!resultsOK)
+		if (isModificationQuery)
 		{
-			ereport(ERROR, (errmsg("could not receive query results")));
+			ExecuteSingleModifyTask(queryDesc, task, sendTuples);
+		}
+		else
+		{
+			ExecuteSingleSelectTask(queryDesc, task);
 		}
 	}
 	else
@@ -692,18 +597,72 @@ out:

 /*
- * ExecuteSingleTask executes the task on the remote node, retrieves the
- * results and stores them, if SELECT or RETURNING is used, in a tuple
- * store.
+ * ExecuteSingleSelectTask executes the task on the remote node, retrieves the
+ * results and stores them in a tuple store.
  *
  * If the task fails on one of the placements, the function retries it on
- * other placements (SELECT), reraises the remote error (constraint violation
- * in DML), marks the affected placement as invalid (DML on some placement
- * failed), or errors out (DML failed on all placements).
+ * other placements or errors out if the query fails on all placements.
  */
-static bool
-ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
-				  bool isModificationQuery,
+static void
+ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task)
+{
+	TupleDesc tupleDescriptor = queryDesc->tupDesc;
+	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
+	ParamListInfo paramListInfo = queryDesc->params;
+	List *taskPlacementList = task->taskPlacementList;
+	ListCell *taskPlacementCell = NULL;
+	char *queryString = task->queryString;
+
+	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
+	{
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("single-shard query may not appear in transaction blocks "
+							   "which contain multi-shard data modifications")));
+	}
+
+	/*
+	 * Try to run the query to completion on one placement. If the query fails
+	 * attempt the query on the next placement.
+	 */
+	foreach(taskPlacementCell, taskPlacementList)
+	{
+		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+		bool queryOK = false;
+		bool dontFailOnError = false;
+		int64 currentAffectedTupleCount = 0;
+		int connectionFlags = SESSION_LIFESPAN;
+		MultiConnection *connection =
+			GetPlacementConnection(connectionFlags, taskPlacement, NULL);
+
+		queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
+		if (!queryOK)
+		{
+			continue;
+		}
+
+		queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
+								   dontFailOnError, &currentAffectedTupleCount);
+		if (queryOK)
+		{
+			return;
+		}
+	}
+
+	ereport(ERROR, (errmsg("could not receive query results")));
+}
+
+
+/*
+ * ExecuteSingleModifyTask executes the task on the remote node, retrieves the
+ * results and stores them, if RETURNING is used, in a tuple store.
+ *
+ * If the task fails on one of the placements, the function reraises the
+ * remote error (constraint violation in DML), marks the affected placement as
+ * invalid (other error on some placements, via the placement connection
+ * framework), or errors out (failed on all placements).
+ */
+static void
+ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
 						bool expectResults)
 {
 	CmdType operation = queryDesc->operation;
@@ -713,12 +672,15 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 	ParamListInfo paramListInfo = queryDesc->params;
 	bool resultsOK = false;
 	List *taskPlacementList = task->taskPlacementList;
+	List *connectionList = NIL;
 	ListCell *taskPlacementCell = NULL;
-	List *failedPlacementList = NIL;
+	ListCell *connectionCell = NULL;
 	int64 affectedTupleCount = -1;
 	bool gotResults = false;
 	char *queryString = task->queryString;
 	bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
+	bool startedInTransaction =
+		InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;

 	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
 	{
@@ -729,59 +691,52 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 	}

 	/*
-	 * Firstly ensure that distributed transaction is started. Then, force
-	 * the transaction manager to use 2PC while running the task on the placements.
+	 * Modifications for reference tables are always done using 2PC. First
+	 * ensure that distributed transaction is started. Then force the
+	 * transaction manager to use 2PC while running the task on the
+	 * placements.
 	 */
 	if (taskRequiresTwoPhaseCommit)
 	{
 		BeginOrContinueCoordinatedTransaction();
 		CoordinatedTransactionUse2PC();
-
-		/*
-		 * Mark connections for all placements as critical and establish connections
-		 * to all placements at once.
-		 */
-		GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
 	}

 	/*
-	 * We could naturally handle function-based transactions (i.e. those
-	 * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
-	 * but some customers already use functions that touch multiple shards
-	 * from within a function, so we'll ignore functions for now.
+	 * We could naturally handle function-based transactions (i.e. those using
+	 * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
+	 * customers already use functions that touch multiple shards from within
+	 * a function, so we'll ignore functions for now.
 	 */
-	if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
+	if (IsTransactionBlock())
 	{
-		InitTransactionStateForTask(task);
+		BeginOrContinueCoordinatedTransaction();
 	}

+	/*
+	 * Get connections required to execute task. This will, if necessary,
+	 * establish the connection, mark as critical (when modifying reference
+	 * table) and start a transaction (when in a transaction).
+	 */
+	connectionList = GetModifyConnections(taskPlacementList,
+										  taskRequiresTwoPhaseCommit,
+										  startedInTransaction);
+
 	/* prevent replicas of the same shard from diverging */
 	AcquireExecutorShardLock(task, operation);

-	/*
-	 * Try to run the query to completion on one placement. If the query fails
-	 * attempt the query on the next placement.
-	 */
-	foreach(taskPlacementCell, taskPlacementList)
+	/* try to execute modification on all placements */
+	forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList)
 	{
 		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
+		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
 		bool queryOK = false;
 		bool failOnError = false;
 		int64 currentAffectedTupleCount = 0;
-		PGconn *connection = GetConnectionForPlacement(taskPlacement,
-													   isModificationQuery);
-
-		if (connection == NULL)
-		{
-			failedPlacementList = lappend(failedPlacementList, taskPlacement);
-			continue;
-		}

 		queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
 		if (!queryOK)
 		{
-			PurgeConnectionForPlacement(connection, taskPlacement);
-			failedPlacementList = lappend(failedPlacementList, taskPlacement);
 			continue;
 		}
@@ -829,87 +784,88 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
 			resultsOK = true;
 			gotResults = true;
-
-			/*
-			 * Modifications have to be executed on all placements, but for
-			 * read queries we can stop here.
-			 */
-			if (!isModificationQuery)
-			{
-				break;
-			}
 		}
-		else
-		{
-			PurgeConnectionForPlacement(connection, taskPlacement);
-			failedPlacementList = lappend(failedPlacementList, taskPlacement);
-			continue;
-		}
 	}

-	if (isModificationQuery)
-	{
-		ListCell *failedPlacementCell = NULL;
-
 	/* if all placements failed, error out */
-	if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
+	if (!resultsOK)
 	{
 		ereport(ERROR, (errmsg("could not modify any active placements")));
 	}

-		/*
-		 * Otherwise, mark failed placements as inactive: they're stale. Note that
-		 * connections for tasks that require 2PC has already failed the whole transaction
-		 * and there is no way that they're marked stale here.
-		 */
-		foreach(failedPlacementCell, failedPlacementList)
-		{
-			ShardPlacement *failedPlacement =
-				(ShardPlacement *) lfirst(failedPlacementCell);
-
-			Assert(!taskRequiresTwoPhaseCommit);
-
-			UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
-		}
-
 	executorState->es_processed = affectedTupleCount;
-	}

-	return resultsOK;
+	if (IsTransactionBlock())
+	{
+		XactModificationLevel = XACT_MODIFICATION_DATA;
+	}
 }

 /*
- * GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement list,
- * starts the connections to the nodes and marks them critical. In the second iteration,
- * the connection establishments are finished. Finally, BEGIN commands are sent,
- * if necessary.
+ * GetModifyConnections returns the list of connections required to execute
+ * modify commands on the placements in taskPlacementList. If necessary,
+ * remote transactions are started.
+ *
+ * If markCritical is true remote transactions are marked as critical. If
+ * noNewTransactions is true, this function errors out if there's no
+ * transaction in progress.
  */
-static void
-GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
+static List *
+GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions)
 {
 	ListCell *taskPlacementCell = NULL;
 	List *multiConnectionList = NIL;

-	/* in the first iteration start the connections */
+	/* first initiate connection establishment for all necessary connections */
 	foreach(taskPlacementCell, taskPlacementList)
 	{
 		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
-		int connectionFlags = SESSION_LIFESPAN;
-		MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
-															   taskPlacement->nodeName,
-															   taskPlacement->nodePort);
+		int connectionFlags = SESSION_LIFESPAN | FOR_DML;
+		MultiConnection *multiConnection = NULL;
+
+		/*
+		 * FIXME: It's not actually correct to use only one shard placement
+		 * here for router queries involving multiple relations. We should
+		 * check that this connection is the only modifying one associated
+		 * with all the involved shards.
+		 */
+		multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);
+
+		/*
+		 * If already in a transaction, disallow expanding set of remote
+		 * transactions. That prevents some forms of distributed deadlocks.
+		 */
+		if (noNewTransactions)
+		{
+			RemoteTransaction *transaction = &multiConnection->remoteTransaction;
+
+			if (transaction->transactionState == REMOTE_TRANS_INVALID)
+			{
+				ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+								errmsg("no transaction participant matches %s:%d",
+									   taskPlacement->nodeName, taskPlacement->nodePort),
+								errdetail("Transactions which modify distributed tables "
+										  "may only target nodes affected by the "
+										  "modification command which began the transaction.")));
+			}
+		}

+		if (markCritical)
+		{
 			MarkRemoteTransactionCritical(multiConnection);
+		}

 		multiConnectionList = lappend(multiConnectionList, multiConnection);
 	}

+	/* then finish in parallel */
 	FinishConnectionListEstablishment(multiConnectionList);

+	/* and start transactions if applicable */
 	RemoteTransactionsBeginIfNecessary(multiConnectionList);
+
+	return multiConnectionList;
 }
@@ -1009,8 +965,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 		bool shardConnectionsFound = false;
 		ShardConnections *shardConnections = NULL;
 		List *connectionList = NIL;
-		MultiConnection *multiConnection = NULL;
-		PGconn *connection = NULL;
+		MultiConnection *connection = NULL;
 		bool queryOK = false;

 		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
@@ -1022,14 +977,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 			continue;
 		}

-		multiConnection =
+		connection =
 			(MultiConnection *) list_nth(connectionList, placementIndex);
-		connection = multiConnection->pgConn;

 		queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
 		if (!queryOK)
 		{
-			ReraiseRemoteError(connection, NULL);
+			ReportConnectionError(connection, ERROR);
 		}
 	}
@@ -1041,8 +995,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 		bool shardConnectionsFound = false;
 		ShardConnections *shardConnections = NULL;
 		List *connectionList = NIL;
-		MultiConnection *multiConnection = NULL;
-		PGconn *connection = NULL;
+		MultiConnection *connection = NULL;
 		int64 currentAffectedTupleCount = 0;
 		bool failOnError = true;
 		bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
@@ -1060,9 +1013,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 			continue;
 		}

-		multiConnection =
-			(MultiConnection *) list_nth(connectionList, placementIndex);
-		connection = multiConnection->pgConn;
+		connection = (MultiConnection *) list_nth(connectionList, placementIndex);

 		/*
 		 * If caller is interested, store query results the first time
@@ -1101,16 +1052,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 			if (currentAffectedTupleCount != previousAffectedTupleCount)
 			{
-				char *nodeName = ConnectionGetOptionValue(connection, "host");
-				char *nodePort = ConnectionGetOptionValue(connection, "port");
-
 				ereport(WARNING,
 						(errmsg("modified "INT64_FORMAT " tuples of shard "
 								UINT64_FORMAT ", but expected to modify "INT64_FORMAT,
 								currentAffectedTupleCount, shardId,
 								previousAffectedTupleCount),
-						 errdetail("modified placement on %s:%s", nodeName,
-								   nodePort)));
+						 errdetail("modified placement on %s:%d",
+								   connection->hostname, connection->port)));
 			}
 		}
@@ -1199,143 +1147,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
 }

-/*
- * GetConnectionForPlacement is the main entry point for acquiring a connection
- * within the router executor. By using placements (rather than node names and
- * ports) to identify connections, the router executor can keep track of shards
- * used by multi-statement transactions and error out if a transaction tries
- * to reach a new node altogether). In the single-statement commands context,
- * GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
- */
-static PGconn *
-GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
-{
-	NodeConnectionKey participantKey;
-	NodeConnectionEntry *participantEntry = NULL;
-	bool entryFound = false;
-
-	/* if not in a transaction, fall through to connection cache */
-	if (xactParticipantHash == NULL)
-	{
-		PGconn *connection = GetOrEstablishConnection(placement->nodeName,
-													  placement->nodePort);
-		return connection;
-	}
-
-	Assert(IsTransactionBlock());
-
-	MemSet(&participantKey, 0, sizeof(participantKey));
-	strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
-	participantKey.nodePort = placement->nodePort;
-
-	participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
-								   &entryFound);
-
-	if (entryFound)
-	{
-		if (isModificationQuery)
-		{
-			RecordShardIdParticipant(placement->shardId, participantEntry);
-		}
-
-		return participantEntry->connection->pgConn;
-	}
-	else
-	{
-		ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
-						errmsg("no transaction participant matches %s:%d",
-							   placement->nodeName, placement->nodePort),
-						errdetail("Transactions which modify distributed tables may only "
-								  "target nodes affected by the modification command "
-								  "which began the transaction.")));
-	}
-}
-
-/*
- * PurgeConnectionForPlacement provides a way to purge an invalid connection
- * from all relevant connection hashes using the placement involved in the
- * query at the time of the error. If a transaction is ongoing, this function
- * ensures the right node's connection is set to NULL in the participant map
- * for the transaction in addition to purging the connection cache's entry.
- */
-static void
-PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement)
-{
-	CloseConnectionByPGconn(connection);
-
-	/*
-	 * The following is logically identical to RemoveXactConnection, but since
-	 * we have a ShardPlacement to help build a NodeConnectionKey, we avoid
-	 * any penalty incurred by calling BuildKeyForConnection, which must ex-
-	 * tract host, port, and user from the connection options list.
-	 */
-	if (xactParticipantHash != NULL)
-	{
-		NodeConnectionEntry *participantEntry = NULL;
-		bool entryFound = false;
-		NodeConnectionKey nodeKey;
-		char *currentUser = CurrentUserName();
-
-		MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
-		strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
-		nodeKey.nodePort = placement->nodePort;
-		strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
-		Assert(IsTransactionBlock());
-
-		/* the participant hash doesn't use the user field */
-		MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
-		participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
-									   &entryFound);
-
-		Assert(entryFound);
-
-		participantEntry->connection = NULL;
-	}
-}
-
-/*
- * Removes a given connection from the transaction participant hash, based on
- * the host and port of the provided connection. If the hash is not NULL, it
- * MUST contain the provided connection, or a FATAL error is raised.
- */
-static void
-RemoveXactConnection(PGconn *connection)
-{
-	NodeConnectionKey nodeKey;
-	NodeConnectionEntry *participantEntry = NULL;
-	bool entryFound = false;
-
-	if (xactParticipantHash == NULL)
-	{
-		return;
-	}
-
-	BuildKeyForConnection(connection, &nodeKey);
-
-	/* the participant hash doesn't use the user field */
-	MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
-	participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
-								   &entryFound);
-
-	if (!entryFound)
-	{
-		ereport(FATAL, (errmsg("could not find specified transaction connection")));
-	}
-
-	participantEntry->connection = NULL;
-}
-
 /*
  * SendQueryInSingleRowMode sends the given query on the connection in an
  * asynchronous way. The function also sets the single-row mode on the
  * connection so that we receive results a row at a time.
  */
 static bool
-SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo)
+SendQueryInSingleRowMode(MultiConnection *connection, char *query,
+						 ParamListInfo paramListInfo)
 {
 	int querySent = 0;
 	int singleRowMode = 0;
@@ -1349,24 +1168,27 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis
 		ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
										   &parameterValues);

-		querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes,
-									  parameterValues, NULL, NULL, 0);
+		querySent = PQsendQueryParams(connection->pgConn, query,
+									  parameterCount, parameterTypes, parameterValues,
+									  NULL, NULL, 0);
 	}
 	else
 	{
-		querySent = PQsendQuery(connection, query);
+		querySent = PQsendQuery(connection->pgConn, query);
 	}

 	if (querySent == 0)
 	{
-		WarnRemoteError(connection, NULL);
+		MarkRemoteTransactionFailed(connection, false);
+		ReportConnectionError(connection, WARNING);
 		return false;
 	}

-	singleRowMode = PQsetSingleRowMode(connection);
+	singleRowMode = PQsetSingleRowMode(connection->pgConn);
 	if (singleRowMode == 0)
 	{
-		WarnRemoteError(connection, NULL);
+		MarkRemoteTransactionFailed(connection, false);
+		ReportConnectionError(connection, WARNING);
 		return false;
 	}
@@ -1451,7 +1273,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
  * the connection.
  */
 static bool
-StoreQueryResult(MaterialState *routerState, PGconn *connection,
+StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
				 TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
 {
 	AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
@@ -1486,7 +1308,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 		uint32 columnCount = 0;
 		ExecStatusType resultStatus = 0;

-		PGresult *result = PQgetResult(connection);
+		PGresult *result = PQgetResult(connection->pgConn);
 		if (result == NULL)
 		{
 			break;
@@ -1499,6 +1321,8 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 			int category = 0;
 			bool isConstraintViolation = false;

+			MarkRemoteTransactionFailed(connection, false);
+
 			/*
 			 * If the error code is in constraint violation class, we want to
 			 * fail fast because we must get the same error from all shard
@@ -1509,12 +1333,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 			if (isConstraintViolation || failOnError)
 			{
-				RemoveXactConnection(connection);
-				ReraiseRemoteError(connection, result);
+				ReportResultError(connection, result, ERROR);
 			}
 			else
 			{
-				WarnRemoteError(connection, result);
+				ReportResultError(connection, result, WARNING);
 			}

 			PQclear(result);
@@ -1579,7 +1402,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
  * has been an error.
  */
 static bool
-ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
+ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
 {
 	bool commandFailed = false;
 	bool gotResponse = false;
@@ -1593,7 +1416,7 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 	 */
 	while (true)
 	{
-		PGresult *result = PQgetResult(connection);
+		PGresult *result = PQgetResult(connection->pgConn);
 		ExecStatusType status = PGRES_COMMAND_OK;

 		if (result == NULL)
@@ -1611,6 +1434,8 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 			int category = 0;
 			bool isConstraintViolation = false;

+			MarkRemoteTransactionFailed(connection, false);
+
 			/*
 			 * If the error code is in constraint violation class, we want to
 			 * fail fast because we must get the same error from all shard
@@ -1621,13 +1446,13 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 			if (isConstraintViolation || failOnError)
 			{
-				RemoveXactConnection(connection);
-				ReraiseRemoteError(connection, result);
+				ReportResultError(connection, result, ERROR);
 			}
 			else
 			{
-				WarnRemoteError(connection, result);
+				ReportResultError(connection, result, WARNING);
 			}

 			PQclear(result);
 			commandFailed = true;
@@ -1667,50 +1492,6 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 }

-/*
- * RecordShardIdParticipant registers a connection as being involved with a
- * particular shard during a multi-statement transaction.
- */
-static void
-RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry)
-{
-	XactShardConnSet *shardConnSetMatch = NULL;
-	ListCell *listCell = NULL;
-	MemoryContext oldContext = NULL;
-	List *connectionEntryList = NIL;
-
-	/* check whether an entry already exists for this shard */
-	foreach(listCell, xactShardConnSetList)
-	{
-		XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell);
-
-		if (shardConnSet->shardId == affectedShardId)
-		{
-			shardConnSetMatch = shardConnSet;
-		}
-	}
-
-	/* entries must last through the whole top-level transaction */
-	oldContext = MemoryContextSwitchTo(TopTransactionContext);
-
-	/* if no entry found, make one */
-	if (shardConnSetMatch == NULL)
-	{
-		shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet));
-		shardConnSetMatch->shardId = affectedShardId;
-
-		xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch);
-	}
-
-	/* add connection, avoiding duplicates */
-	connectionEntryList = shardConnSetMatch->connectionEntryList;
-	shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList,
-																	participantEntry);
-
-	MemoryContextSwitchTo(oldContext);
-}
-
 /*
  * RouterExecutorFinish cleans up after a distributed execution.
  */
@@ -1745,114 +1526,3 @@ RouterExecutorEnd(QueryDesc *queryDesc)
 	queryDesc->estate = NULL;
 	queryDesc->totaltime = NULL;
 }
-
-/*
- * RouterExecutorPreCommitCheck() gets called after remote transactions have
- * committed, so it can invalidate failed shards and perform related checks.
- */
-void
-RouterExecutorPreCommitCheck(void)
-{
-	/* no transactional router modification were issued, nothing to do */
-	if (xactParticipantHash == NULL)
-	{
-		return;
-	}
-
-	MarkRemainingInactivePlacements();
-}
-
-/*
- * Cleanup callback called after a transaction commits or aborts.
- */
-void
-RouterExecutorPostCommit(void)
-{
-	/* reset transaction state */
-	xactParticipantHash = NULL;
-	xactShardConnSetList = NIL;
-}
-
-/*
- * MarkRemainingInactivePlacements takes care of marking placements of a shard
- * inactive after some of the placements rejected the final COMMIT phase of a
- * transaction.
- *
- * Failures are detected by checking the connection & transaction state for
- * each of the entries in the connection set for each shard.
- */
-static void
-MarkRemainingInactivePlacements(void)
-{
-	ListCell *shardConnSetCell = NULL;
-	int totalSuccesses = 0;
-
-	if (xactParticipantHash == NULL)
-	{
-		return;
-	}
-
-	foreach(shardConnSetCell, xactShardConnSetList)
-	{
-		XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell);
-		List *participantList = shardConnSet->connectionEntryList;
-		ListCell *participantCell = NULL;
-		int successes = list_length(participantList); /* assume full success */
-
-		/* determine how many actual successes there were: subtract failures */
-		foreach(participantCell, participantList)
-		{
-			NodeConnectionEntry *participant =
-				(NodeConnectionEntry *) lfirst(participantCell);
-			MultiConnection *connection = participant->connection;
-
-			/*
-			 * Fail if the connection has been set to NULL after an error, or
-			 * if the transaction failed for other reasons (e.g. COMMIT
-			 * failed).
-			 */
-			if (connection == NULL || connection->remoteTransaction.transactionFailed)
-			{
-				successes--;
-			}
-		}
-
-		/* if no nodes succeeded for this shard, don't do anything */
-		if (successes == 0)
-		{
-			continue;
-		}
-
-		/* otherwise, ensure failed placements are marked inactive */
-		foreach(participantCell, participantList)
-		{
-			NodeConnectionEntry *participant = NULL;
-			participant = (NodeConnectionEntry *) lfirst(participantCell);
-
-			if (participant->connection == NULL ||
-				participant->connection->remoteTransaction.transactionFailed)
-			{
-				uint64 shardId = shardConnSet->shardId;
-				NodeConnectionKey *nodeKey = &participant->cacheKey;
-				uint64 shardLength = 0;
-				uint64 placementId = INVALID_PLACEMENT_ID;
-
-				placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
-													  nodeKey->nodePort);
-				InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
-										nodeKey->nodeName, nodeKey->nodePort);
-			}
-		}
-
-		totalSuccesses++;
-	}
-
-	/* If no shards could be modified at all, error out. */
-	if (totalSuccesses == 0)
-	{
-		ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
-	}
-}

View File

@ -33,6 +33,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
+#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/task_tracker.h"
#include "distributed/transaction_management.h"
@ -160,6 +161,7 @@ _PG_init(void)
    /* initialize coordinated transaction management */
    InitializeTransactionManagement();
    InitializeConnectionManagement();
+   InitPlacementConnectionManagement();

    /* enable modification of pg_catalog tables during pg_upgrade */
    if (IsBinaryUpgrade)

View File

@ -21,9 +21,9 @@
#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
-#include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
+#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
@ -149,7 +149,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
             * callbacks still can perform work if needed.
             */
            ResetShardPlacementTransactionState();
-           RouterExecutorPostCommit();

            if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
            {
@ -160,6 +159,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
            /* close connections etc. */
            if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
            {
+               ResetPlacementConnectionManagement();
                AfterXactConnectionHandling(true);
            }
@ -185,7 +185,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
             * callbacks still can perform work if needed.
             */
            ResetShardPlacementTransactionState();
-           RouterExecutorPostCommit();

            /* handles both already prepared and open transactions */
            if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
@ -196,6 +195,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
            /* close connections etc. */
            if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
            {
+               ResetPlacementConnectionManagement();
                AfterXactConnectionHandling(false);
            }
@ -233,6 +233,21 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
                break;
            }

+           /*
+            * TODO: It'd probably be a good idea to force constraints and
+            * such to 'immediate' here. Deferred triggers might try to send
+            * stuff to the remote side, which'd not be good. Doing so
+            * remotely would also catch a class of errors where committing
+            * fails, which can lead to divergence when not using 2PC.
+            */
+
+           /*
+            * Check whether the coordinated transaction is in a state we want
+            * to persist, or whether we want to error out. This handles the
+            * case that iteratively executed commands marked all placements
+            * as invalid.
+            */
+           CheckForFailedPlacements(true, CurrentTransactionUse2PC);
+
            if (CurrentTransactionUse2PC)
            {
@ -251,12 +266,11 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
                }

                /*
-                * Call other parts of citus that need to integrate into
-                * transaction management. Call them *after* committing/preparing
-                * the remote transactions, to allow marking shards as invalid
-                * (e.g. if the remote commit failed).
+                * Check again whether shards/placement successfully
+                * committed. This handles failure at COMMIT/PREPARE time.
                 */
-               RouterExecutorPreCommitCheck();
+               CheckForFailedPlacements(false, CurrentTransactionUse2PC);
            }
            break;
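
To make the new commit-time ordering easier to follow, here is a condensed sketch of the pre-commit path after this change. It is illustrative only: the CoordinatedRemoteTransactions* helper names are assumed from the surrounding transaction machinery, and error handling is omitted.

/* illustrative sketch of the pre-commit sequence, not the literal code */
static void
PreCommitSequenceSketch(void)
{
    /*
     * First pass: error out early if iteratively executed commands
     * already marked every placement of some shard as invalid.
     */
    CheckForFailedPlacements(true, CurrentTransactionUse2PC);

    if (CurrentTransactionUse2PC)
    {
        /* assumed helper: PREPARE TRANSACTION on all workers */
        CoordinatedRemoteTransactionsPrepare();
    }
    else
    {
        /* assumed helper: plain COMMIT on all workers */
        CoordinatedRemoteTransactionsCommit();
    }

    /*
     * Second pass: COMMIT/PREPARE itself may have failed on some
     * placements; demote those, or error out if a shard lost all of them.
     */
    CheckForFailedPlacements(false, CurrentTransactionUse2PC);
}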

View File

@ -33,7 +33,11 @@ enum MultiConnectionMode
    FORCE_NEW_CONNECTION = 1 << 0,

    /* mark returned connection as having session lifespan */
-   SESSION_LIFESPAN = 1 << 1
+   SESSION_LIFESPAN = 1 << 1,
+
+   FOR_DDL = 1 << 2,
+   FOR_DML = 1 << 3
};
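
Since MultiConnectionMode is a bit mask, the new FOR_DDL/FOR_DML flags compose with the existing ones. A minimal sketch of how a caller might combine and test them (whether any call site actually uses this exact combination is not shown in this diff):

/* a DML connection that should also survive until end of session */
uint32 flags = FOR_DML | SESSION_LIFESPAN;

if (flags & FOR_DML)
{
    /* the connection will be used to modify a placement */
}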
@ -68,6 +72,9 @@ typedef struct MultiConnection
    /* membership in list of in-progress transactions */
    dlist_node transactionNode;

+   /* list of all placements referenced by this connection */
+   dlist_head referencedPlacements;
} MultiConnection;
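
referencedPlacements is a PostgreSQL dlist, so the placement machinery can walk every placement a connection has touched. A sketch of such a traversal, assuming the list elements are the ConnectionReference structs from placement_connection.c and that their dlist_node member is named connectionNode (the member name is not visible in this hunk):

dlist_iter iter;

dlist_foreach(iter, &connection->referencedPlacements)
{
    /* element type and node member name are assumptions, see above */
    ConnectionReference *reference =
        dlist_container(ConnectionReference, connectionNode, iter.cur);

    /* e.g. inspect reference->placementId after a failure */
}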

View File

@ -37,8 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);
-extern void RouterExecutorPreCommitCheck(void);
-extern void RouterExecutorPostCommit(void);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);

View File

@ -0,0 +1,33 @@
/*-------------------------------------------------------------------------
 * placement_connection.h
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef PLACEMENT_CONNECTION_H
#define PLACEMENT_CONNECTION_H

#include "distributed/connection_management.h"

/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;

extern MultiConnection * GetPlacementConnection(uint32 flags,
                                                struct ShardPlacement *placement,
                                                const char *userName);
extern MultiConnection * StartPlacementConnection(uint32 flags,
                                                  struct ShardPlacement *placement,
                                                  const char *userName);

extern void ResetPlacementConnectionManagement(void);
extern void CheckForFailedPlacements(bool preCommit, bool using2PC);

extern void CloseShardPlacementAssociation(struct MultiConnection *connection);
extern void ResetShardPlacementAssociation(struct MultiConnection *connection);

extern void InitPlacementConnectionManagement(void);

#endif /* PLACEMENT_CONNECTION_H */
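
For context, a minimal usage sketch of the new API: acquire a connection tied to a placement and let the placement machinery pick or reuse a suitable connection. Passing NULL as userName is assumed to fall back to the current user, mirroring StartNodeUserDatabaseConnection(); error handling and the surrounding executor context are omitted.

static void
ModifyPlacementSketch(struct ShardPlacement *placement)
{
    /* FOR_DML registers this connection as modifying the placement */
    MultiConnection *connection =
        GetPlacementConnection(FOR_DML, placement, NULL);

    /* ... issue the modification over 'connection' ... */
}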

View File

@ -140,8 +140,8 @@ INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
--- this logic even applies to router SELECTs occurring after a modification:
--- selecting from the modified node is fine...
+-- SELECTs may occur after a modification: first check that selecting
+-- from the modified node works.
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
@ -151,7 +151,7 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
(1 row)
ABORT;
--- but if a SELECT needs to go to new node, that's a problem...
+-- then check that a SELECT going to a new node is still fine
BEGIN;
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
FROM pg_dist_shard AS s
@ -161,8 +161,11 @@ AND sp.nodeport = :worker_1_port
AND s.logicalrelid = 'researchers'::regclass;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
-ERROR: no transaction participant matches localhost:57638
-DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
+ count
+-------
+     0
+(1 row)
+
ABORT;
-- applies to DDL, too
BEGIN;
@ -494,7 +497,9 @@ WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637
WARNING: illegal value
WARNING: failed to commit transaction on localhost:57638
-ERROR: could not commit transaction on any active nodes
+WARNING: could not commit transaction for shard 1200002 on any active node
+WARNING: could not commit transaction for shard 1200003 on any active node
+ERROR: could not commit transaction on any active node
-- data should NOT be persisted
SELECT * FROM objects WHERE id = 1;
 id | name
@ -530,6 +535,7 @@ INSERT INTO labs VALUES (9, 'BAD');
COMMIT;
WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637
+WARNING: could not commit transaction for shard 1200002 on any active node
\set VERBOSITY default
-- data to objects should be persisted, but labs should not...
SELECT * FROM objects WHERE id = 1;
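
The expected-output changes above imply the new per-shard reporting style: one WARNING per shard whose placements all failed to commit, followed by a single transaction-level ERROR when nothing could be committed at all. A sketch of that pattern (the real logic lives in placement_connection.c's CheckForFailedPlacements, whose hunk is not shown here; the function and parameter names below are illustrative):

static void
ReportCommitFailureSketch(uint64 shardId, int succeededShardCount)
{
    /* per shard: every placement failed at COMMIT/PREPARE time */
    ereport(WARNING, (errmsg("could not commit transaction for shard " UINT64_FORMAT
                             " on any active node", shardId)));

    /* after all shards were checked: nothing could be committed at all */
    if (succeededShardCount == 0)
    {
        ereport(ERROR, (errmsg("could not commit transaction on any active node")));
    }
}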

View File

@ -111,15 +111,14 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
COMMIT;
--- this logic even applies to router SELECTs occurring after a modification:
--- selecting from the modified node is fine...
+-- SELECTs may occur after a modification: first check that selecting
+-- from the modified node works.
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
ABORT;
--- but if a SELECT needs to go to new node, that's a problem...
+-- then check that a SELECT going to a new node is still fine
BEGIN;
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3