Improve failure handling of distributed execution

Prior to this commit, the code would skip processing the
    errors that happened for local commands.

    Prior to https://github.com/citusdata/citus/pull/5379, it might
    have made sense to allow the execution to continue. But, as of today,
    if a modification fails on any placement, we can safely fail
    the execution.

(cherry picked from commit b4008bc872)
release-11.1-onder
onderkalaci 2023-07-31 15:57:27 +03:00
parent 785c5815bb
commit 5c766f4fb5
4 changed files with 86 additions and 1 deletions

View File

@ -511,7 +511,9 @@ typedef enum TaskExecutionState
/*
* PlacementExecutionOrder indicates whether a command should be executed
* on any replica, on all replicas sequentially (in order), or on all
* replicas in parallel.
* replicas in parallel. In other words, EXECUTION_ORDER_ANY is used for
* SELECTs, while EXECUTION_ORDER_SEQUENTIAL and EXECUTION_ORDER_PARALLEL
* are used for DML/DDL.
*/
typedef enum PlacementExecutionOrder
{
@ -5291,6 +5293,10 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
{
currentTaskExecutionState = TASK_EXECUTION_FAILED;
}
else if (executionOrder != EXECUTION_ORDER_ANY && failedPlacementCount > 0)
{
currentTaskExecutionState = TASK_EXECUTION_FAILED;
}
else if (executionOrder == EXECUTION_ORDER_ANY && donePlacementCount > 0)
{
currentTaskExecutionState = TASK_EXECUTION_FINISHED;

View File

@ -0,0 +1,47 @@
CREATE SCHEMA failure_local_modification;
SET search_path TO failure_local_modification;
SET citus.next_shard_id TO 1989000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
SELECT create_reference_table('failover_to_local');
create_reference_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO failure_local_modification;
-- prevent local connection establishment, imitate
-- a failure
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.2);
pg_sleep
---------------------------------------------------------------------
(1 row)
BEGIN;
-- we force the execution to use connections (e.g., remote execution)
-- however, we do not allow connections as local_shared_pool_size=-1
-- so, properly error out
SET LOCAL citus.enable_local_execution TO false;
INSERT INTO failover_to_local VALUES (1,'1'), (2,'2'),(3,'3'),(4,'4');
ERROR: the total number of connections on the server is more than max_connections(100)
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
ROLLBACK;
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA failure_local_modification cascade;

View File

@ -68,6 +68,7 @@ test: local_shard_execution_dropped_column
test: metadata_sync_helpers
test: issue_6592
test: executor_local_failure
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak

View File

@ -0,0 +1,31 @@
-- Regression test: a modification that cannot get a connection to its
-- placement must fail the whole statement with an error.
-- NOTE(review): the expected output of this test echoes comments verbatim,
-- so it has to be regenerated whenever a comment in this file changes.
CREATE SCHEMA failure_local_modification;
SET search_path TO failure_local_modification;
SET citus.next_shard_id TO 1989000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
SELECT create_reference_table('failover_to_local');
-- run the rest of the scenario from the node listening on :worker_2_port
\c - - - :worker_2_port
SET search_path TO failure_local_modification;
-- imitate a failure: with citus.local_shared_pool_size set to -1,
-- connections to the local node cannot be established
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
-- presumably gives the reloaded setting time to take effect before the
-- INSERT below runs -- TODO confirm that 0.2s is sufficient on slow machines
SELECT pg_sleep(0.2);
BEGIN;
-- we force the execution to use connections (i.e., remote execution);
-- however, no connection is allowed because local_shared_pool_size=-1,
-- so the INSERT is expected to error out instead of being skipped
SET LOCAL citus.enable_local_execution TO false;
INSERT INTO failover_to_local VALUES (1,'1'), (2,'2'),(3,'3'),(4,'4');
ROLLBACK;
-- undo the ALTER SYSTEM override made above and reload the configuration
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
-- reconnect to the node at :master_port and clean up; raising
-- client_min_messages to ERROR hides the notices of the cascading drop
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA failure_local_modification cascade;