Merge pull request #2858 from citusdata/multi_modifications_bug

Use 2PC in adaptive executor when dealing with replication factors above 1
pull/2861/head
Philip Dubé 2019-08-02 00:20:22 +00:00 committed by GitHub
commit 9b4ba2f5b2
5 changed files with 47 additions and 1320 deletions


@@ -938,35 +938,16 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution)
 		return true;
 	}
-	/*
-	 * Checking the first task's placement list is not sufficient for all purposes since
-	 * for append/range distributed tables we might have unequal number of placements for
-	 * shards. However, it is safe to do here, because we're searching for a reference
-	 * table. All other cases return false for this purpose.
-	 */
-	task = (Task *) linitial(taskList);
 	if (list_length(task->taskPlacementList) > 1)
 	{
 		/*
-		 * Some tasks don't set replicationModel thus we only
-		 * rely on the anchorShardId, not replicationModel.
-		 *
-		 * TODO: Do we ever need replicationModel in the Task structure?
-		 * Can't we always rely on anchorShardId?
+		 * Adaptive executor opts to error out on queries if a placement is unhealthy,
+		 * not marking the placement itself unhealthy in the process.
+		 * Use 2PC to rollback placements before the unhealthy shard failed.
 		 */
-		uint64 anchorShardId = task->anchorShardId;
-		if (anchorShardId != INVALID_SHARD_ID && ReferenceTableShardId(anchorShardId))
-		{
-			return true;
-		}
-		/*
-		 * Single DML/DDL tasks with replicated tables (non-reference)
-		 * should not require BEGIN/COMMIT/ROLLBACK.
-		 */
-		return false;
-	}
+		return true;
+	}
 	return false;
 }
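The new comment carries the reasoning for the change: when a shard has more than one placement and one of them fails mid-statement, the adaptive executor errors out instead of marking the placement unhealthy, so the writes already applied on the healthy placements must be rolled back, which requires 2PC rather than a plain commit. A minimal sketch of the scenario this targets (table and values are illustrative and not taken from this diff; citus.shard_replication_factor and create_distributed_table are standard Citus settings/UDFs):

-- Illustrative only, not part of this commit's tests: with two placements per
-- shard, modifications through the adaptive executor now commit via 2PC, so a
-- failure on one placement also rolls back the copy already written to the other.
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 2;  -- replication factor above 1
CREATE TABLE replicated_orders (id bigint, symbol text);
SELECT create_distributed_table('replicated_orders', 'id');
INSERT INTO replicated_orders VALUES (276, 'ADR');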


@@ -2,7 +2,7 @@ SET citus.shard_count TO 32;
 SET citus.next_shard_id TO 750000;
 SET citus.next_placement_id TO 750000;
 -- some failure messages that comes from the worker nodes
--- might change due to parallel exectuions, so supress those
+-- might change due to parallel executions, so suppress those
 -- using \set VERBOSITY terse
 -- ===================================================================
 -- test end-to-end modification functionality
@@ -102,7 +102,6 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
 20.69);
 -- ensure the values are where we put them and query to ensure they are properly pruned
 SET client_min_messages TO 'DEBUG2';
-RESET citus.task_executor_type;
 SELECT * FROM range_partitioned WHERE id = 32743;
 DEBUG: Creating router plan
 DEBUG: Plan is router executable
@@ -120,7 +119,6 @@ DEBUG: Plan is router executable
 (1 row)
 
 SET client_min_messages TO DEFAULT;
-SET citus.task_executor_type TO DEFAULT;
 -- try inserting without a range-partitioned shard to receive the value
 INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
 20.69);
@@ -355,9 +353,23 @@ WARNING: relation "public.limit_orders_750000" does not exist
 \c - - - :worker_2_port
 -- Second: Move aside limit_orders shard on the second worker node
 ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
--- Connect back to master node
-\c - - - :master_port
 -- Verify the insert failed and both placements are healthy
+-- or the insert succeeded and placement marked unhealthy
+\c - - - :worker_1_port
+SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
+ count
+-------
+     1
+(1 row)
+
+\c - - - :worker_2_port
+SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
+ count
+-------
+     0
+(1 row)
+
+\c - - - :master_port
 SELECT count(*) FROM limit_orders WHERE id = 276;
  count
 -------
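The "placements are healthy" / "placement marked unhealthy" comments above refer to the placement state tracked in the coordinator's metadata. As an aside (the view and shardstate codes are standard Citus metadata, not part of this diff), that state could be inspected for the shard exercised here with:

-- Not part of this test: check placement health for shard 750000 from the
-- coordinator; shardstate 1 means healthy/finalized, 3 means marked inactive.
SELECT shardid, nodename, nodeport, shardstate
FROM pg_dist_shard_placement
WHERE shardid = 750000;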

File diff suppressed because it is too large


@@ -2,7 +2,7 @@ SET citus.shard_count TO 32;
 SET citus.next_shard_id TO 750000;
 SET citus.next_placement_id TO 750000;
 -- some failure messages that comes from the worker nodes
--- might change due to parallel exectuions, so supress those
+-- might change due to parallel executions, so suppress those
 -- using \set VERBOSITY terse
 -- ===================================================================
 -- test end-to-end modification functionality
@@ -102,7 +102,6 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
 20.69);
 -- ensure the values are where we put them and query to ensure they are properly pruned
 SET client_min_messages TO 'DEBUG2';
-RESET citus.task_executor_type;
 SELECT * FROM range_partitioned WHERE id = 32743;
 DEBUG: Creating router plan
 DEBUG: Plan is router executable
@@ -120,7 +119,6 @@ DEBUG: Plan is router executable
 (1 row)
 
 SET client_min_messages TO DEFAULT;
-SET citus.task_executor_type TO DEFAULT;
 -- try inserting without a range-partitioned shard to receive the value
 INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
 20.69);
@@ -355,9 +353,23 @@ ERROR: relation "public.limit_orders_750000" does not exist
 \c - - - :worker_2_port
 -- Second: Move aside limit_orders shard on the second worker node
 ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
--- Connect back to master node
-\c - - - :master_port
 -- Verify the insert failed and both placements are healthy
+-- or the insert succeeded and placement marked unhealthy
+\c - - - :worker_1_port
+SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
+ count
+-------
+     0
+(1 row)
+
+\c - - - :worker_2_port
+SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
+ count
+-------
+     0
+(1 row)
+
+\c - - - :master_port
 SELECT count(*) FROM limit_orders WHERE id = 276;
  count
 -------


@@ -3,7 +3,7 @@ SET citus.next_shard_id TO 750000;
 SET citus.next_placement_id TO 750000;
 -- some failure messages that comes from the worker nodes
--- might change due to parallel exectuions, so supress those
+-- might change due to parallel executions, so suppress those
 -- using \set VERBOSITY terse
 -- ===================================================================
@@ -88,11 +88,9 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5
 20.69);
 -- ensure the values are where we put them and query to ensure they are properly pruned
 SET client_min_messages TO 'DEBUG2';
-RESET citus.task_executor_type;
 SELECT * FROM range_partitioned WHERE id = 32743;
 SELECT * FROM append_partitioned WHERE id = 414123;
 SET client_min_messages TO DEFAULT;
-SET citus.task_executor_type TO DEFAULT;
 -- try inserting without a range-partitioned shard to receive the value
 INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy',
@@ -262,10 +260,16 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell',
 -- Second: Move aside limit_orders shard on the second worker node
 ALTER TABLE renamed_orders RENAME TO limit_orders_750000;
--- Connect back to master node
+-- Verify the insert failed and both placements are healthy
+-- or the insert succeeded and placement marked unhealthy
+\c - - - :worker_1_port
+SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
+\c - - - :worker_2_port
+SELECT count(*) FROM limit_orders_750000 WHERE id = 276;
 \c - - - :master_port
--- Verify the insert failed and both placements are healthy
 SELECT count(*) FROM limit_orders WHERE id = 276;
 SELECT count(*)