Merge amosbird:i64_affected_tuples into master

pull/579/head
Jason Petersen 2016-06-08 10:35:40 -06:00
commit b31808a1c5
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
1 changed files with 21 additions and 11 deletions

View File

@ -33,6 +33,7 @@
#include "utils/errcodes.h" #include "utils/errcodes.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/palloc.h" #include "utils/palloc.h"
#include "utils/int8.h"
/* controls use of locks to enforce safe commutativity */ /* controls use of locks to enforce safe commutativity */
@ -41,7 +42,7 @@ bool AllModificationsCommutative = false;
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery); static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode); static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static int32 ExecuteDistributedModify(Task *task); static uint64 ExecuteDistributedModify(Task *task);
static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount, static void ExecuteSingleShardSelect(QueryDesc *queryDesc, uint64 tupleCount,
Task *task, EState *executorState, Task *task, EState *executorState,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
@ -203,7 +204,7 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
if (operation == CMD_INSERT || operation == CMD_UPDATE || if (operation == CMD_INSERT || operation == CMD_UPDATE ||
operation == CMD_DELETE) operation == CMD_DELETE)
{ {
int32 affectedRowCount = ExecuteDistributedModify(task); uint64 affectedRowCount = ExecuteDistributedModify(task);
estate->es_processed = affectedRowCount; estate->es_processed = affectedRowCount;
} }
else if (operation == CMD_SELECT) else if (operation == CMD_SELECT)
@ -236,10 +237,10 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
* of modified rows in that case and errors in all others. This function will * of modified rows in that case and errors in all others. This function will
* also generate warnings for individual placement failures. * also generate warnings for individual placement failures.
*/ */
static int32 static uint64
ExecuteDistributedModify(Task *task) ExecuteDistributedModify(Task *task)
{ {
int32 affectedTupleCount = -1; int64 affectedTupleCount = -1;
ListCell *taskPlacementCell = NULL; ListCell *taskPlacementCell = NULL;
List *failedPlacementList = NIL; List *failedPlacementList = NIL;
ListCell *failedPlacementCell = NULL; ListCell *failedPlacementCell = NULL;
@ -253,7 +254,7 @@ ExecuteDistributedModify(Task *task)
PGconn *connection = NULL; PGconn *connection = NULL;
PGresult *result = NULL; PGresult *result = NULL;
char *currentAffectedTupleString = NULL; char *currentAffectedTupleString = NULL;
int32 currentAffectedTupleCount = -1; int64 currentAffectedTupleCount = -1;
Assert(taskPlacement->shardState == FILE_FINALIZED); Assert(taskPlacement->shardState == FILE_FINALIZED);
@ -294,7 +295,16 @@ ExecuteDistributedModify(Task *task)
} }
currentAffectedTupleString = PQcmdTuples(result); currentAffectedTupleString = PQcmdTuples(result);
currentAffectedTupleCount = pg_atoi(currentAffectedTupleString, sizeof(int32), 0);
/* could throw error if input > MAX_INT64 */
scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
Assert(currentAffectedTupleCount >= 0);
#if (PG_VERSION_NUM < 90600)
/* before 9.6, PostgreSQL used a uint32 for this field, so check */
Assert(currentAffectedTupleCount <= PG_UINT32_MAX);
#endif
if ((affectedTupleCount == -1) || if ((affectedTupleCount == -1) ||
(affectedTupleCount == currentAffectedTupleCount)) (affectedTupleCount == currentAffectedTupleCount))
@ -303,10 +313,10 @@ ExecuteDistributedModify(Task *task)
} }
else else
{ {
ereport(WARNING, (errmsg("modified %d tuples, but expected to modify %d", ereport(WARNING,
currentAffectedTupleCount, affectedTupleCount), (errmsg("modified " INT64_FORMAT " tuples, but expected to modify "
errdetail("modified placement on %s:%d", INT64_FORMAT, currentAffectedTupleCount, affectedTupleCount),
nodeName, nodePort))); errdetail("modified placement on %s:%d", nodeName, nodePort)));
} }
PQclear(result); PQclear(result);
@ -330,7 +340,7 @@ ExecuteDistributedModify(Task *task)
failedPlacement->nodeName, failedPlacement->nodePort); failedPlacement->nodeName, failedPlacement->nodePort);
} }
return affectedTupleCount; return (uint64) affectedTupleCount;
} }