From 98a2e094abcb37f06deddfa873ae137f1cc26f4b Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Thu, 31 Mar 2016 13:12:21 -0600 Subject: [PATCH 1/2] Wrap UPDATE/DELETE in xact, remove locking Doesn't handle errors or failures well, but works. --- .../executor/multi_router_executor.c | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2c759f721..4f5deb1db 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -136,7 +136,7 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) } else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { - lockMode = ExclusiveLock; + lockMode = NoLock; } else { @@ -195,12 +195,28 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) + if (operation == CMD_INSERT) { int32 affectedRowCount = ExecuteDistributedModify(task); estate->es_processed = affectedRowCount; } + else if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + int32 affectedRowCount = -1; + char *originalQueryString = task->queryString; + StringInfo modifiedQueryString = makeStringInfo(); + + appendStringInfoString(modifiedQueryString, "BEGIN; "); + appendStringInfoString(modifiedQueryString, originalQueryString); + + task->queryString = modifiedQueryString->data; + affectedRowCount = ExecuteDistributedModify(task); + + task->queryString = "COMMIT"; + ExecuteDistributedModify(task); + + estate->es_processed = affectedRowCount; + } else if (operation == CMD_SELECT) { DestReceiver *destination = queryDesc->dest; @@ -269,7 +285,11 @@ ExecuteDistributedModify(Task *task) } currentAffectedTupleString = PQcmdTuples(result); - currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, sizeof(int32), 0); + if (*currentAffectedTupleString != '\0') + { + currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, + sizeof(int32), 0); + } if ((affectedTupleCount == -1) || (affectedTupleCount == currentAffectedTupleCount)) From a90933990bd2b96b2abce2d437f604c5a36bd21c Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 20 Apr 2016 12:49:52 -0600 Subject: [PATCH 2/2] Address UPSERT needs and INSERT race conditions UPSERTs should largely be treated like UPDATE as far as master logic is concerned. I've switched UPDATE/DELETE/UPSERT to use the RowExclusive lock mode, which is not self-exclusive, but does conflict with Share. This is important for preserving existing multi-COPY behavior. Additionally, INSERTs can now use NoLock, but if a task finds it has modified a different number of rows than a previous placement's run, the placement with the discrepancy must be marked INACTIVE. --- .../executor/multi_router_executor.c | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4f5deb1db..1e640ba0e 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -116,27 +116,21 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) { LOCKMODE lockMode = NoLock; - /* bypass commutativity checks when flag enabled */ - if (AllModificationsCommutative) - { - return ShareLock; - } - if (commandType == CMD_SELECT) { lockMode = NoLock; } else if (upsertQuery) { - lockMode = ExclusiveLock; + lockMode = RowExclusiveLock; } else if (commandType == CMD_INSERT) { - lockMode = ShareLock; + lockMode = NoLock; } else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { - lockMode = NoLock; + lockMode = RowExclusiveLock; } else { @@ -195,12 +189,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT) - { - int32 affectedRowCount = ExecuteDistributedModify(task); - estate->es_processed = affectedRowCount; - } - else if (operation == CMD_UPDATE || operation == CMD_DELETE) + else if (task->upsertQuery || operation == CMD_UPDATE || operation == CMD_DELETE) { int32 affectedRowCount = -1; char *originalQueryString = task->queryString; @@ -217,6 +206,11 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas estate->es_processed = affectedRowCount; } + else if (operation == CMD_INSERT) + { + int32 affectedRowCount = ExecuteDistributedModify(task); + estate->es_processed = affectedRowCount; + } else if (operation == CMD_SELECT) { DestReceiver *destination = queryDesc->dest; @@ -302,6 +296,7 @@ ExecuteDistributedModify(Task *task) currentAffectedTupleCount, affectedTupleCount), errdetail("modified placement on %s:%d", nodeName, nodePort))); + failedPlacementList = lappend(failedPlacementList, taskPlacement); } PQclear(result);