diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index fdd9f31d7..f194b703e 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -81,9 +81,9 @@ bool EnableDeadlockPrevention = true; static void AcquireMetadataLocks(List *taskList); static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); -static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, - CmdType operation, bool failOnError, - bool expectResults); +static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType + operation, bool alwaysThrowErrorOnFailure, bool + expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static List * GetModifyConnections(Task *task, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, @@ -98,11 +98,11 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, const char ***parameterValues); static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, ParamListInfo paramListInfo); -static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - bool failOnError, int64 *rows, +static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, bool + alwaysThrowErrorOnFailure, int64 *rows, DistributedExecutionStats *executionStats); -static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, - int64 *rows); +static bool ConsumeQueryResult(MultiConnection *connection, bool + alwaysThrowErrorOnFailure, int64 *rows); /* @@ -479,7 +479,7 @@ RouterSequentialModifyExecScan(CustomScanState *node) bool multipleTasks = list_length(taskList) > 1; EState *executorState = scanState->customScanState.ss.ps.state; bool taskListRequires2PC = TaskListRequires2PC(taskList); - bool failOnError = false; + bool alwaysThrowErrorOnFailure = false; CmdType operation = scanState->distributedPlan->operation; /* @@ -498,21 +498,21 @@ RouterSequentialModifyExecScan(CustomScanState *node) * the failures are rare, and we prefer to avoid marking placements invalid * in case of failures. * - * For reference tables, we always failOnError since we absolutely want to avoid - * marking any placements invalid. + * For reference tables, we always set alwaysThrowErrorOnFailure since we + * absolutely want to avoid marking any placements invalid. * - * We also cannot handle faulures when there is RETURNING and there are more than - * one task to execute. + * We also cannot handle failures when there is RETURNING and there are more + * than one task to execute. */ if (taskListRequires2PC) { CoordinatedTransactionUse2PC(); - failOnError = true; + alwaysThrowErrorOnFailure = true; } else if (multipleTasks && hasReturning) { - failOnError = true; + alwaysThrowErrorOnFailure = true; } } @@ -523,8 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node) Task *task = (Task *) lfirst(taskCell); executorState->es_processed += - ExecuteSingleModifyTask(scanState, task, operation, failOnError, - hasReturning); + ExecuteSingleModifyTask(scanState, task, operation, + alwaysThrowErrorOnFailure, hasReturning); } scanState->finishedRemoteScan = true; @@ -821,7 +821,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access */ static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, - bool failOnError, bool expectResults) + bool alwaysThrowErrorOnFailure, bool expectResults) { EState *executorState = NULL; ParamListInfo paramListInfo = NULL; @@ -851,7 +851,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation * table or multi-shard command) and start a transaction (when in a * transaction). */ - connectionList = GetModifyConnections(task, failOnError); + connectionList = GetModifyConnections(task, alwaysThrowErrorOnFailure); /* * If we are dealing with a partitioned table, we also need to lock its @@ -911,7 +911,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation * If we already failed on all other placements (possibly 0), * relay errors directly. */ - failOnError = true; + alwaysThrowErrorOnFailure = true; } /* @@ -921,12 +921,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation */ if (!gotResults && expectResults) { - queryOK = StoreQueryResult(scanState, connection, failOnError, + queryOK = StoreQueryResult(scanState, connection, alwaysThrowErrorOnFailure, ¤tAffectedTupleCount, NULL); } else { - queryOK = ConsumeQueryResult(connection, failOnError, + queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, ¤tAffectedTupleCount); } @@ -958,9 +958,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation /* * If a command results in an error on all workers, we relay the last error - * in the loop above by setting failOnError. However, if all connections fail - * we still complete the loop without throwing an error. In that case, throw - * an error below. + * in the loop above by setting alwaysThrowErrorOnFailure. However, if all + * connections fail we still complete the loop without throwing an error. + * In that case, throw an error below. */ if (!resultsOK) { @@ -1119,7 +1119,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) bool multipleTasks = list_length(taskList) > 1; bool expectResults = false; int64 affectedTupleCount = 0; - bool failOnError = true; + bool alwaysThrowErrorOnFailure = true; bool taskListRequires2PC = TaskListRequires2PC(taskList); /* decide on whether to use coordinated transaction and 2PC */ @@ -1149,7 +1149,8 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) Task *task = (Task *) lfirst(taskCell); affectedTupleCount += - ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults); + ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure, + expectResults); } return affectedTupleCount; @@ -1283,7 +1284,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn List *connectionList = NIL; MultiConnection *connection = NULL; int64 currentAffectedTupleCount = 0; - bool failOnError = true; + bool alwaysThrowErrorOnFailure = true; bool queryOK PG_USED_FOR_ASSERTS_ONLY = false; /* abort in case of cancellation */ @@ -1320,12 +1321,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn { Assert(scanState != NULL); - queryOK = StoreQueryResult(scanState, connection, failOnError, + queryOK = StoreQueryResult(scanState, connection, + alwaysThrowErrorOnFailure, ¤tAffectedTupleCount, NULL); } else { - queryOK = ConsumeQueryResult(connection, failOnError, + queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure, ¤tAffectedTupleCount); } @@ -1515,7 +1517,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT */ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - bool failOnError, int64 *rows, + bool alwaysThrowErrorOnFailure, int64 *rows, DistributedExecutionStats *executionStats) { TupleDesc tupleDescriptor = @@ -1541,7 +1543,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); } - else if (!failOnError) + else if (!alwaysThrowErrorOnFailure) { /* might have failed query execution on another placement before */ tuplestore_clear(scanState->tuplestorestate); @@ -1585,7 +1587,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); - if (isConstraintViolation || failOnError || + if (isConstraintViolation || alwaysThrowErrorOnFailure || IsRemoteTransactionCritical(connection)) { ReportResultError(connection, result, ERROR); @@ -1668,7 +1670,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, * has been an error. */ static bool -ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) +ConsumeQueryResult(MultiConnection *connection, bool alwaysThrowErrorOnFailure, + int64 *rows) { bool commandFailed = false; bool gotResponse = false; @@ -1716,7 +1719,7 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); - if (isConstraintViolation || failOnError || + if (isConstraintViolation || alwaysThrowErrorOnFailure || IsRemoteTransactionCritical(connection)) { ReportResultError(connection, result, ERROR);