diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4f5deb1db..1e640ba0e 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -116,27 +116,21 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) { LOCKMODE lockMode = NoLock; - /* bypass commutativity checks when flag enabled */ - if (AllModificationsCommutative) - { - return ShareLock; - } - if (commandType == CMD_SELECT) { lockMode = NoLock; } else if (upsertQuery) { - lockMode = ExclusiveLock; + lockMode = RowExclusiveLock; } else if (commandType == CMD_INSERT) { - lockMode = ShareLock; + lockMode = NoLock; } else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { - lockMode = NoLock; + lockMode = RowExclusiveLock; } else { @@ -195,12 +189,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT) - { - int32 affectedRowCount = ExecuteDistributedModify(task); - estate->es_processed = affectedRowCount; - } - else if (operation == CMD_UPDATE || operation == CMD_DELETE) + else if (task->upsertQuery || operation == CMD_UPDATE || operation == CMD_DELETE) { int32 affectedRowCount = -1; char *originalQueryString = task->queryString; @@ -217,6 +206,11 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas estate->es_processed = affectedRowCount; } + else if (operation == CMD_INSERT) + { + int32 affectedRowCount = ExecuteDistributedModify(task); + estate->es_processed = affectedRowCount; + } else if (operation == CMD_SELECT) { DestReceiver *destination = queryDesc->dest; @@ -302,6 +296,7 @@ ExecuteDistributedModify(Task *task) currentAffectedTupleCount, affectedTupleCount), errdetail("modified placement on %s:%d", nodeName, nodePort))); + failedPlacementList = lappend(failedPlacementList, taskPlacement); } PQclear(result);