mirror of https://github.com/citusdata/citus.git
Wrap UPDATE/DELETE in xact, remove locking
Doesn't handle errors or failures well, but works.pull/460/head
parent
73bc20688c
commit
98a2e094ab
|
@ -136,7 +136,7 @@ CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery)
|
||||||
}
|
}
|
||||||
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
|
else if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
|
||||||
{
|
{
|
||||||
lockMode = ExclusiveLock;
|
lockMode = NoLock;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -195,12 +195,28 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
|
||||||
InstrStartNode(queryDesc->totaltime);
|
InstrStartNode(queryDesc->totaltime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
|
if (operation == CMD_INSERT)
|
||||||
operation == CMD_DELETE)
|
|
||||||
{
|
{
|
||||||
int32 affectedRowCount = ExecuteDistributedModify(task);
|
int32 affectedRowCount = ExecuteDistributedModify(task);
|
||||||
estate->es_processed = affectedRowCount;
|
estate->es_processed = affectedRowCount;
|
||||||
}
|
}
|
||||||
|
else if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||||
|
{
|
||||||
|
int32 affectedRowCount = -1;
|
||||||
|
char *originalQueryString = task->queryString;
|
||||||
|
StringInfo modifiedQueryString = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfoString(modifiedQueryString, "BEGIN; ");
|
||||||
|
appendStringInfoString(modifiedQueryString, originalQueryString);
|
||||||
|
|
||||||
|
task->queryString = modifiedQueryString->data;
|
||||||
|
affectedRowCount = ExecuteDistributedModify(task);
|
||||||
|
|
||||||
|
task->queryString = "COMMIT";
|
||||||
|
ExecuteDistributedModify(task);
|
||||||
|
|
||||||
|
estate->es_processed = affectedRowCount;
|
||||||
|
}
|
||||||
else if (operation == CMD_SELECT)
|
else if (operation == CMD_SELECT)
|
||||||
{
|
{
|
||||||
DestReceiver *destination = queryDesc->dest;
|
DestReceiver *destination = queryDesc->dest;
|
||||||
|
@ -269,7 +285,11 @@ ExecuteDistributedModify(Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentAffectedTupleString = PQcmdTuples(result);
|
currentAffectedTupleString = PQcmdTuples(result);
|
||||||
currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, sizeof(int32), 0);
|
if (*currentAffectedTupleString != '\0')
|
||||||
|
{
|
||||||
|
currentAffectedTupleCount = pg_atoi(currentAffectedTupleString,
|
||||||
|
sizeof(int32), 0);
|
||||||
|
}
|
||||||
|
|
||||||
if ((affectedTupleCount == -1) ||
|
if ((affectedTupleCount == -1) ||
|
||||||
(affectedTupleCount == currentAffectedTupleCount))
|
(affectedTupleCount == currentAffectedTupleCount))
|
||||||
|
|
Loading…
Reference in New Issue