diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 46b7b9b7d..933c61099 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -127,27 +127,21 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) { LOCKMODE lockMode = NoLock; - /* bypass commutativity checks when flag enabled */ - if (AllModificationsCommutative) - { - return ShareLock; - } - if (commandType == CMD_SELECT) { lockMode = NoLock; } else if (upsertQuery) { - lockMode = ExclusiveLock; + lockMode = RowExclusiveLock; } else if (commandType == CMD_INSERT) { - lockMode = ShareLock; + lockMode = NoLock; } else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { - lockMode = ExclusiveLock; + lockMode = RowExclusiveLock; } else { @@ -200,8 +194,24 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) + else if (task->upsertQuery || operation == CMD_UPDATE || operation == CMD_DELETE) + { + int32 affectedRowCount = -1; + char *originalQueryString = task->queryString; + StringInfo modifiedQueryString = makeStringInfo(); + + appendStringInfoString(modifiedQueryString, "BEGIN; "); + appendStringInfoString(modifiedQueryString, originalQueryString); + + task->queryString = modifiedQueryString->data; + affectedRowCount = ExecuteDistributedModify(task); + + task->queryString = "COMMIT"; + ExecuteDistributedModify(task); + + estate->es_processed = affectedRowCount; + } + else if (operation == CMD_INSERT) { int32 affectedRowCount = ExecuteDistributedModify(task); estate->es_processed = affectedRowCount; @@ -275,7 +285,11 @@ ExecuteDistributedModify(Task *task) } currentAffectedTupleString = PQcmdTuples(result); - currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, sizeof(int32), 0); + if (*currentAffectedTupleString != '\0') + { + currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, + sizeof(int32), 0); + } if ((affectedTupleCount == -1) || (affectedTupleCount == currentAffectedTupleCount)) @@ -288,6 +302,7 @@ ExecuteDistributedModify(Task *task) currentAffectedTupleCount, affectedTupleCount), errdetail("modified placement on %s:%d", nodeName, nodePort))); + failedPlacementList = lappend(failedPlacementList, taskPlacement); } PQclear(result);