pull/460/merge
Jason Petersen 2016-05-02 17:18:54 +00:00
commit 193c6b789b
1 changed files with 27 additions and 12 deletions

View File

@ -127,27 +127,21 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery)
{ {
LOCKMODE lockMode = NoLock; LOCKMODE lockMode = NoLock;
/* bypass commutativity checks when flag enabled */
if (AllModificationsCommutative)
{
return ShareLock;
}
if (commandType == CMD_SELECT) if (commandType == CMD_SELECT)
{ {
lockMode = NoLock; lockMode = NoLock;
} }
else if (upsertQuery) else if (upsertQuery)
{ {
lockMode = ExclusiveLock; lockMode = RowExclusiveLock;
} }
else if (commandType == CMD_INSERT) else if (commandType == CMD_INSERT)
{ {
lockMode = ShareLock; lockMode = NoLock;
} }
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE) else if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{ {
lockMode = ExclusiveLock; lockMode = RowExclusiveLock;
} }
else else
{ {
@ -200,8 +194,24 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
InstrStartNode(queryDesc->totaltime); InstrStartNode(queryDesc->totaltime);
} }
if (operation == CMD_INSERT || operation == CMD_UPDATE || else if (task->upsertQuery || operation == CMD_UPDATE || operation == CMD_DELETE)
operation == CMD_DELETE) {
int32 affectedRowCount = -1;
char *originalQueryString = task->queryString;
StringInfo modifiedQueryString = makeStringInfo();
appendStringInfoString(modifiedQueryString, "BEGIN; ");
appendStringInfoString(modifiedQueryString, originalQueryString);
task->queryString = modifiedQueryString->data;
affectedRowCount = ExecuteDistributedModify(task);
task->queryString = "COMMIT";
ExecuteDistributedModify(task);
estate->es_processed = affectedRowCount;
}
else if (operation == CMD_INSERT)
{ {
int32 affectedRowCount = ExecuteDistributedModify(task); int32 affectedRowCount = ExecuteDistributedModify(task);
estate->es_processed = affectedRowCount; estate->es_processed = affectedRowCount;
@ -275,7 +285,11 @@ ExecuteDistributedModify(Task *task)
} }
currentAffectedTupleString = PQcmdTuples(result); currentAffectedTupleString = PQcmdTuples(result);
currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, sizeof(int32), 0); if (*currentAffectedTupleString != '\0')
{
currentAffectedTupleCount = pg_atoi(currentAffectedTupleString,
sizeof(int32), 0);
}
if ((affectedTupleCount == -1) || if ((affectedTupleCount == -1) ||
(affectedTupleCount == currentAffectedTupleCount)) (affectedTupleCount == currentAffectedTupleCount))
@ -288,6 +302,7 @@ ExecuteDistributedModify(Task *task)
currentAffectedTupleCount, affectedTupleCount), currentAffectedTupleCount, affectedTupleCount),
errdetail("modified placement on %s:%d", errdetail("modified placement on %s:%d",
nodeName, nodePort))); nodeName, nodePort)));
failedPlacementList = lappend(failedPlacementList, taskPlacement);
} }
PQclear(result); PQclear(result);