Follow consistent execution order in parallel commands

pull/855/head
Marco Slot 2016-10-18 10:21:52 +02:00
parent a497e7178c
commit 65f6d7c02a
1 changed file with 176 additions and 64 deletions


@@ -96,6 +96,8 @@ static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
bool isModificationQuery, bool expectResults);
static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
static bool IsReplicated(List *shardIntervalList);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
DestReceiver *destination,
Tuplestorestate *tupleStore);
@@ -252,14 +254,14 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
* Bypass commutativity checks when citus.all_modifications_commutative
* is enabled.
*
* A ShareLock does not conflict with itself and therefore allows
* A RowExclusiveLock does not conflict with itself and therefore allows
* multiple commutative commands to proceed concurrently. It does
* conflict with ExclusiveLock, which may still be obtained by another
* session that executes an UPDATE/DELETE/UPSERT command with
* citus.all_modifications_commutative disabled.
*/
lockMode = ShareLock;
lockMode = RowExclusiveLock;
}
else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
@@ -291,13 +293,13 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
* UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
* order.
*
* A ShareLock does not conflict with itself and therefore allows
* A RowExclusiveLock does not conflict with itself and therefore allows
* multiple INSERT commands to proceed concurrently. It conflicts with
* ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
* not run concurrently with INSERT.
*/
lockMode = ShareLock;
lockMode = RowExclusiveLock;
}
else
{
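
The two changes above give single-shard modifications the following lock choice. A condensed sketch, using a hypothetical SingleShardLockMode() helper and a local enum in place of AcquireExecutorShardLock() and PostgreSQL's LOCKMODE values:

#include <stdbool.h>

/*
 * Condensed sketch of the single-shard lock choice after this change. The enum
 * and function are illustrative stand-ins, not part of the patch; they mirror
 * the branches of AcquireExecutorShardLock() shown above.
 */
typedef enum SketchLockMode
{
	SKETCH_NO_LOCK,            /* NoLock */
	SKETCH_ROW_EXCLUSIVE_LOCK, /* RowExclusiveLock: compatible with itself */
	SKETCH_EXCLUSIVE_LOCK      /* ExclusiveLock: serializes against other writers */
} SketchLockMode;

static SketchLockMode
SingleShardLockMode(bool allModificationsCommutative, bool isUpsert,
					bool isUpdateOrDelete, bool isInsert)
{
	if (allModificationsCommutative)
	{
		/* user promises commutativity: concurrent modifications may interleave */
		return SKETCH_ROW_EXCLUSIVE_LOCK;
	}
	else if (isUpsert || isUpdateOrDelete)
	{
		/* non-commutative: placements must apply these in the same order */
		return SKETCH_EXCLUSIVE_LOCK;
	}
	else if (isInsert)
	{
		/* INSERTs commute with each other, but conflict with ExclusiveLock holders */
		return SKETCH_ROW_EXCLUSIVE_LOCK;
	}

	/* remaining cases are outside this sketch */
	return SKETCH_NO_LOCK;
}

RowExclusiveLock is compatible with itself but conflicts with ExclusiveLock, so commutative writes (including plain INSERTs) proceed concurrently while UPDATE/DELETE/UPSERT still serialize against them.
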
@@ -311,6 +313,81 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
}
/*
* AcquireExecutorMultiShardLocks acquires the shard locks needed for execution
* of writes on multiple shards.
*
* 1. If citus.all_modifications_commutative is set to true, then all locks
* are acquired as ShareUpdateExclusiveLock.
* 2. If citus.all_modifications_commutative is false and any of the shards
* has 2 or more replicas, then all locks are acquired as ExclusiveLock.
* Otherwise, all locks are acquired as ShareUpdateExclusiveLock.
*
* ShareUpdateExclusiveLock conflicts with itself such that only one
* multi-shard modification at a time is allowed on a shard. It also conflicts
* with ExclusiveLock, which ensures that updates/deletes/upserts are applied
* in the same order on all placements. It does not conflict with
* RowExclusiveLock, which is normally obtained by single-shard commutative
* writes.
*/
static void
AcquireExecutorMultiShardLocks(List *shardIntervalList)
{
LOCKMODE lockMode = NoLock;
if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
{
/*
* When all writes are commutative, we only need to prevent multi-shard
* commands from running concurrently with each other and with commands
* that are explicitly non-commutative. When there is no replication, we
* only need to prevent concurrent multi-shard commands.
*
* In either case, ShareUpdateExclusive has the desired effect, since
* it conflicts with itself and ExclusiveLock (taken by non-commutative
* writes).
*/
lockMode = ShareUpdateExclusiveLock;
}
else
{
/*
* When there is replication, prevent all concurrent writes to the same
* shards to ensure the writes are ordered.
*/
lockMode = ExclusiveLock;
}
LockShardListResources(shardIntervalList, lockMode);
}
/*
* IsReplicated checks whether any of the shards in the given list has more
* than one replica.
*/
static bool
IsReplicated(List *shardIntervalList)
{
ListCell *shardIntervalCell;
bool hasReplication = false;
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList->length > 1)
{
hasReplication = true;
break;
}
}
return hasReplication;
}
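
A self-contained sketch of the decision the two new functions implement, with a hypothetical placementCounts[] array standing in for the FinalizedShardPlacementList() lookups and a local enum standing in for the lock modes:

#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-ins for the lock modes used by the patch (sketch only). */
typedef enum MultiShardSketchLockMode
{
	SKETCH_SHARE_UPDATE_EXCLUSIVE_LOCK, /* one multi-shard write at a time */
	SKETCH_EXCLUSIVE_LOCK_MULTI         /* block all concurrent writers */
} MultiShardSketchLockMode;

/* Mirrors IsReplicated(): true when any shard has more than one placement. */
static bool
AnyShardReplicated(const int *placementCounts, size_t shardCount)
{
	for (size_t shardIndex = 0; shardIndex < shardCount; shardIndex++)
	{
		if (placementCounts[shardIndex] > 1)
		{
			return true;
		}
	}

	return false;
}

/* Mirrors AcquireExecutorMultiShardLocks(): one lock mode for every shard. */
static MultiShardSketchLockMode
MultiShardLockMode(bool allModificationsCommutative,
				   const int *placementCounts, size_t shardCount)
{
	if (allModificationsCommutative ||
		!AnyShardReplicated(placementCounts, shardCount))
	{
		/* only exclude other multi-shard writes and non-commutative writes */
		return SKETCH_SHARE_UPDATE_EXCLUSIVE_LOCK;
	}

	/* replicated shards: serialize all writes so placements stay identical */
	return SKETCH_EXCLUSIVE_LOCK_MULTI;
}

ShareUpdateExclusiveLock conflicts with itself and with ExclusiveLock but not with RowExclusiveLock, so a multi-shard write excludes other multi-shard writes and non-commutative single-shard writes while still allowing commutative single-shard writes.
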
/*
* CreateXactParticipantHash initializes the map used to store the connections
* needed to process distributed transactions. Unlike the connection cache, we
@@ -422,8 +499,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
deparse_shard_query(query, relationId, task->anchorShardId,
newQueryString);
elog(DEBUG4, "query before master evaluation: %s", task->queryString);
elog(DEBUG4, "query after master evaluation: %s", newQueryString->data);
ereport(DEBUG4, (errmsg("query before master evaluation: %s",
task->queryString)));
ereport(DEBUG4, (errmsg("query after master evaluation: %s",
newQueryString->data)));
task->queryString = newQueryString->data;
}
@@ -698,6 +777,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
ListCell *taskCell = NULL;
char *userName = CurrentUserName();
List *shardIntervalList = NIL;
List *affectedTupleCountList = NIL;
bool tasksPending = true;
int placementIndex = 0;
if (XactModificationLevel == XACT_MODIFICATION_DATA)
{
@@ -712,76 +794,87 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */
LockShardListResources(shardIntervalList, ExclusiveLock);
AcquireExecutorMultiShardLocks(shardIntervalList);
/* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
/* send command to all relevant shard placements */
foreach(taskCell, taskList)
/* iterate over placements in rounds, to ensure in-order execution */
while (tasksPending)
{
Task *task = (Task *) lfirst(taskCell);
int64 shardId = task->anchorShardId;
char *queryString = task->queryString;
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
ListCell *connectionCell = NULL;
int taskIndex = 0;
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
Assert(shardConnectionsFound);
tasksPending = false;
connectionList = shardConnections->connectionList;
Assert(connectionList != NIL);
foreach(connectionCell, connectionList)
/* send command to all shard placements with the current index in parallel */
foreach(taskCell, taskList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
Task *task = (Task *) lfirst(taskCell);
int64 shardId = task->anchorShardId;
char *queryString = task->queryString;
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
TransactionConnection *transactionConnection = NULL;
PGconn *connection = NULL;
bool queryOK = false;
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
connectionList = shardConnections->connectionList;
if (placementIndex >= list_length(connectionList))
{
/* no more active placements for this task */
continue;
}
transactionConnection =
(TransactionConnection *) list_nth(connectionList, placementIndex);
connection = transactionConnection->connection;
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
{
ReraiseRemoteError(connection, NULL);
}
}
}
/* collects results from all relevant shard placements */
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
int64 shardId = task->anchorShardId;
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
ListCell *connectionCell = NULL;
int64 affectedTupleCount = 0;
bool gotResults = false;
/* abort in case of cancellation */
CHECK_FOR_INTERRUPTS();
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
connectionList = shardConnections->connectionList;
foreach(connectionCell, connectionList)
/* collects results from all relevant shard placements */
foreach(taskCell, taskList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
Task *task = (Task *) lfirst(taskCell);
int64 shardId = task->anchorShardId;
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
TransactionConnection *transactionConnection = NULL;
PGconn *connection = NULL;
int64 currentAffectedTupleCount = 0;
bool failOnError = true;
bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
/* abort in case of cancellation */
CHECK_FOR_INTERRUPTS();
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
connectionList = shardConnections->connectionList;
if (placementIndex >= list_length(connectionList))
{
/* no more active placements for this task */
continue;
}
transactionConnection =
(TransactionConnection *) list_nth(connectionList, placementIndex);
connection = transactionConnection->connection;
/*
* If caller is interested, store query results the first time
* through. The output of the query's execution on other shards is
* discarded if we run there (because it's a modification query).
*/
if (!gotResults && expectResults)
if (placementIndex == 0 && expectResults)
{
Assert(routerState != NULL && tupleDescriptor != NULL);
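
A self-contained sketch of the round-based control flow introduced here, with hypothetical SendToPlacement()/CollectFromPlacement() helpers in place of SendQueryInSingleRowMode() and the result-handling code, and a SketchTask that only records how many placements a task has:

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Illustrative task: only the number of placements matters for this sketch. */
typedef struct SketchTask
{
	int placementCount;
} SketchTask;

/* Stand-ins for SendQueryInSingleRowMode() and the result-consumption code. */
static void
SendToPlacement(size_t taskIndex, int placementIndex)
{
	printf("round %d: send to task %zu\n", placementIndex, taskIndex);
}

static void
CollectFromPlacement(size_t taskIndex, int placementIndex)
{
	printf("round %d: collect from task %zu\n", placementIndex, taskIndex);
}

/*
 * Execute tasks round by round: in round N, send the command to placement N of
 * every task (in parallel in the real executor), then collect all results,
 * then move on to round N+1. A round completes before the next one starts.
 */
static void
ExecuteInRounds(const SketchTask *taskArray, size_t taskCount)
{
	bool tasksPending = true;
	int placementIndex = 0;

	while (tasksPending)
	{
		tasksPending = false;

		/* send to the current placement of every task that still has one */
		for (size_t taskIndex = 0; taskIndex < taskCount; taskIndex++)
		{
			if (placementIndex >= taskArray[taskIndex].placementCount)
			{
				continue;
			}

			SendToPlacement(taskIndex, placementIndex);
		}

		/* collect results for the same placement index */
		for (size_t taskIndex = 0; taskIndex < taskCount; taskIndex++)
		{
			if (placementIndex >= taskArray[taskIndex].placementCount)
			{
				continue;
			}

			CollectFromPlacement(taskIndex, placementIndex);

			/* another round is needed while this task has placements left */
			if (placementIndex + 1 < taskArray[taskIndex].placementCount)
			{
				tasksPending = true;
			}
		}

		placementIndex++;
	}
}

int
main(void)
{
	SketchTask tasks[] = { { 2 }, { 1 }, { 2 } };

	ExecuteInRounds(tasks, sizeof(tasks) / sizeof(tasks[0]));

	return 0;
}

With two two-placement tasks and one single-placement task, the sketch performs all round-0 sends, then all round-0 collects, then round 1 for the remaining placements; combined with the locks above, this keeps the order of modifications consistent across placements.
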
@@ -797,26 +890,45 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
/* should have rolled back on error */
Assert(queryOK);
if (!gotResults)
if (placementIndex == 0)
{
affectedTupleCount = currentAffectedTupleCount;
totalAffectedTupleCount += affectedTupleCount;
}
else if (currentAffectedTupleCount != affectedTupleCount)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
totalAffectedTupleCount += currentAffectedTupleCount;
ereport(WARNING,
(errmsg("modified "INT64_FORMAT " tuples, but expected "
"to modify "INT64_FORMAT,
currentAffectedTupleCount, affectedTupleCount),
errdetail("modified placement on %s:%s", nodeName,
nodePort)));
/* keep track of the initial affected tuple count */
affectedTupleCountList = lappend_int(affectedTupleCountList,
currentAffectedTupleCount);
}
else
{
/* warn the user if shard placements have diverged */
int64 previousAffectedTupleCount = list_nth_int(affectedTupleCountList,
taskIndex);
if (currentAffectedTupleCount != previousAffectedTupleCount)
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(WARNING,
(errmsg("modified "INT64_FORMAT " tuples of shard "
UINT64_FORMAT ", but expected to modify "INT64_FORMAT,
currentAffectedTupleCount, shardId,
previousAffectedTupleCount),
errdetail("modified placement on %s:%s", nodeName,
nodePort)));
}
}
gotResults = true;
if (!tasksPending && placementIndex + 1 < list_length(connectionList))
{
/* more tasks to be done after this one */
tasksPending = true;
}
taskIndex++;
}
placementIndex++;
}
CHECK_FOR_INTERRUPTS();
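
The warning logic in this last hunk compares each later placement's affected row count against the count recorded for placement 0 of the same task. A minimal sketch, with a hypothetical firstRoundCounts[] array in place of affectedTupleCountList:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Sketch of the per-task divergence check: the count from placement 0 is
 * remembered and later placements are compared against it. firstRoundCounts[]
 * is a hypothetical stand-in for affectedTupleCountList in the patch.
 */
static void
CheckAffectedTupleCount(int64_t *firstRoundCounts, int taskIndex,
						int placementIndex, int64_t currentCount)
{
	if (placementIndex == 0)
	{
		/* the first placement defines the expected count for this task */
		firstRoundCounts[taskIndex] = currentCount;
	}
	else if (currentCount != firstRoundCounts[taskIndex])
	{
		/* placements diverged; the patch emits ereport(WARNING, ...) here */
		fprintf(stderr,
				"modified %" PRId64 " tuples, but expected to modify %" PRId64 "\n",
				currentCount, firstRoundCounts[taskIndex]);
	}
}
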