diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 1689eddfa..f194b703e 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -81,9 +81,9 @@ bool EnableDeadlockPrevention = true;
 static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
 													ShardPlacementAccessType accessType);
-static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
-									 CmdType operation, bool failOnError,
-									 bool expectResults);
+static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType
+									 operation, bool alwaysThrowErrorOnFailure, bool
+									 expectResults);
 static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
 static List * GetModifyConnections(Task *task, bool markCritical);
 static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@@ -98,11 +98,11 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
 											   const char ***parameterValues);
 static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
 									 ParamListInfo paramListInfo);
-static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
-							 bool failOnError, int64 *rows,
+static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, bool
+							 alwaysThrowErrorOnFailure, int64 *rows,
 							 DistributedExecutionStats *executionStats);
-static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
-							   int64 *rows);
+static bool ConsumeQueryResult(MultiConnection *connection, bool
+							   alwaysThrowErrorOnFailure, int64 *rows);


 /*
@@ -479,7 +479,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 	bool multipleTasks = list_length(taskList) > 1;
 	EState *executorState = scanState->customScanState.ss.ps.state;
 	bool taskListRequires2PC = TaskListRequires2PC(taskList);
-	bool failOnError = false;
+	bool alwaysThrowErrorOnFailure = false;
 	CmdType operation = scanState->distributedPlan->operation;

 	/*
@@ -498,21 +498,21 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 	 * the failures are rare, and we prefer to avoid marking placements invalid
 	 * in case of failures.
 	 *
-	 * For reference tables, we always failOnError since we absolutely want to avoid
-	 * marking any placements invalid.
+	 * For reference tables, we always set alwaysThrowErrorOnFailure since we
+	 * absolutely want to avoid marking any placements invalid.
 	 *
-	 * We also cannot handle faulures when there is RETURNING and there are more than
-	 * one task to execute.
+	 * We also cannot handle failures when there is RETURNING and there are more
+	 * than one task to execute.
 	 */
 	if (taskListRequires2PC)
 	{
 		CoordinatedTransactionUse2PC();

-		failOnError = true;
+		alwaysThrowErrorOnFailure = true;
 	}
 	else if (multipleTasks && hasReturning)
 	{
-		failOnError = true;
+		alwaysThrowErrorOnFailure = true;
 	}
 }

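Reviewer note: the hunks above carry the substance of the rename. The flag no longer merely says "fail on error"; it says "never downgrade an error, even when another placement could absorb the failure". The two branches that set it reduce to a single predicate over values the function already has. The following is an illustrative condensation, not code from the patch:

    #include <stdbool.h>

    /*
     * Condensed form of the flag-setting logic in RouterSequentialModifyExecScan:
     * errors must always be thrown when the task list runs under 2PC (reference
     * tables, among others, where placements must never be marked invalid), or
     * when RETURNING output from multiple tasks could not be merged after a
     * partial failure.
     */
    static bool
    MustAlwaysThrowErrorOnFailure(bool taskListRequires2PC, bool multipleTasks,
                                  bool hasReturning)
    {
        return taskListRequires2PC || (multipleTasks && hasReturning);
    }
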
@@ -523,8 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 		Task *task = (Task *) lfirst(taskCell);

 		executorState->es_processed +=
-			ExecuteSingleModifyTask(scanState, task, operation, failOnError,
-									hasReturning);
+			ExecuteSingleModifyTask(scanState, task, operation,
+									alwaysThrowErrorOnFailure, hasReturning);
 	}

 	scanState->finishedRemoteScan = true;
@@ -821,7 +821,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
  */
 static int64
 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
-						bool failOnError, bool expectResults)
+						bool alwaysThrowErrorOnFailure, bool expectResults)
 {
 	EState *executorState = NULL;
 	ParamListInfo paramListInfo = NULL;
@@ -851,7 +851,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
 	 * table or multi-shard command) and start a transaction (when in a
 	 * transaction).
 	 */
-	connectionList = GetModifyConnections(task, failOnError);
+	connectionList = GetModifyConnections(task, alwaysThrowErrorOnFailure);

 	/*
 	 * If we are dealing with a partitioned table, we also need to lock its
@@ -911,7 +911,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
 			 * If we already failed on all other placements (possibly 0),
 			 * relay errors directly.
 			 */
-			failOnError = true;
+			alwaysThrowErrorOnFailure = true;
 		}

 		/*
@@ -921,12 +921,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
 		 */
 		if (!gotResults && expectResults)
 		{
-			queryOK = StoreQueryResult(scanState, connection, failOnError,
+			queryOK = StoreQueryResult(scanState, connection, alwaysThrowErrorOnFailure,
 									   &currentAffectedTupleCount, NULL);
 		}
 		else
 		{
-			queryOK = ConsumeQueryResult(connection, failOnError,
+			queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
 										 &currentAffectedTupleCount);
 		}

@@ -958,9 +958,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation

 	/*
 	 * If a command results in an error on all workers, we relay the last error
-	 * in the loop above by setting failOnError. However, if all connections fail
-	 * we still complete the loop without throwing an error. In that case, throw
-	 * an error below.
+	 * in the loop above by setting alwaysThrowErrorOnFailure. However, if all
+	 * connections fail we still complete the loop without throwing an error.
+	 * In that case, throw an error below.
 	 */
 	if (!resultsOK)
 	{
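Two details in the loop above deserve emphasis: hunk @@ -911 promotes the flag once every other placement has already failed, so the final placement's error is relayed verbatim, and hunk @@ -958 documents the fallback for when even that never fires. Below is a minimal model of that loop shape, not the patch's code; connection handling and result consumption are elided behind a callback:

    #include <stdbool.h>
    #include <stddef.h>

    /*
     * Sketch of the per-placement loop in ExecuteSingleModifyTask: tolerate a
     * failure while another placement may still succeed, but relay the error
     * directly once all other placements (possibly 0) have failed. If nothing
     * succeeds and nothing throws, the caller raises a generic error after
     * the loop.
     */
    static bool
    TryPlacementsSketch(size_t placementCount, bool alwaysThrowErrorOnFailure,
                        bool (*executeOnPlacement)(size_t placementIndex,
                                                   bool throwOnFailure))
    {
        bool resultsOK = false;
        size_t failureCount = 0;
        size_t placementIndex = 0;

        for (placementIndex = 0; placementIndex < placementCount; placementIndex++)
        {
            if (failureCount + 1 == placementCount)
            {
                /* we already failed on all other placements (possibly 0) */
                alwaysThrowErrorOnFailure = true;
            }

            if (executeOnPlacement(placementIndex, alwaysThrowErrorOnFailure))
            {
                resultsOK = true;
            }
            else
            {
                failureCount++;
            }
        }

        return resultsOK;
    }
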
@@ -1119,7 +1119,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
 	bool multipleTasks = list_length(taskList) > 1;
 	bool expectResults = false;
 	int64 affectedTupleCount = 0;
-	bool failOnError = true;
+	bool alwaysThrowErrorOnFailure = true;
 	bool taskListRequires2PC = TaskListRequires2PC(taskList);

 	/* decide on whether to use coordinated transaction and 2PC */
@@ -1149,7 +1149,8 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
 		Task *task = (Task *) lfirst(taskCell);

 		affectedTupleCount +=
-			ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults);
+			ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure,
+									expectResults);
 	}

 	return affectedTupleCount;
@@ -1283,7 +1284,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	List *connectionList = NIL;
 	MultiConnection *connection = NULL;
 	int64 currentAffectedTupleCount = 0;
-	bool failOnError = true;
+	bool alwaysThrowErrorOnFailure = true;
 	bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;

 	/* abort in case of cancellation */
@@ -1320,12 +1321,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 			{
 				Assert(scanState != NULL);

-				queryOK = StoreQueryResult(scanState, connection, failOnError,
+				queryOK = StoreQueryResult(scanState, connection,
+										   alwaysThrowErrorOnFailure,
 										   &currentAffectedTupleCount, NULL);
 			}
 			else
 			{
-				queryOK = ConsumeQueryResult(connection, failOnError,
+				queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
 											 &currentAffectedTupleCount);
 			}

@@ -1418,18 +1420,18 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query,
 	if (querySent == 0)
 	{
-		const bool raiseErrors = false;
+		const bool raiseIfTransactionIsCritical = true;

-		HandleRemoteTransactionConnectionError(connection, raiseErrors);
+		HandleRemoteTransactionConnectionError(connection, raiseIfTransactionIsCritical);

 		return false;
 	}

 	singleRowMode = PQsetSingleRowMode(connection->pgConn);
 	if (singleRowMode == 0)
 	{
-		const bool raiseErrors = false;
+		const bool raiseIfTransactionIsCritical = true;

-		HandleRemoteTransactionConnectionError(connection, raiseErrors);
+		HandleRemoteTransactionConnectionError(connection, raiseIfTransactionIsCritical);

 		return false;
 	}
@@ -1515,7 +1517,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
  */
 static bool
 StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
-				 bool failOnError, int64 *rows,
+				 bool alwaysThrowErrorOnFailure, int64 *rows,
 				 DistributedExecutionStats *executionStats)
 {
 	TupleDesc tupleDescriptor =
@@ -1541,7 +1543,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
 		scanState->tuplestorestate =
 			tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
 	}
-	else if (!failOnError)
+	else if (!alwaysThrowErrorOnFailure)
 	{
 		/* might have failed query execution on another placement before */
 		tuplestore_clear(scanState->tuplestorestate);
 	}
@@ -1571,6 +1573,10 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
 			int category = 0;
 			bool isConstraintViolation = false;

+			/*
+			 * Mark transaction as failed, but don't throw an error. This allows us
+			 * to give a more meaningful error message below.
+			 */
 			MarkRemoteTransactionFailed(connection, false);

 			/*
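Before the next hunk lands, note the ordering: MarkRemoteTransactionFailed(connection, false) records the failure without promoting it to an error, precisely so the ReportResultError call that follows can surface the worker's own message. The escalation rule that both StoreQueryResult and ConsumeQueryResult then apply, restated as a self-contained predicate for reference (illustrative only):

    #include <stdbool.h>

    /*
     * When to re-throw a remote error rather than tolerate it: constraint
     * violations are never absorbed, callers may demand strictness via
     * alwaysThrowErrorOnFailure, and, new in this patch, a transaction that
     * was marked critical must not be allowed to limp toward commit.
     */
    static bool
    ShouldRethrowRemoteError(bool isConstraintViolation,
                             bool alwaysThrowErrorOnFailure,
                             bool remoteTransactionIsCritical)
    {
        return isConstraintViolation || alwaysThrowErrorOnFailure ||
               remoteTransactionIsCritical;
    }
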
@@ -1581,7 +1587,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
 			category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
 			isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

-			if (isConstraintViolation || failOnError)
+			if (isConstraintViolation || alwaysThrowErrorOnFailure ||
+				IsRemoteTransactionCritical(connection))
 			{
 				ReportResultError(connection, result, ERROR);
 			}
@@ -1663,7 +1670,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
  * has been an error.
  */
 static bool
-ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
+ConsumeQueryResult(MultiConnection *connection, bool alwaysThrowErrorOnFailure,
+				   int64 *rows)
 {
 	bool commandFailed = false;
 	bool gotResponse = false;
@@ -1696,6 +1704,11 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
 			int category = 0;
 			bool isConstraintViolation = false;

+			/*
+			 * Mark transaction as failed, but don't throw an error even if the
+			 * transaction is critical. This allows us to give a more meaningful
+			 * error message below.
+			 */
 			MarkRemoteTransactionFailed(connection, false);

 			/*
@@ -1706,7 +1719,8 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
 			category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
 			isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

-			if (isConstraintViolation || failOnError)
+			if (isConstraintViolation || alwaysThrowErrorOnFailure ||
+				IsRemoteTransactionCritical(connection))
 			{
 				ReportResultError(connection, result, ERROR);
 			}
diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c
index 044876547..f8eaf3d74 100644
--- a/src/backend/distributed/transaction/remote_transaction.c
+++ b/src/backend/distributed/transaction/remote_transaction.c
@@ -43,7 +43,6 @@ static void StartRemoteTransactionSavepointRollback(MultiConnection *connection,
 static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection,
 													 SubTransactionId subId);

-static void CheckTransactionHealth(void);
 static void Assign2PCIdentifier(MultiConnection *connection);
 static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit);

@@ -721,6 +720,19 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection)
 }


+/*
+ * IsRemoteTransactionCritical returns whether the remote transaction on
+ * the given connection has been marked as critical.
+ */
+bool
+IsRemoteTransactionCritical(struct MultiConnection *connection)
+{
+	RemoteTransaction *transaction = &connection->remoteTransaction;
+
+	return transaction->transactionCritical;
+}
+
+
 /*
  * CloseRemoteTransaction handles closing a connection that, potentially, is
  * part of a coordinated transaction. This should only ever be called from
@@ -824,12 +836,6 @@ CoordinatedRemoteTransactionsCommit(void)
 	List *connectionList = NIL;
 	bool raiseInterrupts = false;

-	/*
-	 * Before starting to commit on any of the nodes - after which we can't
-	 * completely roll-back anymore - check that things are in a good state.
-	 */
-	CheckTransactionHealth();
-
 	/*
 	 * Issue appropriate transaction commands to remote nodes. If everything
 	 * went well that's going to be COMMIT or COMMIT PREPARED, if individual
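The deletion above takes the health check out of CoordinatedRemoteTransactionsCommit; the next hunk renames it and drops the static qualifier so the transaction callback can run it earlier. Its body is untouched by this patch and therefore not shown here. The sketch below conveys the assumed shape only: InProgressTransactions, transactionNode, transactionFailed, and the error message text are assumptions modeled on the surrounding code, not verbatim source.

    /*
     * Assumed shape of the health check (a sketch, not the patch's code):
     * refuse to begin committing if any connection whose remote transaction
     * was marked critical has already failed.
     */
    static void
    CheckRemoteTransactionsHealthSketch(void)
    {
        dlist_iter iter;

        dlist_foreach(iter, &InProgressTransactions)
        {
            MultiConnection *connection =
                dlist_container(MultiConnection, transactionNode, iter.cur);
            RemoteTransaction *transaction = &connection->remoteTransaction;

            if (transaction->transactionFailed && transaction->transactionCritical)
            {
                ereport(ERROR, (errmsg("failure on connection marked as critical: %s:%d",
                                       connection->hostname, connection->port)));
            }
        }
    }
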
@@ -1216,13 +1222,13 @@ FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransac


 /*
- * CheckTransactionHealth checks if any of the participating transactions in a
+ * CheckRemoteTransactionsHealth checks if any of the participating transactions in a
  * coordinated transaction failed, and what consequence that should have.
  * This needs to be called before the coordinated transaction commits (but
  * after they've been PREPAREd if 2PC is in use).
  */
-static void
-CheckTransactionHealth(void)
+void
+CheckRemoteTransactionsHealth(void)
 {
 	dlist_iter iter;

diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index f4b7234ee..df269cf39 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -315,9 +315,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			{
 				CoordinatedRemoteTransactionsPrepare();
 				CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
+
+				/*
+				 * Make sure we did not have any failures on connections marked as
+				 * critical before committing.
+				 */
+				CheckRemoteTransactionsHealth();
 			}
 			else
 			{
+				CheckRemoteTransactionsHealth();
+
 				/*
 				 * Have to commit remote transactions in PRE_COMMIT, to allow
 				 * us to mark failed placements as invalid. Better don't use
diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h
index 89eb822ab..92a065e11 100644
--- a/src/include/distributed/remote_transaction.h
+++ b/src/include/distributed/remote_transaction.h
@@ -115,6 +115,7 @@ extern void HandleRemoteTransactionResultError(struct MultiConnection *connectio
 extern void MarkRemoteTransactionFailed(struct MultiConnection *connection,
 										bool allowErrorPromotion);
 extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
+extern bool IsRemoteTransactionCritical(struct MultiConnection *connection);


 /*
@@ -129,6 +130,7 @@ extern void ResetRemoteTransaction(struct MultiConnection *connection);
 extern void CoordinatedRemoteTransactionsPrepare(void);
 extern void CoordinatedRemoteTransactionsCommit(void);
 extern void CoordinatedRemoteTransactionsAbort(void);
+extern void CheckRemoteTransactionsHealth(void);

 /* remote savepoint commands */
 extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);
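With the symbol exported, the transaction_management.c hunk above anchors the check at the last moment a clean abort is still possible: after PREPARE has succeeded everywhere in the 2PC case, and before any remote COMMIT is issued otherwise. Condensed control flow of the pre-commit path (an illustration that compiles against the declarations in the header diff above; using2PC stands in for the surrounding coordinated-transaction state):

    /*
     * Pre-commit ordering after this patch (sketch). In the 2PC branch,
     * COMMIT PREPARED is only sent later, from the commit callback, so an
     * error raised here still rolls everything back.
     */
    static void
    PreCommitSketch(bool using2PC)
    {
        if (using2PC)
        {
            CoordinatedRemoteTransactionsPrepare();
            CheckRemoteTransactionsHealth();
        }
        else
        {
            CheckRemoteTransactionsHealth();
            CoordinatedRemoteTransactionsCommit();
        }
    }
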
diff --git a/src/test/regress/expected/set_operation_and_local_tables.out b/src/test/regress/expected/set_operation_and_local_tables.out
index 3d1aaa9bd..29a59abe1 100644
--- a/src/test/regress/expected/set_operation_and_local_tables.out
+++ b/src/test/regress/expected/set_operation_and_local_tables.out
@@ -72,9 +72,8 @@ DEBUG: generating subplan 10_1 for subquery SELECT x FROM recursive_set_local.t
 DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT (i.i OPERATOR(pg_catalog./) 0) FROM generate_series(0, 100) i(i) ORDER BY 1 DESC
 DEBUG: Creating router plan
 DEBUG: Plan is router executable
-WARNING: division by zero
+ERROR: division by zero
 CONTEXT: while executing command on localhost:57637
-ERROR: could not receive query results
 -- we should be able to run set operations with generate series and local tables as well
 ((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT
 (SELECT i FROM generate_series(0, 100) i) ORDER BY 1 DESC;
 DEBUG: generating subplan 12_1 for subquery SELECT x FROM recursive_set_local.local_test
diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out
index be36fecdf..04e5a57cd 100644
--- a/src/test/regress/expected/with_basics.out
+++ b/src/test/regress/expected/with_basics.out
@@ -106,9 +106,8 @@ WITH cte AS (
   SELECT user_id FROM users_table WHERE value_2 IN (1, 2)
 )
 SELECT (SELECT * FROM cte);
-WARNING: more than one row returned by a subquery used as an expression
+ERROR: more than one row returned by a subquery used as an expression
 CONTEXT: while executing command on localhost:57637
-ERROR: could not receive query results
 WITH cte_basic AS (
   SELECT user_id FROM users_table WHERE user_id = 1
 )
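The expected-output churn in these two regression files is the user-visible payoff. These statements run over a connection whose remote transaction is critical, so the worker's own error (division by zero; more than one row returned by a subquery used as an expression) is now re-thrown directly, and the old two-step output, a WARNING followed by a generic failure, disappears. That generic failure came from the executor's fallback after the placement loop, the "throw an error below" mentioned in hunk @@ -958. A sketch of that fallback, with the message text taken from the old expected output rather than from the patch itself:

    /*
     * Fallback after the placement loop (sketch). With the critical-transaction
     * promotion above, these tests now error out inside the loop and never
     * reach this point.
     */
    if (!resultsOK)
    {
        ereport(ERROR, (errmsg("could not receive query results")));
    }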