mirror of https://github.com/citusdata/citus.git
Convert router executor to placement connection management infrastructure.
Remove the router specific transaction and shard management, and replace it with the new placement connection API. This mostly leaves behaviour alone, except that it is now, inside a transaction, legal to select from a shard to which no pre-existing connection exists. To simplify code the code handling task executions for select and modify has been split into two - the previous coding was starting to get confusing due to the amount of only conditionally applicable code. Modification connections & transactions are now always established in parallel, not just for reference tables.pull/1079/head
parent
bfa742d794
commit
7320c17f00
|
@ -41,6 +41,7 @@
|
|||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/multi_shard_transaction.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/relay_utility.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/remote_transaction.h"
|
||||
|
@ -72,30 +73,14 @@
|
|||
/* controls use of locks to enforce safe commutativity */
|
||||
bool AllModificationsCommutative = false;
|
||||
|
||||
|
||||
/*
|
||||
* The following static variables are necessary to track the progression of
|
||||
* multi-statement transactions managed by the router executor. After the first
|
||||
* modification within a transaction, the executor populates a hash with the
|
||||
* transaction's initial participants (nodes hit by that initial modification).
|
||||
*
|
||||
* To keep track of the reverse mapping (from shards to nodes), we have a list
|
||||
* of XactShardConnSets, which map a shard identifier to a set of connection
|
||||
* hash entries. This list is walked by MarkRemainingInactivePlacements to
|
||||
* ensure we mark placements as failed if they reject a COMMIT.
|
||||
*/
|
||||
static HTAB *xactParticipantHash = NULL;
|
||||
static List *xactShardConnSetList = NIL;
|
||||
|
||||
/* functions needed during start phase */
|
||||
static void InitTransactionStateForTask(Task *task);
|
||||
static HTAB * CreateXactParticipantHash(void);
|
||||
|
||||
/* functions needed during run phase */
|
||||
static void ReacquireMetadataLocks(List *taskList);
|
||||
static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
|
||||
bool isModificationQuery, bool expectResults);
|
||||
static void GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList);
|
||||
static void ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
|
||||
bool expectResults);
|
||||
static void ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task);
|
||||
static List * GetModifyConnections(List *taskPlacementList,
|
||||
bool markCritical,
|
||||
bool startedInTransaction);
|
||||
static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
|
||||
bool isModificationQuery, bool expectResults);
|
||||
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
|
||||
|
@ -109,23 +94,15 @@ static bool RequiresConsistentSnapshot(Task *task);
|
|||
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
||||
DestReceiver *destination,
|
||||
Tuplestorestate *tupleStore);
|
||||
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
|
||||
bool isModificationQuery);
|
||||
static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
|
||||
static void RemoveXactConnection(PGconn *connection);
|
||||
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
||||
Oid **parameterTypes,
|
||||
const char ***parameterValues);
|
||||
static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
|
||||
static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
|
||||
ParamListInfo paramListInfo);
|
||||
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||
static bool StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
|
||||
TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
|
||||
static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
|
||||
static void RecordShardIdParticipant(uint64 affectedShardId,
|
||||
NodeConnectionEntry *participantEntry);
|
||||
|
||||
/* to verify the health of shards after a transactional modification command */
|
||||
static void MarkRemainingInactivePlacements(void);
|
||||
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
|
||||
int64 *rows);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -232,52 +209,6 @@ ReacquireMetadataLocks(List *taskList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitTransactionStateForTask is called during executor start with the first
|
||||
* modifying (INSERT/UPDATE/DELETE) task during a transaction. It creates the
|
||||
* transaction participant hash, opens connections to this task's nodes, and
|
||||
* populates the hash with those connections after sending BEGIN commands to
|
||||
* each. If a node fails to respond, its connection is set to NULL to prevent
|
||||
* further interaction with it during the transaction.
|
||||
*/
|
||||
static void
|
||||
InitTransactionStateForTask(Task *task)
|
||||
{
|
||||
ListCell *placementCell = NULL;
|
||||
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
|
||||
xactParticipantHash = CreateXactParticipantHash();
|
||||
|
||||
foreach(placementCell, task->taskPlacementList)
|
||||
{
|
||||
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
|
||||
NodeConnectionKey participantKey;
|
||||
NodeConnectionEntry *participantEntry = NULL;
|
||||
bool entryFound = false;
|
||||
int connectionFlags = SESSION_LIFESPAN;
|
||||
MultiConnection *connection =
|
||||
GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort);
|
||||
|
||||
MemSet(&participantKey, 0, sizeof(participantKey));
|
||||
strlcpy(participantKey.nodeName, placement->nodeName,
|
||||
MAX_NODE_LENGTH + 1);
|
||||
participantKey.nodePort = placement->nodePort;
|
||||
|
||||
participantEntry = hash_search(xactParticipantHash, &participantKey,
|
||||
HASH_ENTER, &entryFound);
|
||||
Assert(!entryFound);
|
||||
|
||||
/* issue BEGIN if necessary */
|
||||
RemoteTransactionBeginIfNecessary(connection);
|
||||
|
||||
participantEntry->connection = connection;
|
||||
}
|
||||
|
||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AcquireExecutorShardLock acquires a lock on the shard for the given task and
|
||||
* command type if necessary to avoid divergence between multiple replicas of
|
||||
|
@ -527,33 +458,6 @@ RequiresConsistentSnapshot(Task *task)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateXactParticipantHash initializes the map used to store the connections
|
||||
* needed to process distributed transactions. Unlike the connection cache, we
|
||||
* permit NULL connections here to signify that a participant has seen an error
|
||||
* and is no longer receiving commands during a transaction. This hash should
|
||||
* be walked at transaction end to send final COMMIT or ABORT commands.
|
||||
*/
|
||||
static HTAB *
|
||||
CreateXactParticipantHash(void)
|
||||
{
|
||||
HTAB *xactParticipantHash = NULL;
|
||||
HASHCTL info;
|
||||
int hashFlags = 0;
|
||||
|
||||
MemSet(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(NodeConnectionKey);
|
||||
info.entrysize = sizeof(NodeConnectionEntry);
|
||||
info.hcxt = TopTransactionContext;
|
||||
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
|
||||
|
||||
xactParticipantHash = hash_create("citus xact participant hash", 32, &info,
|
||||
hashFlags);
|
||||
|
||||
return xactParticipantHash;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RouterExecutorRun actually executes a single task on a worker.
|
||||
*/
|
||||
|
@ -633,13 +537,14 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
|
|||
if (list_length(taskList) == 1)
|
||||
{
|
||||
Task *task = (Task *) linitial(taskList);
|
||||
bool resultsOK = false;
|
||||
|
||||
resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery,
|
||||
sendTuples);
|
||||
if (!resultsOK)
|
||||
if (isModificationQuery)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not receive query results")));
|
||||
ExecuteSingleModifyTask(queryDesc, task, sendTuples);
|
||||
}
|
||||
else
|
||||
{
|
||||
ExecuteSingleSelectTask(queryDesc, task);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -692,18 +597,72 @@ out:
|
|||
|
||||
|
||||
/*
|
||||
* ExecuteSingleTask executes the task on the remote node, retrieves the
|
||||
* results and stores them, if SELECT or RETURNING is used, in a tuple
|
||||
* store.
|
||||
* ExecuteSingleSelectTask executes the task on the remote node, retrieves the
|
||||
* results and stores them in a tuple store.
|
||||
*
|
||||
* If the task fails on one of the placements, the function retries it on
|
||||
* other placements (SELECT), reraises the remote error (constraint violation
|
||||
* in DML), marks the affected placement as invalid (DML on some placement
|
||||
* failed), or errors out (DML failed on all placements).
|
||||
* other placements or errors out if the query fails on all placements.
|
||||
*/
|
||||
static bool
|
||||
ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
|
||||
bool isModificationQuery,
|
||||
static void
|
||||
ExecuteSingleSelectTask(QueryDesc *queryDesc, Task *task)
|
||||
{
|
||||
TupleDesc tupleDescriptor = queryDesc->tupDesc;
|
||||
MaterialState *routerState = (MaterialState *) queryDesc->planstate;
|
||||
ParamListInfo paramListInfo = queryDesc->params;
|
||||
List *taskPlacementList = task->taskPlacementList;
|
||||
ListCell *taskPlacementCell = NULL;
|
||||
char *queryString = task->queryString;
|
||||
|
||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("single-shard query may not appear in transaction blocks "
|
||||
"which contain multi-shard data modifications")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to run the query to completion on one placement. If the query fails
|
||||
* attempt the query on the next placement.
|
||||
*/
|
||||
foreach(taskPlacementCell, taskPlacementList)
|
||||
{
|
||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||
bool queryOK = false;
|
||||
bool dontFailOnError = false;
|
||||
int64 currentAffectedTupleCount = 0;
|
||||
int connectionFlags = SESSION_LIFESPAN;
|
||||
MultiConnection *connection =
|
||||
GetPlacementConnection(connectionFlags, taskPlacement, NULL);
|
||||
|
||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||
if (!queryOK)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
|
||||
dontFailOnError, ¤tAffectedTupleCount);
|
||||
if (queryOK)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ereport(ERROR, (errmsg("could not receive query results")));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteSingleModifyTask executes the task on the remote node, retrieves the
|
||||
* results and stores them, if RETURNING is used, in a tuple store.
|
||||
*
|
||||
* If the task fails on one of the placements, the function reraises the
|
||||
* remote error (constraint violation in DML), marks the affected placement as
|
||||
* invalid (other error on some placements, via the placement connection
|
||||
* framework), or errors out (failed on all placements).
|
||||
*/
|
||||
static void
|
||||
ExecuteSingleModifyTask(QueryDesc *queryDesc, Task *task,
|
||||
bool expectResults)
|
||||
{
|
||||
CmdType operation = queryDesc->operation;
|
||||
|
@ -713,12 +672,15 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
|
|||
ParamListInfo paramListInfo = queryDesc->params;
|
||||
bool resultsOK = false;
|
||||
List *taskPlacementList = task->taskPlacementList;
|
||||
List *connectionList = NIL;
|
||||
ListCell *taskPlacementCell = NULL;
|
||||
List *failedPlacementList = NIL;
|
||||
ListCell *connectionCell = NULL;
|
||||
int64 affectedTupleCount = -1;
|
||||
bool gotResults = false;
|
||||
char *queryString = task->queryString;
|
||||
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
|
||||
bool startedInTransaction =
|
||||
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
|
||||
|
||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
||||
{
|
||||
|
@ -729,59 +691,52 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
|
|||
}
|
||||
|
||||
/*
|
||||
* Firstly ensure that distributed transaction is started. Then, force
|
||||
* the transaction manager to use 2PC while running the task on the placements.
|
||||
* Modifications for reference tables are always done using 2PC. First
|
||||
* ensure that distributed transaction is started. Then force the
|
||||
* transaction manager to use 2PC while running the task on the
|
||||
* placements.
|
||||
*/
|
||||
if (taskRequiresTwoPhaseCommit)
|
||||
{
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
CoordinatedTransactionUse2PC();
|
||||
|
||||
/*
|
||||
* Mark connections for all placements as critical and establish connections
|
||||
* to all placements at once.
|
||||
*/
|
||||
GetPlacementConnectionsReadyForTwoPhaseCommit(taskPlacementList);
|
||||
}
|
||||
|
||||
/*
|
||||
* We could naturally handle function-based transactions (i.e. those
|
||||
* using PL/pgSQL or similar) by checking the type of queryDesc->dest,
|
||||
* but some customers already use functions that touch multiple shards
|
||||
* from within a function, so we'll ignore functions for now.
|
||||
* We could naturally handle function-based transactions (i.e. those using
|
||||
* PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
|
||||
* customers already use functions that touch multiple shards from within
|
||||
* a function, so we'll ignore functions for now.
|
||||
*/
|
||||
if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
|
||||
if (IsTransactionBlock())
|
||||
{
|
||||
InitTransactionStateForTask(task);
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
}
|
||||
|
||||
/*
|
||||
* Get connections required to execute task. This will, if necessary,
|
||||
* establish the connection, mark as critical (when modifying reference
|
||||
* table) and start a transaction (when in a transaction).
|
||||
*/
|
||||
connectionList = GetModifyConnections(taskPlacementList,
|
||||
taskRequiresTwoPhaseCommit,
|
||||
startedInTransaction);
|
||||
|
||||
/* prevent replicas of the same shard from diverging */
|
||||
AcquireExecutorShardLock(task, operation);
|
||||
|
||||
/*
|
||||
* Try to run the query to completion on one placement. If the query fails
|
||||
* attempt the query on the next placement.
|
||||
*/
|
||||
foreach(taskPlacementCell, taskPlacementList)
|
||||
/* try to execute modification on all placements */
|
||||
forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList)
|
||||
{
|
||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||
bool queryOK = false;
|
||||
bool failOnError = false;
|
||||
int64 currentAffectedTupleCount = 0;
|
||||
PGconn *connection = GetConnectionForPlacement(taskPlacement,
|
||||
isModificationQuery);
|
||||
|
||||
if (connection == NULL)
|
||||
{
|
||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||
continue;
|
||||
}
|
||||
|
||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||
if (!queryOK)
|
||||
{
|
||||
PurgeConnectionForPlacement(connection, taskPlacement);
|
||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -829,87 +784,88 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
|
|||
|
||||
resultsOK = true;
|
||||
gotResults = true;
|
||||
|
||||
/*
|
||||
* Modifications have to be executed on all placements, but for
|
||||
* read queries we can stop here.
|
||||
*/
|
||||
if (!isModificationQuery)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
PurgeConnectionForPlacement(connection, taskPlacement);
|
||||
|
||||
failedPlacementList = lappend(failedPlacementList, taskPlacement);
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (isModificationQuery)
|
||||
{
|
||||
ListCell *failedPlacementCell = NULL;
|
||||
|
||||
/* if all placements failed, error out */
|
||||
if (list_length(failedPlacementList) == list_length(task->taskPlacementList))
|
||||
if (!resultsOK)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not modify any active placements")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Otherwise, mark failed placements as inactive: they're stale. Note that
|
||||
* connections for tasks that require 2PC has already failed the whole transaction
|
||||
* and there is no way that they're marked stale here.
|
||||
*/
|
||||
foreach(failedPlacementCell, failedPlacementList)
|
||||
{
|
||||
ShardPlacement *failedPlacement =
|
||||
(ShardPlacement *) lfirst(failedPlacementCell);
|
||||
|
||||
Assert(!taskRequiresTwoPhaseCommit);
|
||||
|
||||
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
|
||||
}
|
||||
|
||||
executorState->es_processed = affectedTupleCount;
|
||||
}
|
||||
|
||||
return resultsOK;
|
||||
if (IsTransactionBlock())
|
||||
{
|
||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetPlacementConnectionsReadyForTwoPhaseCommit iterates over the task placement list,
|
||||
* starts the connections to the nodes and marks them critical. In the second iteration,
|
||||
* the connection establishments are finished. Finally, BEGIN commands are sent,
|
||||
* if necessary.
|
||||
* GetModifyConnections returns the list of connections required to execute
|
||||
* modify commands on the placements in tasPlacementList. If necessary remote
|
||||
* transactions are started.
|
||||
*
|
||||
* If markCritical is true remote transactions are marked as critical. If
|
||||
* noNewTransactions is true, this function errors out if there's no
|
||||
* transaction in progress.
|
||||
*/
|
||||
static void
|
||||
GetPlacementConnectionsReadyForTwoPhaseCommit(List *taskPlacementList)
|
||||
static List *
|
||||
GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions)
|
||||
{
|
||||
ListCell *taskPlacementCell = NULL;
|
||||
List *multiConnectionList = NIL;
|
||||
|
||||
/* in the first iteration start the connections */
|
||||
/* first initiate connection establishment for all necessary connections */
|
||||
foreach(taskPlacementCell, taskPlacementList)
|
||||
{
|
||||
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
|
||||
int connectionFlags = SESSION_LIFESPAN;
|
||||
MultiConnection *multiConnection = StartNodeConnection(connectionFlags,
|
||||
taskPlacement->nodeName,
|
||||
taskPlacement->nodePort);
|
||||
int connectionFlags = SESSION_LIFESPAN | FOR_DML;
|
||||
MultiConnection *multiConnection = NULL;
|
||||
|
||||
/*
|
||||
* FIXME: It's not actually correct to use only one shard placement
|
||||
* here for router queries involving multiple relations. We should
|
||||
* check that this connection is the only modifying one associated
|
||||
* with all the involved shards.
|
||||
*/
|
||||
multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);
|
||||
|
||||
/*
|
||||
* If already in a transaction, disallow expanding set of remote
|
||||
* transactions. That prevents some forms of distributed deadlocks.
|
||||
*/
|
||||
if (noNewTransactions)
|
||||
{
|
||||
RemoteTransaction *transaction = &multiConnection->remoteTransaction;
|
||||
|
||||
if (transaction->transactionState == REMOTE_TRANS_INVALID)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
|
||||
errmsg("no transaction participant matches %s:%d",
|
||||
taskPlacement->nodeName, taskPlacement->nodePort),
|
||||
errdetail("Transactions which modify distributed tables "
|
||||
"may only target nodes affected by the "
|
||||
"modification command which began the transaction.")));
|
||||
}
|
||||
}
|
||||
|
||||
if (markCritical)
|
||||
{
|
||||
MarkRemoteTransactionCritical(multiConnection);
|
||||
}
|
||||
|
||||
multiConnectionList = lappend(multiConnectionList, multiConnection);
|
||||
}
|
||||
|
||||
/* then finish in parallel */
|
||||
FinishConnectionListEstablishment(multiConnectionList);
|
||||
|
||||
/* and start transactions if applicable */
|
||||
RemoteTransactionsBeginIfNecessary(multiConnectionList);
|
||||
|
||||
return multiConnectionList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1009,8 +965,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
bool shardConnectionsFound = false;
|
||||
ShardConnections *shardConnections = NULL;
|
||||
List *connectionList = NIL;
|
||||
MultiConnection *multiConnection = NULL;
|
||||
PGconn *connection = NULL;
|
||||
MultiConnection *connection = NULL;
|
||||
bool queryOK = false;
|
||||
|
||||
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
|
||||
|
@ -1022,14 +977,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
continue;
|
||||
}
|
||||
|
||||
multiConnection =
|
||||
connection =
|
||||
(MultiConnection *) list_nth(connectionList, placementIndex);
|
||||
connection = multiConnection->pgConn;
|
||||
|
||||
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
|
||||
if (!queryOK)
|
||||
{
|
||||
ReraiseRemoteError(connection, NULL);
|
||||
ReportConnectionError(connection, ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1041,8 +995,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
bool shardConnectionsFound = false;
|
||||
ShardConnections *shardConnections = NULL;
|
||||
List *connectionList = NIL;
|
||||
MultiConnection *multiConnection = NULL;
|
||||
PGconn *connection = NULL;
|
||||
MultiConnection *connection = NULL;
|
||||
int64 currentAffectedTupleCount = 0;
|
||||
bool failOnError = true;
|
||||
bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
|
||||
|
@ -1060,9 +1013,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
continue;
|
||||
}
|
||||
|
||||
multiConnection =
|
||||
(MultiConnection *) list_nth(connectionList, placementIndex);
|
||||
connection = multiConnection->pgConn;
|
||||
connection = (MultiConnection *) list_nth(connectionList, placementIndex);
|
||||
|
||||
/*
|
||||
* If caller is interested, store query results the first time
|
||||
|
@ -1101,16 +1052,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
|
||||
if (currentAffectedTupleCount != previousAffectedTupleCount)
|
||||
{
|
||||
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||
|
||||
ereport(WARNING,
|
||||
(errmsg("modified "INT64_FORMAT " tuples of shard "
|
||||
UINT64_FORMAT ", but expected to modify "INT64_FORMAT,
|
||||
currentAffectedTupleCount, shardId,
|
||||
previousAffectedTupleCount),
|
||||
errdetail("modified placement on %s:%s", nodeName,
|
||||
nodePort)));
|
||||
errdetail("modified placement on %s:%d",
|
||||
connection->hostname, connection->port)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1199,143 +1147,14 @@ ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetConnectionForPlacement is the main entry point for acquiring a connection
|
||||
* within the router executor. By using placements (rather than node names and
|
||||
* ports) to identify connections, the router executor can keep track of shards
|
||||
* used by multi-statement transactions and error out if a transaction tries
|
||||
* to reach a new node altogether). In the single-statement commands context,
|
||||
* GetConnectionForPlacement simply falls through to GetOrEstablishConnection.
|
||||
*/
|
||||
static PGconn *
|
||||
GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
|
||||
{
|
||||
NodeConnectionKey participantKey;
|
||||
NodeConnectionEntry *participantEntry = NULL;
|
||||
bool entryFound = false;
|
||||
|
||||
/* if not in a transaction, fall through to connection cache */
|
||||
if (xactParticipantHash == NULL)
|
||||
{
|
||||
PGconn *connection = GetOrEstablishConnection(placement->nodeName,
|
||||
placement->nodePort);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
Assert(IsTransactionBlock());
|
||||
|
||||
MemSet(&participantKey, 0, sizeof(participantKey));
|
||||
strlcpy(participantKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
||||
participantKey.nodePort = placement->nodePort;
|
||||
|
||||
participantEntry = hash_search(xactParticipantHash, &participantKey, HASH_FIND,
|
||||
&entryFound);
|
||||
|
||||
if (entryFound)
|
||||
{
|
||||
if (isModificationQuery)
|
||||
{
|
||||
RecordShardIdParticipant(placement->shardId, participantEntry);
|
||||
}
|
||||
|
||||
return participantEntry->connection->pgConn;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
|
||||
errmsg("no transaction participant matches %s:%d",
|
||||
placement->nodeName, placement->nodePort),
|
||||
errdetail("Transactions which modify distributed tables may only "
|
||||
"target nodes affected by the modification command "
|
||||
"which began the transaction.")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PurgeConnectionForPlacement provides a way to purge an invalid connection
|
||||
* from all relevant connection hashes using the placement involved in the
|
||||
* query at the time of the error. If a transaction is ongoing, this function
|
||||
* ensures the right node's connection is set to NULL in the participant map
|
||||
* for the transaction in addition to purging the connection cache's entry.
|
||||
*/
|
||||
static void
|
||||
PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement)
|
||||
{
|
||||
CloseConnectionByPGconn(connection);
|
||||
|
||||
/*
|
||||
* The following is logically identical to RemoveXactConnection, but since
|
||||
* we have a ShardPlacement to help build a NodeConnectionKey, we avoid
|
||||
* any penalty incurred by calling BuildKeyForConnection, which must ex-
|
||||
* tract host, port, and user from the connection options list.
|
||||
*/
|
||||
if (xactParticipantHash != NULL)
|
||||
{
|
||||
NodeConnectionEntry *participantEntry = NULL;
|
||||
bool entryFound = false;
|
||||
NodeConnectionKey nodeKey;
|
||||
char *currentUser = CurrentUserName();
|
||||
|
||||
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
|
||||
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
|
||||
nodeKey.nodePort = placement->nodePort;
|
||||
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
|
||||
Assert(IsTransactionBlock());
|
||||
|
||||
/* the participant hash doesn't use the user field */
|
||||
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
||||
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
||||
&entryFound);
|
||||
|
||||
Assert(entryFound);
|
||||
|
||||
participantEntry->connection = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Removes a given connection from the transaction participant hash, based on
|
||||
* the host and port of the provided connection. If the hash is not NULL, it
|
||||
* MUST contain the provided connection, or a FATAL error is raised.
|
||||
*/
|
||||
static void
|
||||
RemoveXactConnection(PGconn *connection)
|
||||
{
|
||||
NodeConnectionKey nodeKey;
|
||||
NodeConnectionEntry *participantEntry = NULL;
|
||||
bool entryFound = false;
|
||||
|
||||
if (xactParticipantHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
BuildKeyForConnection(connection, &nodeKey);
|
||||
|
||||
/* the participant hash doesn't use the user field */
|
||||
MemSet(&nodeKey.nodeUser, 0, sizeof(nodeKey.nodeUser));
|
||||
participantEntry = hash_search(xactParticipantHash, &nodeKey, HASH_FIND,
|
||||
&entryFound);
|
||||
|
||||
if (!entryFound)
|
||||
{
|
||||
ereport(FATAL, (errmsg("could not find specified transaction connection")));
|
||||
}
|
||||
|
||||
participantEntry->connection = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendQueryInSingleRowMode sends the given query on the connection in an
|
||||
* asynchronous way. The function also sets the single-row mode on the
|
||||
* connection so that we receive results a row at a time.
|
||||
*/
|
||||
static bool
|
||||
SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramListInfo)
|
||||
SendQueryInSingleRowMode(MultiConnection *connection, char *query,
|
||||
ParamListInfo paramListInfo)
|
||||
{
|
||||
int querySent = 0;
|
||||
int singleRowMode = 0;
|
||||
|
@ -1349,24 +1168,27 @@ SendQueryInSingleRowMode(PGconn *connection, char *query, ParamListInfo paramLis
|
|||
ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes,
|
||||
¶meterValues);
|
||||
|
||||
querySent = PQsendQueryParams(connection, query, parameterCount, parameterTypes,
|
||||
parameterValues, NULL, NULL, 0);
|
||||
querySent = PQsendQueryParams(connection->pgConn, query,
|
||||
parameterCount, parameterTypes, parameterValues,
|
||||
NULL, NULL, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
querySent = PQsendQuery(connection, query);
|
||||
querySent = PQsendQuery(connection->pgConn, query);
|
||||
}
|
||||
|
||||
if (querySent == 0)
|
||||
{
|
||||
WarnRemoteError(connection, NULL);
|
||||
MarkRemoteTransactionFailed(connection, false);
|
||||
ReportConnectionError(connection, WARNING);
|
||||
return false;
|
||||
}
|
||||
|
||||
singleRowMode = PQsetSingleRowMode(connection);
|
||||
singleRowMode = PQsetSingleRowMode(connection->pgConn);
|
||||
if (singleRowMode == 0)
|
||||
{
|
||||
WarnRemoteError(connection, NULL);
|
||||
MarkRemoteTransactionFailed(connection, false);
|
||||
ReportConnectionError(connection, WARNING);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1451,7 +1273,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
|
|||
* the connection.
|
||||
*/
|
||||
static bool
|
||||
StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
||||
StoreQueryResult(MaterialState *routerState, MultiConnection *connection,
|
||||
TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
|
||||
{
|
||||
AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
|
||||
|
@ -1486,7 +1308,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
|||
uint32 columnCount = 0;
|
||||
ExecStatusType resultStatus = 0;
|
||||
|
||||
PGresult *result = PQgetResult(connection);
|
||||
PGresult *result = PQgetResult(connection->pgConn);
|
||||
if (result == NULL)
|
||||
{
|
||||
break;
|
||||
|
@ -1499,6 +1321,8 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
|||
int category = 0;
|
||||
bool isConstraintViolation = false;
|
||||
|
||||
MarkRemoteTransactionFailed(connection, false);
|
||||
|
||||
/*
|
||||
* If the error code is in constraint violation class, we want to
|
||||
* fail fast because we must get the same error from all shard
|
||||
|
@ -1509,12 +1333,11 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
|||
|
||||
if (isConstraintViolation || failOnError)
|
||||
{
|
||||
RemoveXactConnection(connection);
|
||||
ReraiseRemoteError(connection, result);
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
else
|
||||
{
|
||||
WarnRemoteError(connection, result);
|
||||
ReportResultError(connection, result, WARNING);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
@ -1579,7 +1402,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
|
|||
* has been an error.
|
||||
*/
|
||||
static bool
|
||||
ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
|
||||
ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
|
||||
{
|
||||
bool commandFailed = false;
|
||||
bool gotResponse = false;
|
||||
|
@ -1593,7 +1416,7 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
|
|||
*/
|
||||
while (true)
|
||||
{
|
||||
PGresult *result = PQgetResult(connection);
|
||||
PGresult *result = PQgetResult(connection->pgConn);
|
||||
ExecStatusType status = PGRES_COMMAND_OK;
|
||||
|
||||
if (result == NULL)
|
||||
|
@ -1611,6 +1434,8 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
|
|||
int category = 0;
|
||||
bool isConstraintViolation = false;
|
||||
|
||||
MarkRemoteTransactionFailed(connection, false);
|
||||
|
||||
/*
|
||||
* If the error code is in constraint violation class, we want to
|
||||
* fail fast because we must get the same error from all shard
|
||||
|
@ -1621,13 +1446,13 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
|
|||
|
||||
if (isConstraintViolation || failOnError)
|
||||
{
|
||||
RemoveXactConnection(connection);
|
||||
ReraiseRemoteError(connection, result);
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
else
|
||||
{
|
||||
WarnRemoteError(connection, result);
|
||||
ReportResultError(connection, result, WARNING);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
commandFailed = true;
|
||||
|
@ -1667,50 +1492,6 @@ ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RecordShardIdParticipant registers a connection as being involved with a
|
||||
* particular shard during a multi-statement transaction.
|
||||
*/
|
||||
static void
|
||||
RecordShardIdParticipant(uint64 affectedShardId, NodeConnectionEntry *participantEntry)
|
||||
{
|
||||
XactShardConnSet *shardConnSetMatch = NULL;
|
||||
ListCell *listCell = NULL;
|
||||
MemoryContext oldContext = NULL;
|
||||
List *connectionEntryList = NIL;
|
||||
|
||||
/* check whether an entry already exists for this shard */
|
||||
foreach(listCell, xactShardConnSetList)
|
||||
{
|
||||
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(listCell);
|
||||
|
||||
if (shardConnSet->shardId == affectedShardId)
|
||||
{
|
||||
shardConnSetMatch = shardConnSet;
|
||||
}
|
||||
}
|
||||
|
||||
/* entries must last through the whole top-level transaction */
|
||||
oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
||||
|
||||
/* if no entry found, make one */
|
||||
if (shardConnSetMatch == NULL)
|
||||
{
|
||||
shardConnSetMatch = (XactShardConnSet *) palloc0(sizeof(XactShardConnSet));
|
||||
shardConnSetMatch->shardId = affectedShardId;
|
||||
|
||||
xactShardConnSetList = lappend(xactShardConnSetList, shardConnSetMatch);
|
||||
}
|
||||
|
||||
/* add connection, avoiding duplicates */
|
||||
connectionEntryList = shardConnSetMatch->connectionEntryList;
|
||||
shardConnSetMatch->connectionEntryList = list_append_unique_ptr(connectionEntryList,
|
||||
participantEntry);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RouterExecutorFinish cleans up after a distributed execution.
|
||||
*/
|
||||
|
@ -1745,114 +1526,3 @@ RouterExecutorEnd(QueryDesc *queryDesc)
|
|||
queryDesc->estate = NULL;
|
||||
queryDesc->totaltime = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RouterExecutorPreCommitCheck() gets called after remote transactions have
|
||||
* committed, so it can invalidate failed shards and perform related checks.
|
||||
*/
|
||||
void
|
||||
RouterExecutorPreCommitCheck(void)
|
||||
{
|
||||
/* no transactional router modification were issued, nothing to do */
|
||||
if (xactParticipantHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MarkRemainingInactivePlacements();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Cleanup callback called after a transaction commits or aborts.
|
||||
*/
|
||||
void
|
||||
RouterExecutorPostCommit(void)
|
||||
{
|
||||
/* reset transaction state */
|
||||
xactParticipantHash = NULL;
|
||||
xactShardConnSetList = NIL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkRemainingInactivePlacements takes care of marking placements of a shard
|
||||
* inactive after some of the placements rejected the final COMMIT phase of a
|
||||
* transaction.
|
||||
*
|
||||
* Failures are detected by checking the connection & transaction state for
|
||||
* each of the entries in the connection set for each shard.
|
||||
*/
|
||||
static void
|
||||
MarkRemainingInactivePlacements(void)
|
||||
{
|
||||
ListCell *shardConnSetCell = NULL;
|
||||
int totalSuccesses = 0;
|
||||
|
||||
if (xactParticipantHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
foreach(shardConnSetCell, xactShardConnSetList)
|
||||
{
|
||||
XactShardConnSet *shardConnSet = (XactShardConnSet *) lfirst(shardConnSetCell);
|
||||
List *participantList = shardConnSet->connectionEntryList;
|
||||
ListCell *participantCell = NULL;
|
||||
int successes = list_length(participantList); /* assume full success */
|
||||
|
||||
/* determine how many actual successes there were: subtract failures */
|
||||
foreach(participantCell, participantList)
|
||||
{
|
||||
NodeConnectionEntry *participant =
|
||||
(NodeConnectionEntry *) lfirst(participantCell);
|
||||
MultiConnection *connection = participant->connection;
|
||||
|
||||
/*
|
||||
* Fail if the connection has been set to NULL after an error, or
|
||||
* if the transaction failed for other reasons (e.g. COMMIT
|
||||
* failed).
|
||||
*/
|
||||
if (connection == NULL || connection->remoteTransaction.transactionFailed)
|
||||
{
|
||||
successes--;
|
||||
}
|
||||
}
|
||||
|
||||
/* if no nodes succeeded for this shard, don't do anything */
|
||||
if (successes == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* otherwise, ensure failed placements are marked inactive */
|
||||
foreach(participantCell, participantList)
|
||||
{
|
||||
NodeConnectionEntry *participant = NULL;
|
||||
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
||||
|
||||
if (participant->connection == NULL ||
|
||||
participant->connection->remoteTransaction.transactionFailed)
|
||||
{
|
||||
uint64 shardId = shardConnSet->shardId;
|
||||
NodeConnectionKey *nodeKey = &participant->cacheKey;
|
||||
uint64 shardLength = 0;
|
||||
uint64 placementId = INVALID_PLACEMENT_ID;
|
||||
|
||||
placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
|
||||
nodeKey->nodePort);
|
||||
InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
|
||||
nodeKey->nodeName, nodeKey->nodePort);
|
||||
}
|
||||
}
|
||||
|
||||
totalSuccesses++;
|
||||
}
|
||||
|
||||
/* If no shards could be modified at all, error out. */
|
||||
if (totalSuccesses == 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
#include "access/xact.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_shard_transaction.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
|
@ -150,7 +149,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
* callbacks still can perform work if needed.
|
||||
*/
|
||||
ResetShardPlacementTransactionState();
|
||||
RouterExecutorPostCommit();
|
||||
|
||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
|
||||
{
|
||||
|
@ -187,7 +185,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
* callbacks still can perform work if needed.
|
||||
*/
|
||||
ResetShardPlacementTransactionState();
|
||||
RouterExecutorPostCommit();
|
||||
|
||||
/* handles both already prepared and open transactions */
|
||||
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
|
||||
|
@ -268,14 +265,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
|
||||
}
|
||||
|
||||
/*
|
||||
* Call other parts of citus that need to integrate into
|
||||
* transaction management. Call them *after* committing/preparing
|
||||
* the remote transactions, to allow marking shards as invalid
|
||||
* (e.g. if the remote commit failed).
|
||||
*/
|
||||
RouterExecutorPreCommitCheck();
|
||||
|
||||
/*
|
||||
*
|
||||
* Check again whether shards/placement successfully
|
||||
|
|
|
@ -37,8 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList
|
|||
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
||||
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
||||
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
||||
extern void RouterExecutorPreCommitCheck(void);
|
||||
extern void RouterExecutorPostCommit(void);
|
||||
|
||||
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
|
||||
|
||||
|
|
|
@ -140,8 +140,8 @@ INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
|
|||
ERROR: no transaction participant matches localhost:57638
|
||||
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
||||
COMMIT;
|
||||
-- this logic even applies to router SELECTs occurring after a modification:
|
||||
-- selecting from the modified node is fine...
|
||||
-- SELECTs may occur after a modification: First check that selecting
|
||||
-- from the modified node works.
|
||||
BEGIN;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||
|
@ -151,7 +151,7 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
|
|||
(1 row)
|
||||
|
||||
ABORT;
|
||||
-- but if a SELECT needs to go to new node, that's a problem...
|
||||
-- then check that SELECT going to new node still is fine
|
||||
BEGIN;
|
||||
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
|
||||
FROM pg_dist_shard AS s
|
||||
|
@ -161,8 +161,11 @@ AND sp.nodeport = :worker_1_port
|
|||
AND s.logicalrelid = 'researchers'::regclass;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||
ERROR: no transaction participant matches localhost:57638
|
||||
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
-- applies to DDL, too
|
||||
BEGIN;
|
||||
|
@ -494,7 +497,9 @@ WARNING: illegal value
|
|||
WARNING: failed to commit transaction on localhost:57637
|
||||
WARNING: illegal value
|
||||
WARNING: failed to commit transaction on localhost:57638
|
||||
ERROR: could not commit transaction on any active nodes
|
||||
WARNING: could not commit transaction for shard 1200002 on any active node
|
||||
WARNING: could not commit transaction for shard 1200003 on any active node
|
||||
ERROR: could not commit transaction on any active node
|
||||
-- data should NOT be persisted
|
||||
SELECT * FROM objects WHERE id = 1;
|
||||
id | name
|
||||
|
@ -530,6 +535,7 @@ INSERT INTO labs VALUES (9, 'BAD');
|
|||
COMMIT;
|
||||
WARNING: illegal value
|
||||
WARNING: failed to commit transaction on localhost:57637
|
||||
WARNING: could not commit transaction for shard 1200002 on any active node
|
||||
\set VERBOSITY default
|
||||
-- data to objects should be persisted, but labs should not...
|
||||
SELECT * FROM objects WHERE id = 1;
|
||||
|
|
|
@ -111,15 +111,14 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
|
|||
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
|
||||
COMMIT;
|
||||
|
||||
-- this logic even applies to router SELECTs occurring after a modification:
|
||||
-- selecting from the modified node is fine...
|
||||
-- SELECTs may occur after a modification: First check that selecting
|
||||
-- from the modified node works.
|
||||
BEGIN;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||
ABORT;
|
||||
|
||||
-- but if a SELECT needs to go to new node, that's a problem...
|
||||
|
||||
-- then check that SELECT going to new node still is fine
|
||||
BEGIN;
|
||||
|
||||
UPDATE pg_dist_shard_placement AS sp SET shardstate = 3
|
||||
|
|
Loading…
Reference in New Issue