mirror of https://github.com/citusdata/citus.git
Improve failure handling of distributed execution
Prior to this commit, the code would skip processing the
errors that happened for local commands.
Prior to https://github.com/citusdata/citus/pull/5379, it might
have made sense to allow the execution to continue. But, as of today,
if a modification fails on any placement, we can safely fail
the execution.
(cherry picked from commit b4008bc872
)
release-10.2-onder
parent
4bc11c9bb4
commit
13eb0336d3
|
@ -503,7 +503,9 @@ typedef enum TaskExecutionState
|
|||
/*
|
||||
* PlacementExecutionOrder indicates whether a command should be executed
|
||||
* on any replica, on all replicas sequentially (in order), or on all
|
||||
* replicas in parallel.
|
||||
* replicas in parallel. In other words, EXECUTION_ORDER_ANY is used for
|
||||
* SELECTs, EXECUTION_ORDER_SEQUENTIAL/EXECUTION_ORDER_PARALLEL are used for
|
||||
* DML/DDL.
|
||||
*/
|
||||
typedef enum PlacementExecutionOrder
|
||||
{
|
||||
|
@ -5065,6 +5067,10 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
|
|||
{
|
||||
currentTaskExecutionState = TASK_EXECUTION_FAILED;
|
||||
}
|
||||
else if (executionOrder != EXECUTION_ORDER_ANY && failedPlacementCount > 0)
|
||||
{
|
||||
currentTaskExecutionState = TASK_EXECUTION_FAILED;
|
||||
}
|
||||
else if (executionOrder == EXECUTION_ORDER_ANY && donePlacementCount > 0)
|
||||
{
|
||||
currentTaskExecutionState = TASK_EXECUTION_FINISHED;
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
CREATE SCHEMA failure_local_modification;
|
||||
SET search_path TO failure_local_modification;
|
||||
SET citus.next_shard_id TO 1989000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
|
||||
SELECT create_reference_table('failover_to_local');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO failure_local_modification;
|
||||
-- prevent local connection establishment, imitate
|
||||
-- a failure
|
||||
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT pg_sleep(0.2);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
-- we force the execution to use connections (e.g., remote execution)
|
||||
-- however, we do not allow connections as local_shared_pool_size=-1
|
||||
-- so, properly error out
|
||||
SET LOCAL citus.enable_local_execution TO false;
|
||||
INSERT INTO failover_to_local VALUES (1,'1'), (2,'2'),(3,'3'),(4,'4');
|
||||
ERROR: the total number of connections on the server is more than max_connections(100)
|
||||
HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true;
|
||||
ROLLBACK;
|
||||
ALTER SYSTEM RESET citus.local_shared_pool_size;
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA failure_local_modification cascade;
|
|
@ -60,6 +60,8 @@ test: update_colocation_mx
|
|||
test: local_shard_execution_dropped_column
|
||||
test: metadata_sync_helpers
|
||||
|
||||
test: executor_local_failure
|
||||
|
||||
# test that no tests leaked intermediate results. This should always be last
|
||||
test: ensure_no_intermediate_data_leak
|
||||
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
CREATE SCHEMA failure_local_modification;
|
||||
SET search_path TO failure_local_modification;
|
||||
SET citus.next_shard_id TO 1989000;
|
||||
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
|
||||
SELECT create_reference_table('failover_to_local');
|
||||
|
||||
\c - - - :worker_2_port
|
||||
|
||||
SET search_path TO failure_local_modification;
|
||||
|
||||
-- prevent local connection establishment, imitate
|
||||
-- a failure
|
||||
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
|
||||
SELECT pg_reload_conf();
|
||||
SELECT pg_sleep(0.2);
|
||||
BEGIN;
|
||||
-- we force the execution to use connections (e.g., remote execution)
|
||||
-- however, we do not allow connections as local_shared_pool_size=-1
|
||||
-- so, properly error out
|
||||
SET LOCAL citus.enable_local_execution TO false;
|
||||
INSERT INTO failover_to_local VALUES (1,'1'), (2,'2'),(3,'3'),(4,'4');
|
||||
ROLLBACK;
|
||||
|
||||
ALTER SYSTEM RESET citus.local_shared_pool_size;
|
||||
SELECT pg_reload_conf();
|
||||
|
||||
\c - - - :master_port
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA failure_local_modification cascade;
|
Loading…
Reference in New Issue