From a90933990bd2b96b2abce2d437f604c5a36bd21c Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 20 Apr 2016 12:49:52 -0600 Subject: [PATCH] Address UPSERT needs and INSERT race conditions UPSERTs should largely be treated like UPDATE as far as master logic is concerned. I've switched UPDATE/DELETE/UPSERT to use the RowExclusive lock mode, which is not self-exclusive, but does conflict with Share. This is important for preserving existing multi-COPY behavior. Additionally, INSERTs can now use NoLock, but if a task finds it has modified a different number of rows than a previous placement's run, the placement with the discrepancy must be marked INACTIVE. --- .../executor/multi_router_executor.c | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4f5deb1db..1e640ba0e 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -116,27 +116,21 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery) { LOCKMODE lockMode = NoLock; - /* bypass commutativity checks when flag enabled */ - if (AllModificationsCommutative) - { - return ShareLock; - } - if (commandType == CMD_SELECT) { lockMode = NoLock; } else if (upsertQuery) { - lockMode = ExclusiveLock; + lockMode = RowExclusiveLock; } else if (commandType == CMD_INSERT) { - lockMode = ShareLock; + lockMode = NoLock; } else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) { - lockMode = NoLock; + lockMode = RowExclusiveLock; } else { @@ -195,12 +189,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas InstrStartNode(queryDesc->totaltime); } - if (operation == CMD_INSERT) - { - int32 affectedRowCount = ExecuteDistributedModify(task); - estate->es_processed = affectedRowCount; - } - else if (operation == CMD_UPDATE || operation == CMD_DELETE) + else if (task->upsertQuery || operation == CMD_UPDATE || operation == CMD_DELETE) { int32 affectedRowCount = -1; char *originalQueryString = task->queryString; @@ -217,6 +206,11 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas estate->es_processed = affectedRowCount; } + else if (operation == CMD_INSERT) + { + int32 affectedRowCount = ExecuteDistributedModify(task); + estate->es_processed = affectedRowCount; + } else if (operation == CMD_SELECT) { DestReceiver *destination = queryDesc->dest; @@ -302,6 +296,7 @@ ExecuteDistributedModify(Task *task) currentAffectedTupleCount, affectedTupleCount), errdetail("modified placement on %s:%d", nodeName, nodePort))); + failedPlacementList = lappend(failedPlacementList, taskPlacement); } PQclear(result);