Address UPSERT needs and INSERT race conditions

UPSERTs should largely be treated like UPDATE as far as master logic is
concerned. I've switched UPDATE/DELETE/UPSERT to use the RowExclusive
lock mode, which is not self-exclusive, but does conflict with Share.
This is important for preserving existing multi-COPY behavior.

Additionally, INSERTs can now use NoLock, but if a task finds it has
modified a different number of rows than a previous placement's run,
the placement with the discrepancy must be marked INACTIVE.
pull/460/head
Jason Petersen 2016-04-20 12:49:52 -06:00
parent 98a2e094ab
commit a90933990b
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
1 changed files with 10 additions and 15 deletions

View File

@ -116,27 +116,21 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery)
{ {
LOCKMODE lockMode = NoLock; LOCKMODE lockMode = NoLock;
/* bypass commutativity checks when flag enabled */
if (AllModificationsCommutative)
{
return ShareLock;
}
if (commandType == CMD_SELECT) if (commandType == CMD_SELECT)
{ {
lockMode = NoLock; lockMode = NoLock;
} }
else if (upsertQuery) else if (upsertQuery)
{ {
lockMode = ExclusiveLock; lockMode = RowExclusiveLock;
} }
else if (commandType == CMD_INSERT) else if (commandType == CMD_INSERT)
{ {
lockMode = ShareLock; lockMode = NoLock;
} }
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) else if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{ {
lockMode = NoLock; lockMode = RowExclusiveLock;
} }
else else
{ {
@ -195,12 +189,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
InstrStartNode(queryDesc->totaltime); InstrStartNode(queryDesc->totaltime);
} }
if (operation == CMD_INSERT) else if (task->upsertQuery || operation == CMD_UPDATE || operation == CMD_DELETE)
{
int32 affectedRowCount = ExecuteDistributedModify(task);
estate->es_processed = affectedRowCount;
}
else if (operation == CMD_UPDATE || operation == CMD_DELETE)
{ {
int32 affectedRowCount = -1; int32 affectedRowCount = -1;
char *originalQueryString = task->queryString; char *originalQueryString = task->queryString;
@ -217,6 +206,11 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
estate->es_processed = affectedRowCount; estate->es_processed = affectedRowCount;
} }
else if (operation == CMD_INSERT)
{
int32 affectedRowCount = ExecuteDistributedModify(task);
estate->es_processed = affectedRowCount;
}
else if (operation == CMD_SELECT) else if (operation == CMD_SELECT)
{ {
DestReceiver *destination = queryDesc->dest; DestReceiver *destination = queryDesc->dest;
@ -302,6 +296,7 @@ ExecuteDistributedModify(Task *task)
currentAffectedTupleCount, affectedTupleCount), currentAffectedTupleCount, affectedTupleCount),
errdetail("modified placement on %s:%d", errdetail("modified placement on %s:%d",
nodeName, nodePort))); nodeName, nodePort)));
failedPlacementList = lappend(failedPlacementList, taskPlacement);
} }
PQclear(result); PQclear(result);