From 98a2e094abcb37f06deddfa873ae137f1cc26f4b Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Thu, 31 Mar 2016 13:12:21 -0600 Subject: [PATCH] Wrap UPDATE/DELETE in xact, remove locking Doesn't handle errors or failures well, but works. --- .../executor/multi_router_executor.c | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2c759f721..4f5deb1db 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -136,7 +136,7 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) } else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { - lockMode = ExclusiveLock; + lockMode = NoLock; } else { @@ -195,12 +195,28 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) + if (operation == CMD_INSERT) { int32 affectedRowCount = ExecuteDistributedModify(task); estate->es_processed = affectedRowCount; } + else if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + int32 affectedRowCount = -1; + char *originalQueryString = task->queryString; + StringInfo modifiedQueryString = makeStringInfo(); + + appendStringInfoString(modifiedQueryString, "BEGIN; "); + appendStringInfoString(modifiedQueryString, originalQueryString); + + task->queryString = modifiedQueryString->data; + affectedRowCount = ExecuteDistributedModify(task); + + task->queryString = "COMMIT"; + ExecuteDistributedModify(task); + + estate->es_processed = affectedRowCount; + } else if (operation == CMD_SELECT) { DestReceiver *destination = queryDesc->dest; @@ -269,7 +285,11 @@ ExecuteDistributedModify(Task *task) } currentAffectedTupleString = PQcmdTuples(result); - currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, sizeof(int32), 0); + if (*currentAffectedTupleString != '\0') + { + currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, + sizeof(int32), 0); + } if ((affectedTupleCount == -1) || (affectedTupleCount == currentAffectedTupleCount))