mirror of https://github.com/citusdata/citus.git
Rename failOnError to alwaysThrowErrorOnFailure
parent
0feb1f2eb1
commit
0bbe778760
|
@ -81,9 +81,9 @@ bool EnableDeadlockPrevention = true;
|
|||
static void AcquireMetadataLocks(List *taskList);
|
||||
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
|
||||
ShardPlacementAccessType accessType);
|
||||
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
|
||||
CmdType operation, bool failOnError,
|
||||
bool expectResults);
|
||||
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType
|
||||
operation, bool alwaysThrowErrorOnFailure, bool
|
||||
expectResults);
|
||||
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
|
||||
static List * GetModifyConnections(Task *task, bool markCritical);
|
||||
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
||||
|
@ -98,11 +98,11 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
|
|||
const char ***parameterValues);
|
||||
static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
|
||||
ParamListInfo paramListInfo);
|
||||
static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
|
||||
bool failOnError, int64 *rows,
|
||||
static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, bool
|
||||
alwaysThrowErrorOnFailure, int64 *rows,
|
||||
DistributedExecutionStats *executionStats);
|
||||
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
|
||||
int64 *rows);
|
||||
static bool ConsumeQueryResult(MultiConnection *connection, bool
|
||||
alwaysThrowErrorOnFailure, int64 *rows);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -479,7 +479,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
|||
bool multipleTasks = list_length(taskList) > 1;
|
||||
EState *executorState = scanState->customScanState.ss.ps.state;
|
||||
bool taskListRequires2PC = TaskListRequires2PC(taskList);
|
||||
bool failOnError = false;
|
||||
bool alwaysThrowErrorOnFailure = false;
|
||||
CmdType operation = scanState->distributedPlan->operation;
|
||||
|
||||
/*
|
||||
|
@ -498,21 +498,21 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
|||
* the failures are rare, and we prefer to avoid marking placements invalid
|
||||
* in case of failures.
|
||||
*
|
||||
* For reference tables, we always failOnError since we absolutely want to avoid
|
||||
* marking any placements invalid.
|
||||
* For reference tables, we always set alwaysThrowErrorOnFailure since we
|
||||
* absolutely want to avoid marking any placements invalid.
|
||||
*
|
||||
* We also cannot handle faulures when there is RETURNING and there are more than
|
||||
* one task to execute.
|
||||
* We also cannot handle failures when there is RETURNING and there are more
|
||||
* than one task to execute.
|
||||
*/
|
||||
if (taskListRequires2PC)
|
||||
{
|
||||
CoordinatedTransactionUse2PC();
|
||||
|
||||
failOnError = true;
|
||||
alwaysThrowErrorOnFailure = true;
|
||||
}
|
||||
else if (multipleTasks && hasReturning)
|
||||
{
|
||||
failOnError = true;
|
||||
alwaysThrowErrorOnFailure = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -523,8 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
|
|||
Task *task = (Task *) lfirst(taskCell);
|
||||
|
||||
executorState->es_processed +=
|
||||
ExecuteSingleModifyTask(scanState, task, operation, failOnError,
|
||||
hasReturning);
|
||||
ExecuteSingleModifyTask(scanState, task, operation,
|
||||
alwaysThrowErrorOnFailure, hasReturning);
|
||||
}
|
||||
|
||||
scanState->finishedRemoteScan = true;
|
||||
|
@ -821,7 +821,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
|
|||
*/
|
||||
static int64
|
||||
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
|
||||
bool failOnError, bool expectResults)
|
||||
bool alwaysThrowErrorOnFailure, bool expectResults)
|
||||
{
|
||||
EState *executorState = NULL;
|
||||
ParamListInfo paramListInfo = NULL;
|
||||
|
@ -851,7 +851,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
|
|||
* table or multi-shard command) and start a transaction (when in a
|
||||
* transaction).
|
||||
*/
|
||||
connectionList = GetModifyConnections(task, failOnError);
|
||||
connectionList = GetModifyConnections(task, alwaysThrowErrorOnFailure);
|
||||
|
||||
/*
|
||||
* If we are dealing with a partitioned table, we also need to lock its
|
||||
|
@ -911,7 +911,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
|
|||
* If we already failed on all other placements (possibly 0),
|
||||
* relay errors directly.
|
||||
*/
|
||||
failOnError = true;
|
||||
alwaysThrowErrorOnFailure = true;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -921,12 +921,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
|
|||
*/
|
||||
if (!gotResults && expectResults)
|
||||
{
|
||||
queryOK = StoreQueryResult(scanState, connection, failOnError,
|
||||
queryOK = StoreQueryResult(scanState, connection, alwaysThrowErrorOnFailure,
|
||||
¤tAffectedTupleCount, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
queryOK = ConsumeQueryResult(connection, failOnError,
|
||||
queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
|
||||
¤tAffectedTupleCount);
|
||||
}
|
||||
|
||||
|
@ -958,9 +958,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
|
|||
|
||||
/*
|
||||
* If a command results in an error on all workers, we relay the last error
|
||||
* in the loop above by setting failOnError. However, if all connections fail
|
||||
* we still complete the loop without throwing an error. In that case, throw
|
||||
* an error below.
|
||||
* in the loop above by setting alwaysThrowErrorOnFailure. However, if all
|
||||
* connections fail we still complete the loop without throwing an error.
|
||||
* In that case, throw an error below.
|
||||
*/
|
||||
if (!resultsOK)
|
||||
{
|
||||
|
@ -1119,7 +1119,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
|
|||
bool multipleTasks = list_length(taskList) > 1;
|
||||
bool expectResults = false;
|
||||
int64 affectedTupleCount = 0;
|
||||
bool failOnError = true;
|
||||
bool alwaysThrowErrorOnFailure = true;
|
||||
bool taskListRequires2PC = TaskListRequires2PC(taskList);
|
||||
|
||||
/* decide on whether to use coordinated transaction and 2PC */
|
||||
|
@ -1149,7 +1149,8 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
|
|||
Task *task = (Task *) lfirst(taskCell);
|
||||
|
||||
affectedTupleCount +=
|
||||
ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults);
|
||||
ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure,
|
||||
expectResults);
|
||||
}
|
||||
|
||||
return affectedTupleCount;
|
||||
|
@ -1283,7 +1284,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
List *connectionList = NIL;
|
||||
MultiConnection *connection = NULL;
|
||||
int64 currentAffectedTupleCount = 0;
|
||||
bool failOnError = true;
|
||||
bool alwaysThrowErrorOnFailure = true;
|
||||
bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
|
||||
|
||||
/* abort in case of cancellation */
|
||||
|
@ -1320,12 +1321,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
{
|
||||
Assert(scanState != NULL);
|
||||
|
||||
queryOK = StoreQueryResult(scanState, connection, failOnError,
|
||||
queryOK = StoreQueryResult(scanState, connection,
|
||||
alwaysThrowErrorOnFailure,
|
||||
¤tAffectedTupleCount, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
queryOK = ConsumeQueryResult(connection, failOnError,
|
||||
queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
|
||||
¤tAffectedTupleCount);
|
||||
}
|
||||
|
||||
|
@ -1515,7 +1517,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
|
|||
*/
|
||||
static bool
|
||||
StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
|
||||
bool failOnError, int64 *rows,
|
||||
bool alwaysThrowErrorOnFailure, int64 *rows,
|
||||
DistributedExecutionStats *executionStats)
|
||||
{
|
||||
TupleDesc tupleDescriptor =
|
||||
|
@ -1541,7 +1543,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
|
|||
scanState->tuplestorestate =
|
||||
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
||||
}
|
||||
else if (!failOnError)
|
||||
else if (!alwaysThrowErrorOnFailure)
|
||||
{
|
||||
/* might have failed query execution on another placement before */
|
||||
tuplestore_clear(scanState->tuplestorestate);
|
||||
|
@ -1585,7 +1587,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
|
|||
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
|
||||
isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
|
||||
|
||||
if (isConstraintViolation || failOnError ||
|
||||
if (isConstraintViolation || alwaysThrowErrorOnFailure ||
|
||||
IsRemoteTransactionCritical(connection))
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
|
@ -1668,7 +1670,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
|
|||
* has been an error.
|
||||
*/
|
||||
static bool
|
||||
ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
|
||||
ConsumeQueryResult(MultiConnection *connection, bool alwaysThrowErrorOnFailure,
|
||||
int64 *rows)
|
||||
{
|
||||
bool commandFailed = false;
|
||||
bool gotResponse = false;
|
||||
|
@ -1716,7 +1719,7 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
|
|||
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
|
||||
isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
|
||||
|
||||
if (isConstraintViolation || failOnError ||
|
||||
if (isConstraintViolation || alwaysThrowErrorOnFailure ||
|
||||
IsRemoteTransactionCritical(connection))
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
|
|
Loading…
Reference in New Issue