diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 6c0238a23..bbce5d070 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -96,6 +96,8 @@ static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
 								 bool isModificationQuery, bool expectResults);
 static List * TaskShardIntervalList(List *taskList);
 static void AcquireExecutorShardLock(Task *task, CmdType commandType);
+static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
+static bool IsReplicated(List *shardIntervalList);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
 									   DestReceiver *destination,
 									   Tuplestorestate *tupleStore);
@@ -252,14 +254,14 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 		 * Bypass commutativity checks when citus.all_modifications_commutative
 		 * is enabled.
 		 *
-		 * A ShareLock does not conflict with itself and therefore allows
+		 * A RowExclusiveLock does not conflict with itself and therefore allows
 		 * multiple commutative commands to proceed concurrently. It does
 		 * conflict with ExclusiveLock, which may still be obtained by another
 		 * session that executes an UPDATE/DELETE/UPSERT command with
 		 * citus.all_modifications_commutative disabled.
 		 */
 
-		lockMode = ShareLock;
+		lockMode = RowExclusiveLock;
 	}
 	else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
 	{
@@ -291,13 +293,13 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 		 * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
 		 * order.
 		 *
-		 * A ShareLock does not conflict with itself and therefore allows
+		 * A RowExclusiveLock does not conflict with itself and therefore allows
 		 * multiple INSERT commands to proceed concurrently. It conflicts with
 		 * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
 		 * not run concurrently with INSERT.
 		 */
 
-		lockMode = ShareLock;
+		lockMode = RowExclusiveLock;
 	}
 	else
 	{
@@ -311,6 +313,81 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
 }
 
 
+/*
+ * AcquireExecutorMultiShardLocks acquires shard locks needed for execution
+ * of writes on multiple shards.
+ *
+ * 1. If citus.all_modifications_commutative is set to true, then all locks
+ * are acquired as ShareUpdateExclusiveLock.
+ * 2. If citus.all_modifications_commutative is false, then only the shards
+ * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
+ * lock is acquired with ShareUpdateExclusiveLock.
+ *
+ * ShareUpdateExclusiveLock conflicts with itself such that only one
+ * multi-shard modification at a time is allowed on a shard. It also conflicts
+ * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
+ * in the same order on all placements. It does not conflict with
+ * RowExclusiveLock, which is normally obtained by single-shard, commutative
+ * writes.
+ */
+static void
+AcquireExecutorMultiShardLocks(List *shardIntervalList)
+{
+	LOCKMODE lockMode = NoLock;
+
+	if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
+	{
+		/*
+		 * When all writes are commutative, we only need to prevent multi-shard
+		 * commands from running concurrently with each other and with commands
+		 * that are explicitly non-commutative. When there is no replication, we
+		 * only need to prevent concurrent multi-shard commands.
+		 *
+		 * In either case, ShareUpdateExclusive has the desired effect, since
+		 * it conflicts with itself and ExclusiveLock (taken by non-commutative
+		 * writes).
+		 */
+
+		lockMode = ShareUpdateExclusiveLock;
+	}
+	else
+	{
+		/*
+		 * When there is replication, prevent all concurrent writes to the same
+		 * shards to ensure the writes are ordered.
+		 */
+		lockMode = ExclusiveLock;
+	}
+
+	LockShardListResources(shardIntervalList, lockMode);
+}
+
+
+/*
+ * IsReplicated checks whether any of the shards in the given list has more
+ * than one replica.
+ */
+static bool
+IsReplicated(List *shardIntervalList)
+{
+	ListCell *shardIntervalCell = NULL;
+	bool hasReplication = false;
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		uint64 shardId = shardInterval->shardId;
+		List *shardPlacementList = FinalizedShardPlacementList(shardId);
+
+		if (shardPlacementList->length > 1)
+		{
+			hasReplication = true;
+			break;
+		}
+	}
+
+	return hasReplication;
+}
+
+
 /*
  * CreateXactParticipantHash initializes the map used to store the connections
  * needed to process distributed transactions. Unlike the connection cache, we
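These shard locks all go through PostgreSQL's regular lock manager, so their interplay is exactly the standard lock conflict table. A minimal sketch of the properties the comments above rely on, assuming stock PostgreSQL's DoLockModesConflict() from storage/lock.h; the assertion function itself is hypothetical and not part of this patch:

	#include "postgres.h"
	#include "storage/lock.h"

	/* hypothetical sanity check; illustrates the intended conflict semantics */
	static void
	AssertShardLockSemantics(void)
	{
		/* multi-shard modifications serialize against each other */
		Assert(DoLockModesConflict(ShareUpdateExclusiveLock,
								   ShareUpdateExclusiveLock));

		/* ... and against non-commutative single-shard writes (ExclusiveLock) */
		Assert(DoLockModesConflict(ShareUpdateExclusiveLock, ExclusiveLock));

		/* ... but not against commutative writes taking RowExclusiveLock */
		Assert(!DoLockModesConflict(ShareUpdateExclusiveLock, RowExclusiveLock));

		/* commutative single-shard writes also run concurrently with each other */
		Assert(!DoLockModesConflict(RowExclusiveLock, RowExclusiveLock));
	}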
@@ -422,8 +499,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 			deparse_shard_query(query, relationId, task->anchorShardId,
 								newQueryString);
 
-			elog(DEBUG4, "query before master evaluation: %s", task->queryString);
-			elog(DEBUG4, "query after master evaluation: %s", newQueryString->data);
+			ereport(DEBUG4, (errmsg("query before master evaluation: %s",
+									task->queryString)));
+			ereport(DEBUG4, (errmsg("query after master evaluation: %s",
+									newQueryString->data)));
 
 			task->queryString = newQueryString->data;
 		}
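Switching from elog() to ereport() is behavior-preserving at DEBUG4, but ereport() is the extensible form: additional report fields can be attached without restructuring the call, and errmsg() strings pass through the message machinery. For instance, purely as an illustration (the errdetail() below is not part of the patch), a detail line could later be added as:

	ereport(DEBUG4, (errmsg("query after master evaluation: %s",
							newQueryString->data),
					 errdetail("anchor shard is " INT64_FORMAT,
							   (int64) task->anchorShardId)));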
@@ -698,6 +777,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	ListCell *taskCell = NULL;
 	char *userName = CurrentUserName();
 	List *shardIntervalList = NIL;
+	List *affectedTupleCountList = NIL;
+	bool tasksPending = true;
+	int placementIndex = 0;
 
 	if (XactModificationLevel == XACT_MODIFICATION_DATA)
 	{
@@ -712,76 +794,87 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	shardIntervalList = TaskShardIntervalList(taskList);
 
 	/* ensure that there are no concurrent modifications on the same shards */
-	LockShardListResources(shardIntervalList, ExclusiveLock);
+	AcquireExecutorMultiShardLocks(shardIntervalList);
 
 	/* open connection to all relevant placements, if not already open */
 	OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
 
-	/* send command to all relevant shard placements */
-	foreach(taskCell, taskList)
+	/* iterate over placements in rounds, to ensure in-order execution */
+	while (tasksPending)
 	{
-		Task *task = (Task *) lfirst(taskCell);
-		int64 shardId = task->anchorShardId;
-		char *queryString = task->queryString;
-		bool shardConnectionsFound = false;
-		ShardConnections *shardConnections = NULL;
-		List *connectionList = NIL;
-		ListCell *connectionCell = NULL;
+		int taskIndex = 0;
 
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		Assert(shardConnectionsFound);
+		tasksPending = false;
 
-		connectionList = shardConnections->connectionList;
-		Assert(connectionList != NIL);
-
-		foreach(connectionCell, connectionList)
+		/* send command to all shard placements with the current index in parallel */
+		foreach(taskCell, taskList)
 		{
-			TransactionConnection *transactionConnection =
-				(TransactionConnection *) lfirst(connectionCell);
-			PGconn *connection = transactionConnection->connection;
+			Task *task = (Task *) lfirst(taskCell);
+			int64 shardId = task->anchorShardId;
+			char *queryString = task->queryString;
+			bool shardConnectionsFound = false;
+			ShardConnections *shardConnections = NULL;
+			List *connectionList = NIL;
+			TransactionConnection *transactionConnection = NULL;
+			PGconn *connection = NULL;
 			bool queryOK = false;
 
+			shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+			connectionList = shardConnections->connectionList;
+
+			if (placementIndex >= list_length(connectionList))
+			{
+				/* no more active placements for this task */
+				continue;
+			}
+
+			transactionConnection =
+				(TransactionConnection *) list_nth(connectionList, placementIndex);
+			connection = transactionConnection->connection;
+
 			queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
 			if (!queryOK)
 			{
 				ReraiseRemoteError(connection, NULL);
 			}
 		}
-	}
 
-	/* collects results from all relevant shard placements */
-	foreach(taskCell, taskList)
-	{
-		Task *task = (Task *) lfirst(taskCell);
-		int64 shardId = task->anchorShardId;
-		bool shardConnectionsFound = false;
-		ShardConnections *shardConnections = NULL;
-		List *connectionList = NIL;
-		ListCell *connectionCell = NULL;
-		int64 affectedTupleCount = 0;
-		bool gotResults = false;
-
-		/* abort in case of cancellation */
-		CHECK_FOR_INTERRUPTS();
-
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		connectionList = shardConnections->connectionList;
-
-		foreach(connectionCell, connectionList)
+		/* collect results from all relevant shard placements */
+		foreach(taskCell, taskList)
 		{
-			TransactionConnection *transactionConnection =
-				(TransactionConnection *) lfirst(connectionCell);
-			PGconn *connection = transactionConnection->connection;
+			Task *task = (Task *) lfirst(taskCell);
+			int64 shardId = task->anchorShardId;
+			bool shardConnectionsFound = false;
+			ShardConnections *shardConnections = NULL;
+			List *connectionList = NIL;
+			TransactionConnection *transactionConnection = NULL;
+			PGconn *connection = NULL;
 			int64 currentAffectedTupleCount = 0;
 			bool failOnError = true;
 			bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
 
+			/* abort in case of cancellation */
+			CHECK_FOR_INTERRUPTS();
+
+			shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+			connectionList = shardConnections->connectionList;
+
+			if (placementIndex >= list_length(connectionList))
+			{
+				/* no more active placements for this task */
+				continue;
+			}
+
+			transactionConnection =
+				(TransactionConnection *) list_nth(connectionList, placementIndex);
+			connection = transactionConnection->connection;
+
 			/*
 			 * If caller is interested, store query results the first time
 			 * through. The output of the query's execution on other shards is
 			 * discarded if we run there (because it's a modification query).
 			 */
-			if (!gotResults && expectResults)
+			if (placementIndex == 0 && expectResults)
 			{
 				Assert(routerState != NULL && tupleDescriptor != NULL);
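Each round is still parallel across placements: the first foreach dispatches the query on every relevant connection without waiting, and only the second foreach blocks to collect results. The underlying mechanism is libpq's asynchronous API; roughly the following standalone pattern, where the connection strings and the query are placeholders and error handling is omitted for brevity:

	#include <stdio.h>
	#include <libpq-fe.h>

	#define CONNECTION_COUNT 2

	int
	main(void)
	{
		const char *conninfo[CONNECTION_COUNT] = { "host=worker1", "host=worker2" };
		PGconn *connection[CONNECTION_COUNT] = { NULL };
		int index = 0;

		for (index = 0; index < CONNECTION_COUNT; index++)
		{
			connection[index] = PQconnectdb(conninfo[index]);
		}

		/* send phase: dispatch the command everywhere without blocking */
		for (index = 0; index < CONNECTION_COUNT; index++)
		{
			PQsendQuery(connection[index], "UPDATE test SET value = value + 1");
		}

		/* collect phase: wait for each connection in turn */
		for (index = 0; index < CONNECTION_COUNT; index++)
		{
			PGresult *result = NULL;

			while ((result = PQgetResult(connection[index])) != NULL)
			{
				printf("connection %d affected %s tuples\n", index,
					   PQcmdTuples(result));
				PQclear(result);
			}

			PQfinish(connection[index]);
		}

		return 0;
	}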
@@ -797,26 +890,45 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 			/* should have rolled back on error */
 			Assert(queryOK);
 
-			if (!gotResults)
+			if (placementIndex == 0)
 			{
-				affectedTupleCount = currentAffectedTupleCount;
-				totalAffectedTupleCount += affectedTupleCount;
-			}
-			else if (currentAffectedTupleCount != affectedTupleCount)
-			{
-				char *nodeName = ConnectionGetOptionValue(connection, "host");
-				char *nodePort = ConnectionGetOptionValue(connection, "port");
+				totalAffectedTupleCount += currentAffectedTupleCount;
 
-				ereport(WARNING,
-						(errmsg("modified "INT64_FORMAT " tuples, but expected "
-								"to modify "INT64_FORMAT,
-								currentAffectedTupleCount, affectedTupleCount),
-						 errdetail("modified placement on %s:%s", nodeName,
-								   nodePort)));
+				/* keep track of the initial affected tuple count */
+				affectedTupleCountList = lappend_int(affectedTupleCountList,
+													 currentAffectedTupleCount);
+			}
+			else
+			{
+				/* warn the user if shard placements have diverged */
+				int64 previousAffectedTupleCount = list_nth_int(affectedTupleCountList,
+																taskIndex);
+
+				if (currentAffectedTupleCount != previousAffectedTupleCount)
+				{
+					char *nodeName = ConnectionGetOptionValue(connection, "host");
+					char *nodePort = ConnectionGetOptionValue(connection, "port");
+
+					ereport(WARNING,
+							(errmsg("modified "INT64_FORMAT " tuples of shard "
+									UINT64_FORMAT ", but expected to modify "INT64_FORMAT,
+									currentAffectedTupleCount, shardId,
+									previousAffectedTupleCount),
+							 errdetail("modified placement on %s:%s", nodeName,
+									   nodePort)));
+				}
 			}
 
-			gotResults = true;
+			if (!tasksPending && placementIndex + 1 < list_length(connectionList))
+			{
+				/* more tasks to be done after this one */
+				tasksPending = true;
+			}
+
+			taskIndex++;
 		}
+
+		placementIndex++;
 	}
 
 	CHECK_FOR_INTERRUPTS();
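The first round doubles as the reference measurement: round 0 appends one count per task to affectedTupleCountList, and every later placement is compared against its task's entry. The same bookkeeping in a standalone sketch, with made-up counts in which the second placement of task 1 has diverged:

	#include <stdio.h>
	#include <stdint.h>

	#define TASK_COUNT 2
	#define PLACEMENT_COUNT 2

	int
	main(void)
	{
		/* counts[placementIndex][taskIndex]; placement 1 of task 1 diverges */
		int64_t counts[PLACEMENT_COUNT][TASK_COUNT] = { { 10, 20 }, { 10, 21 } };
		int64_t referenceCounts[TASK_COUNT] = { 0 };
		int placementIndex = 0;

		for (placementIndex = 0; placementIndex < PLACEMENT_COUNT; placementIndex++)
		{
			int taskIndex = 0;

			for (taskIndex = 0; taskIndex < TASK_COUNT; taskIndex++)
			{
				if (placementIndex == 0)
				{
					/* first round establishes the expected count per task */
					referenceCounts[taskIndex] = counts[0][taskIndex];
				}
				else if (counts[placementIndex][taskIndex] !=
						 referenceCounts[taskIndex])
				{
					printf("WARNING: task %d modified %lld tuples, "
						   "but expected to modify %lld\n", taskIndex,
						   (long long) counts[placementIndex][taskIndex],
						   (long long) referenceCounts[taskIndex]);
				}
			}
		}

		return 0;
	}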