Merge pull request #2215 from citusdata/fix_ref_table_failure

Always throw errors on failure on critical connection in router executor
pull/2203/merge
Marco Slot 2018-06-14 22:48:14 +02:00 committed by GitHub
commit 4686d49b14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 80 additions and 52 deletions

View File

@ -81,9 +81,9 @@ bool EnableDeadlockPrevention = true;
static void AcquireMetadataLocks(List *taskList); static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType
CmdType operation, bool failOnError, operation, bool alwaysThrowErrorOnFailure, bool
bool expectResults); expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * GetModifyConnections(Task *task, bool markCritical); static List * GetModifyConnections(Task *task, bool markCritical);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@ -98,11 +98,11 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
const char ***parameterValues); const char ***parameterValues);
static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query,
ParamListInfo paramListInfo); ParamListInfo paramListInfo);
static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, bool
bool failOnError, int64 *rows, alwaysThrowErrorOnFailure, int64 *rows,
DistributedExecutionStats *executionStats); DistributedExecutionStats *executionStats);
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, static bool ConsumeQueryResult(MultiConnection *connection, bool
int64 *rows); alwaysThrowErrorOnFailure, int64 *rows);
/* /*
@ -479,7 +479,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
bool multipleTasks = list_length(taskList) > 1; bool multipleTasks = list_length(taskList) > 1;
EState *executorState = scanState->customScanState.ss.ps.state; EState *executorState = scanState->customScanState.ss.ps.state;
bool taskListRequires2PC = TaskListRequires2PC(taskList); bool taskListRequires2PC = TaskListRequires2PC(taskList);
bool failOnError = false; bool alwaysThrowErrorOnFailure = false;
CmdType operation = scanState->distributedPlan->operation; CmdType operation = scanState->distributedPlan->operation;
/* /*
@ -498,21 +498,21 @@ RouterSequentialModifyExecScan(CustomScanState *node)
* the failures are rare, and we prefer to avoid marking placements invalid * the failures are rare, and we prefer to avoid marking placements invalid
* in case of failures. * in case of failures.
* *
* For reference tables, we always failOnError since we absolutely want to avoid * For reference tables, we always set alwaysThrowErrorOnFailure since we
* marking any placements invalid. * absolutely want to avoid marking any placements invalid.
* *
* We also cannot handle faulures when there is RETURNING and there are more than * We also cannot handle failures when there is RETURNING and there are more
* one task to execute. * than one task to execute.
*/ */
if (taskListRequires2PC) if (taskListRequires2PC)
{ {
CoordinatedTransactionUse2PC(); CoordinatedTransactionUse2PC();
failOnError = true; alwaysThrowErrorOnFailure = true;
} }
else if (multipleTasks && hasReturning) else if (multipleTasks && hasReturning)
{ {
failOnError = true; alwaysThrowErrorOnFailure = true;
} }
} }
@ -523,8 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
executorState->es_processed += executorState->es_processed +=
ExecuteSingleModifyTask(scanState, task, operation, failOnError, ExecuteSingleModifyTask(scanState, task, operation,
hasReturning); alwaysThrowErrorOnFailure, hasReturning);
} }
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;
@ -821,7 +821,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
*/ */
static int64 static int64
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
bool failOnError, bool expectResults) bool alwaysThrowErrorOnFailure, bool expectResults)
{ {
EState *executorState = NULL; EState *executorState = NULL;
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
@ -851,7 +851,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
* table or multi-shard command) and start a transaction (when in a * table or multi-shard command) and start a transaction (when in a
* transaction). * transaction).
*/ */
connectionList = GetModifyConnections(task, failOnError); connectionList = GetModifyConnections(task, alwaysThrowErrorOnFailure);
/* /*
* If we are dealing with a partitioned table, we also need to lock its * If we are dealing with a partitioned table, we also need to lock its
@ -911,7 +911,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
* If we already failed on all other placements (possibly 0), * If we already failed on all other placements (possibly 0),
* relay errors directly. * relay errors directly.
*/ */
failOnError = true; alwaysThrowErrorOnFailure = true;
} }
/* /*
@ -921,12 +921,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
*/ */
if (!gotResults && expectResults) if (!gotResults && expectResults)
{ {
queryOK = StoreQueryResult(scanState, connection, failOnError, queryOK = StoreQueryResult(scanState, connection, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount, NULL); &currentAffectedTupleCount, NULL);
} }
else else
{ {
queryOK = ConsumeQueryResult(connection, failOnError, queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount); &currentAffectedTupleCount);
} }
@ -958,9 +958,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
/* /*
* If a command results in an error on all workers, we relay the last error * If a command results in an error on all workers, we relay the last error
* in the loop above by setting failOnError. However, if all connections fail * in the loop above by setting alwaysThrowErrorOnFailure. However, if all
* we still complete the loop without throwing an error. In that case, throw * connections fail we still complete the loop without throwing an error.
* an error below. * In that case, throw an error below.
*/ */
if (!resultsOK) if (!resultsOK)
{ {
@ -1119,7 +1119,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
bool multipleTasks = list_length(taskList) > 1; bool multipleTasks = list_length(taskList) > 1;
bool expectResults = false; bool expectResults = false;
int64 affectedTupleCount = 0; int64 affectedTupleCount = 0;
bool failOnError = true; bool alwaysThrowErrorOnFailure = true;
bool taskListRequires2PC = TaskListRequires2PC(taskList); bool taskListRequires2PC = TaskListRequires2PC(taskList);
/* decide on whether to use coordinated transaction and 2PC */ /* decide on whether to use coordinated transaction and 2PC */
@ -1149,7 +1149,8 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
affectedTupleCount += affectedTupleCount +=
ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults); ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure,
expectResults);
} }
return affectedTupleCount; return affectedTupleCount;
@ -1283,7 +1284,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
List *connectionList = NIL; List *connectionList = NIL;
MultiConnection *connection = NULL; MultiConnection *connection = NULL;
int64 currentAffectedTupleCount = 0; int64 currentAffectedTupleCount = 0;
bool failOnError = true; bool alwaysThrowErrorOnFailure = true;
bool queryOK PG_USED_FOR_ASSERTS_ONLY = false; bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
/* abort in case of cancellation */ /* abort in case of cancellation */
@ -1320,12 +1321,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
{ {
Assert(scanState != NULL); Assert(scanState != NULL);
queryOK = StoreQueryResult(scanState, connection, failOnError, queryOK = StoreQueryResult(scanState, connection,
alwaysThrowErrorOnFailure,
&currentAffectedTupleCount, NULL); &currentAffectedTupleCount, NULL);
} }
else else
{ {
queryOK = ConsumeQueryResult(connection, failOnError, queryOK = ConsumeQueryResult(connection, alwaysThrowErrorOnFailure,
&currentAffectedTupleCount); &currentAffectedTupleCount);
} }
@ -1418,18 +1420,18 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query,
if (querySent == 0) if (querySent == 0)
{ {
const bool raiseErrors = false; const bool raiseIfTransactionIsCritical = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors); HandleRemoteTransactionConnectionError(connection, raiseIfTransactionIsCritical);
return false; return false;
} }
singleRowMode = PQsetSingleRowMode(connection->pgConn); singleRowMode = PQsetSingleRowMode(connection->pgConn);
if (singleRowMode == 0) if (singleRowMode == 0)
{ {
const bool raiseErrors = false; const bool raiseIfTransactionIsCritical = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors); HandleRemoteTransactionConnectionError(connection, raiseIfTransactionIsCritical);
return false; return false;
} }
@ -1515,7 +1517,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
*/ */
static bool static bool
StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
bool failOnError, int64 *rows, bool alwaysThrowErrorOnFailure, int64 *rows,
DistributedExecutionStats *executionStats) DistributedExecutionStats *executionStats)
{ {
TupleDesc tupleDescriptor = TupleDesc tupleDescriptor =
@ -1541,7 +1543,7 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
scanState->tuplestorestate = scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
} }
else if (!failOnError) else if (!alwaysThrowErrorOnFailure)
{ {
/* might have failed query execution on another placement before */ /* might have failed query execution on another placement before */
tuplestore_clear(scanState->tuplestorestate); tuplestore_clear(scanState->tuplestorestate);
@ -1571,6 +1573,10 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
int category = 0; int category = 0;
bool isConstraintViolation = false; bool isConstraintViolation = false;
/*
* Mark transaction as failed, but don't throw an error. This allows us
* to give a more meaningful error message below.
*/
MarkRemoteTransactionFailed(connection, false); MarkRemoteTransactionFailed(connection, false);
/* /*
@ -1581,7 +1587,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
if (isConstraintViolation || failOnError) if (isConstraintViolation || alwaysThrowErrorOnFailure ||
IsRemoteTransactionCritical(connection))
{ {
ReportResultError(connection, result, ERROR); ReportResultError(connection, result, ERROR);
} }
@ -1663,7 +1670,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
* has been an error. * has been an error.
*/ */
static bool static bool
ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) ConsumeQueryResult(MultiConnection *connection, bool alwaysThrowErrorOnFailure,
int64 *rows)
{ {
bool commandFailed = false; bool commandFailed = false;
bool gotResponse = false; bool gotResponse = false;
@ -1696,6 +1704,11 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
int category = 0; int category = 0;
bool isConstraintViolation = false; bool isConstraintViolation = false;
/*
* Mark transaction as failed, but don't throw an error even if the
* transaction is critical. This allows us to give a more meaningful
* error message below.
*/
MarkRemoteTransactionFailed(connection, false); MarkRemoteTransactionFailed(connection, false);
/* /*
@ -1706,7 +1719,8 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
if (isConstraintViolation || failOnError) if (isConstraintViolation || alwaysThrowErrorOnFailure ||
IsRemoteTransactionCritical(connection))
{ {
ReportResultError(connection, result, ERROR); ReportResultError(connection, result, ERROR);
} }

View File

@ -43,7 +43,6 @@ static void StartRemoteTransactionSavepointRollback(MultiConnection *connection,
static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection, static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId); SubTransactionId subId);
static void CheckTransactionHealth(void);
static void Assign2PCIdentifier(MultiConnection *connection); static void Assign2PCIdentifier(MultiConnection *connection);
static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit); static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit);
@ -721,6 +720,19 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection)
} }
/*
* IsRemoteTransactionCritical returns whether the remote transaction on
* the given connection has been marked as critical.
*/
bool
IsRemoteTransactionCritical(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
return transaction->transactionCritical;
}
/* /*
* CloseRemoteTransaction handles closing a connection that, potentially, is * CloseRemoteTransaction handles closing a connection that, potentially, is
* part of a coordinated transaction. This should only ever be called from * part of a coordinated transaction. This should only ever be called from
@ -824,12 +836,6 @@ CoordinatedRemoteTransactionsCommit(void)
List *connectionList = NIL; List *connectionList = NIL;
bool raiseInterrupts = false; bool raiseInterrupts = false;
/*
* Before starting to commit on any of the nodes - after which we can't
* completely roll-back anymore - check that things are in a good state.
*/
CheckTransactionHealth();
/* /*
* Issue appropriate transaction commands to remote nodes. If everything * Issue appropriate transaction commands to remote nodes. If everything
* went well that's going to be COMMIT or COMMIT PREPARED, if individual * went well that's going to be COMMIT or COMMIT PREPARED, if individual
@ -1216,13 +1222,13 @@ FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransac
/* /*
* CheckTransactionHealth checks if any of the participating transactions in a * CheckRemoteTransactionsHealth checks if any of the participating transactions in a
* coordinated transaction failed, and what consequence that should have. * coordinated transaction failed, and what consequence that should have.
* This needs to be called before the coordinated transaction commits (but * This needs to be called before the coordinated transaction commits (but
* after they've been PREPAREd if 2PC is in use). * after they've been PREPAREd if 2PC is in use).
*/ */
static void void
CheckTransactionHealth(void) CheckRemoteTransactionsHealth(void)
{ {
dlist_iter iter; dlist_iter iter;

View File

@ -315,9 +315,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
{ {
CoordinatedRemoteTransactionsPrepare(); CoordinatedRemoteTransactionsPrepare();
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
/*
* Make sure we did not have any failures on connections marked as
* critical before committing.
*/
CheckRemoteTransactionsHealth();
} }
else else
{ {
CheckRemoteTransactionsHealth();
/* /*
* Have to commit remote transactions in PRE_COMMIT, to allow * Have to commit remote transactions in PRE_COMMIT, to allow
* us to mark failed placements as invalid. Better don't use * us to mark failed placements as invalid. Better don't use

View File

@ -115,6 +115,7 @@ extern void HandleRemoteTransactionResultError(struct MultiConnection *connectio
extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, extern void MarkRemoteTransactionFailed(struct MultiConnection *connection,
bool allowErrorPromotion); bool allowErrorPromotion);
extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
extern bool IsRemoteTransactionCritical(struct MultiConnection *connection);
/* /*
@ -129,6 +130,7 @@ extern void ResetRemoteTransaction(struct MultiConnection *connection);
extern void CoordinatedRemoteTransactionsPrepare(void); extern void CoordinatedRemoteTransactionsPrepare(void);
extern void CoordinatedRemoteTransactionsCommit(void); extern void CoordinatedRemoteTransactionsCommit(void);
extern void CoordinatedRemoteTransactionsAbort(void); extern void CoordinatedRemoteTransactionsAbort(void);
extern void CheckRemoteTransactionsHealth(void);
/* remote savepoint commands */ /* remote savepoint commands */
extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);

View File

@ -72,9 +72,8 @@ DEBUG: generating subplan 10_1 for subquery SELECT x FROM recursive_set_local.t
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT (i.i OPERATOR(pg_catalog./) 0) FROM generate_series(0, 100) i(i) ORDER BY 1 DESC DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT (i.i OPERATOR(pg_catalog./) 0) FROM generate_series(0, 100) i(i) ORDER BY 1 DESC
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
WARNING: division by zero ERROR: division by zero
CONTEXT: while executing command on localhost:57637 CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
-- we should be able to run set operations with generate series and local tables as well -- we should be able to run set operations with generate series and local tables as well
((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i) ORDER BY 1 DESC; ((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i) ORDER BY 1 DESC;
DEBUG: generating subplan 12_1 for subquery SELECT x FROM recursive_set_local.local_test DEBUG: generating subplan 12_1 for subquery SELECT x FROM recursive_set_local.local_test

View File

@ -106,9 +106,8 @@ WITH cte AS (
SELECT user_id FROM users_table WHERE value_2 IN (1, 2) SELECT user_id FROM users_table WHERE value_2 IN (1, 2)
) )
SELECT (SELECT * FROM cte); SELECT (SELECT * FROM cte);
WARNING: more than one row returned by a subquery used as an expression ERROR: more than one row returned by a subquery used as an expression
CONTEXT: while executing command on localhost:57637 CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
WITH cte_basic AS ( WITH cte_basic AS (
SELECT user_id FROM users_table WHERE user_id = 1 SELECT user_id FROM users_table WHERE user_id = 1
) )