From 19bcb1b4f7c8c54948f9959b460776a6df562b9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 30 Jul 2019 13:22:17 +0000 Subject: [PATCH 1/2] multi_modifications: extend to demonstrate issue in adaptive executor --- .../regress/expected/multi_modifications.out | 22 +- .../expected/multi_modifications_0.out | 1282 ----------------- .../expected/multi_modifications_9.out | 22 +- src/test/regress/sql/multi_modifications.sql | 14 +- 4 files changed, 43 insertions(+), 1297 deletions(-) delete mode 100644 src/test/regress/expected/multi_modifications_0.out diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 01cfeaa22..7c55359a3 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -2,7 +2,7 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; -- some failure messages that comes from the worker nodes --- might change due to parallel exectuions, so supress those +-- might change due to parallel executions, so suppress those -- using \set VERBOSITY terse -- =================================================================== -- test end-to-end modification functionality @@ -102,7 +102,6 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 20.69); -- ensure the values are where we put them and query to ensure they are properly pruned SET client_min_messages TO 'DEBUG2'; -RESET citus.task_executor_type; SELECT * FROM range_partitioned WHERE id = 32743; DEBUG: Creating router plan DEBUG: Plan is router executable @@ -120,7 +119,6 @@ DEBUG: Plan is router executable (1 row) SET client_min_messages TO DEFAULT; -SET citus.task_executor_type TO DEFAULT; -- try inserting without a range-partitioned shard to receive the value INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); @@ -355,9 +353,23 @@ WARNING: relation "public.limit_orders_750000" does not exist \c - - - :worker_2_port -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; --- Connect back to master node -\c - - - :master_port -- Verify the insert failed and both placements are healthy +-- or the insert succeeded and placement marked unhealthy +\c - - - :worker_1_port +SELECT count(*) FROM limit_orders_750000 WHERE id = 276; + count +------- + 1 +(1 row) + +\c - - - :worker_2_port +SELECT count(*) FROM limit_orders_750000 WHERE id = 276; + count +------- + 0 +(1 row) + +\c - - - :master_port SELECT count(*) FROM limit_orders WHERE id = 276; count ------- diff --git a/src/test/regress/expected/multi_modifications_0.out b/src/test/regress/expected/multi_modifications_0.out deleted file mode 100644 index 772c29609..000000000 --- a/src/test/regress/expected/multi_modifications_0.out +++ /dev/null @@ -1,1282 +0,0 @@ -SET citus.shard_count TO 32; -SET citus.next_shard_id TO 750000; -SET citus.next_placement_id TO 750000; --- =================================================================== --- test end-to-end modification functionality --- =================================================================== -CREATE TYPE order_side AS ENUM ('buy', 'sell'); -CREATE TABLE limit_orders ( - id bigint PRIMARY KEY, - symbol text NOT NULL, - bidder_id bigint NOT NULL, - placed_at timestamp NOT NULL, - kind order_side NOT NULL, - limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00) -); -CREATE TABLE multiple_hash ( - category text NOT NULL, - data text NOT NULL -); -CREATE TABLE insufficient_shards ( LIKE limit_orders ); -CREATE TABLE range_partitioned ( LIKE limit_orders ); -CREATE TABLE append_partitioned ( LIKE limit_orders ); -SET citus.shard_count TO 2; -SELECT create_distributed_table('limit_orders', 'id', 'hash'); - create_distributed_table --------------------------- - -(1 row) - -SELECT create_distributed_table('multiple_hash', 'id', 'hash'); -ERROR: column "id" of relation "multiple_hash" does not exist -SELECT create_distributed_table('range_partitioned', 'id', 'range'); - create_distributed_table --------------------------- - -(1 row) - -SELECT create_distributed_table('append_partitioned', 'id', 'append'); - create_distributed_table --------------------------- - -(1 row) - -SET citus.shard_count TO 1; -SET citus.shard_replication_factor TO 1; --- make a single shard that covers no partition values -SELECT create_distributed_table('insufficient_shards', 'id', 'hash'); - create_distributed_table --------------------------- - -(1 row) - -UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 0 -WHERE logicalrelid = 'insufficient_shards'::regclass; --- create range-partitioned shards -SELECT master_create_empty_shard('range_partitioned') AS new_shard_id -\gset -UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 49999 -WHERE shardid = :new_shard_id; -SELECT master_create_empty_shard('range_partitioned') AS new_shard_id -\gset -UPDATE pg_dist_shard SET shardminvalue = 50000, shardmaxvalue = 99999 -WHERE shardid = :new_shard_id; --- create append-partitioned shards -SELECT master_create_empty_shard('append_partitioned') AS new_shard_id -\gset -UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000 -WHERE shardid = :new_shard_id; -SELECT master_create_empty_shard('append_partitioned') AS new_shard_id -\gset -UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000 -WHERE shardid = :new_shard_id; --- basic single-row INSERT -INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', - 20.69); -SELECT COUNT(*) FROM limit_orders WHERE id = 32743; - count -------- - 1 -(1 row) - --- basic single-row INSERT with RETURNING -INSERT INTO limit_orders VALUES (32744, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69) RETURNING *; - id | symbol | bidder_id | placed_at | kind | limit_price --------+--------+-----------+--------------------------+------+------------- - 32744 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 -(1 row) - --- try a single-row INSERT with no shard to receive it -INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', - 20.69); -ERROR: cannot find shard interval -DETAIL: Hash of the partition column value does not fall into any shards. --- try an insert to a range-partitioned table -INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', - 20.69); --- also insert to an append-partitioned table -INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', - 20.69); --- ensure the values are where we put them and query to ensure they are properly pruned -SET client_min_messages TO 'DEBUG2'; -SET citus.task_executor_type TO 'real-time'; -SELECT * FROM range_partitioned WHERE id = 32743; -DEBUG: Creating router plan -DEBUG: Plan is router executable - id | symbol | bidder_id | placed_at | kind | limit_price --------+--------+-----------+--------------------------+------+------------- - 32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 -(1 row) - -SELECT * FROM append_partitioned WHERE id = 414123; -DEBUG: Plan is router executable - id | symbol | bidder_id | placed_at | kind | limit_price ---------+--------+-----------+--------------------------+------+------------- - 414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 -(1 row) - -SET client_min_messages TO DEFAULT; -SET citus.task_executor_type TO DEFAULT; --- try inserting without a range-partitioned shard to receive the value -INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', - 20.69); -ERROR: cannot run INSERT command which targets no shards -HINT: Make sure you have created a shard which can receive this partition column value. --- and insert into an append-partitioned table with a value that spans shards: -INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', - 20.69); -ERROR: cannot run INSERT command which targets multiple shards -HINT: Make sure the value for partition column "id" falls into a single shard. --- INSERT with DEFAULT in the target list -INSERT INTO limit_orders VALUES (12756, 'MSFT', 10959, '2013-05-08 07:29:23', 'sell', - DEFAULT); -SELECT COUNT(*) FROM limit_orders WHERE id = 12756; - count -------- - 1 -(1 row) - --- INSERT with expressions in target list -INSERT INTO limit_orders VALUES (430, upper('ibm'), 214, timestamp '2003-01-28 10:31:17' + - interval '5 hours', 'buy', sqrt(2)); -SELECT COUNT(*) FROM limit_orders WHERE id = 430; - count -------- - 1 -(1 row) - --- INSERT without partition key -INSERT INTO limit_orders DEFAULT VALUES; -ERROR: cannot perform an INSERT without a partition column value --- squelch WARNINGs that contain worker_port -SET client_min_messages TO ERROR; --- INSERT violating NOT NULL constraint -INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT); -ERROR: cannot perform an INSERT with NULL in the partition column --- INSERT violating column constraint -INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', - -5.00); -ERROR: new row for relation "limit_orders_750000" violates check constraint "limit_orders_limit_price_check" -DETAIL: Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00). -CONTEXT: while executing command on localhost:57637 --- INSERT violating primary key constraint -INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58); -ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" -DETAIL: Key (id)=(32743) already exists. -CONTEXT: while executing command on localhost:57638 --- INSERT violating primary key constraint, with RETURNING specified. -INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING *; -ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" -DETAIL: Key (id)=(32743) already exists. -CONTEXT: while executing command on localhost:57638 --- INSERT, with RETURNING specified, failing with a non-constraint error -INSERT INTO limit_orders VALUES (34153, 'LEE', 5994, '2001-04-16 03:37:28', 'buy', 0.58) RETURNING id / 0; -ERROR: division by zero -CONTEXT: while executing command on localhost:57637 -SET client_min_messages TO DEFAULT; --- commands with non-constant partition values are supported -INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', - 'sell', 0.58); --- values for other columns are totally fine -INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); --- commands with mutable functions in their quals -DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); -ERROR: functions used in the WHERE clause of modification queries on distributed tables must not be VOLATILE --- commands with mutable but non-volatile functions(ie: stable func.) in their quals --- (the cast to timestamp is because the timestamp_eq_timestamptz operator is stable) -DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp::timestamp; --- multi-row inserts are supported -INSERT INTO limit_orders VALUES (12037, 'GOOG', 5634, '2001-04-16 03:37:28', 'buy', 0.50), - (12038, 'GOOG', 5634, '2001-04-17 03:37:28', 'buy', 2.50), - (12039, 'GOOG', 5634, '2001-04-18 03:37:28', 'buy', 1.50); -SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 12037 AND 12039; - count -------- - 3 -(1 row) - --- even those with functions and returning -INSERT INTO limit_orders VALUES (22037, 'GOOG', 5634, now(), 'buy', 0.50), - (22038, 'GOOG', 5634, now(), 'buy', 2.50), - (22039, 'GOOG', 5634, now(), 'buy', 1.50) -RETURNING id; - id -------- - 22038 - 22039 - 22037 -(3 rows) - -SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 22037 AND 22039; - count -------- - 3 -(1 row) - --- even those with functions in their partition columns -INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'buy', 0.50), - (random() * 10 + 80000, 'GOOG', 5634, now(), 'buy', 2.50), - (random() * 10 + 80090, 'GOOG', 5634, now(), 'buy', 1.50); -SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000; - count -------- - 3 -(1 row) - --- commands containing a CTE are supported -WITH deleted_orders AS (DELETE FROM limit_orders WHERE id < 0 RETURNING *) -INSERT INTO limit_orders SELECT * FROM deleted_orders; --- test simple DELETE -INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -SELECT COUNT(*) FROM limit_orders WHERE id = 246; - count -------- - 1 -(1 row) - -DELETE FROM limit_orders WHERE id = 246; -SELECT COUNT(*) FROM limit_orders WHERE id = 246; - count -------- - 0 -(1 row) - --- test simple DELETE with RETURNING -DELETE FROM limit_orders WHERE id = 430 RETURNING *; - id | symbol | bidder_id | placed_at | kind | limit_price ------+--------+-----------+--------------------------+------+----------------- - 430 | IBM | 214 | Tue Jan 28 15:31:17 2003 | buy | 1.4142135623731 -(1 row) - -SELECT COUNT(*) FROM limit_orders WHERE id = 430; - count -------- - 0 -(1 row) - --- DELETE with expression in WHERE clause -INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -SELECT COUNT(*) FROM limit_orders WHERE id = 246; - count -------- - 1 -(1 row) - -DELETE FROM limit_orders WHERE id = (2 * 123); -SELECT COUNT(*) FROM limit_orders WHERE id = 246; - count -------- - 0 -(1 row) - --- commands with a USING clause are supported -CREATE TABLE bidders ( name text, id bigint ); -DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND - limit_orders.bidder_id = bidders.id AND - bidders.name = 'Bernie Madoff'; -ERROR: relation bidders is not distributed --- commands containing a CTE are supported -WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) -DELETE FROM limit_orders WHERE id < 0; --- we have to be careful that modifying CTEs are part of the transaction and can thus roll back -WITH new_orders AS (INSERT INTO limit_orders VALUES (412, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66)) -DELETE FROM limit_orders RETURNING id / 0; -ERROR: division by zero -CONTEXT: while executing command on localhost:57638 -SELECT * FROM limit_orders WHERE id = 412; - id | symbol | bidder_id | placed_at | kind | limit_price -----+--------+-----------+-----------+------+------------- -(0 rows) - -INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); --- simple UPDATE -UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; -SELECT symbol FROM limit_orders WHERE id = 246; - symbol --------- - GM -(1 row) - --- simple UPDATE with RETURNING -UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; - id | symbol | bidder_id | placed_at | kind | limit_price ------+--------+-----------+--------------------------+------+------------- - 246 | GM | 162 | Mon Jul 02 16:32:15 2007 | sell | 20.69 -(1 row) - --- expression UPDATE -UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246; -SELECT bidder_id FROM limit_orders WHERE id = 246; - bidder_id ------------ - 18 -(1 row) - --- expression UPDATE with RETURNING -UPDATE limit_orders SET bidder_id = 6 * 5 WHERE id = 246 RETURNING *; - id | symbol | bidder_id | placed_at | kind | limit_price ------+--------+-----------+--------------------------+------+------------- - 246 | GM | 30 | Mon Jul 02 16:32:15 2007 | sell | 20.69 -(1 row) - --- multi-column UPDATE -UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246; -SELECT kind, limit_price FROM limit_orders WHERE id = 246; - kind | limit_price -------+------------- - buy | 0.00 -(1 row) - --- multi-column UPDATE with RETURNING -UPDATE limit_orders SET (kind, limit_price) = ('buy', 999) WHERE id = 246 RETURNING *; - id | symbol | bidder_id | placed_at | kind | limit_price ------+--------+-----------+--------------------------+------+------------- - 246 | GM | 30 | Mon Jul 02 16:32:15 2007 | buy | 999 -(1 row) - --- Test that on unique contraint violations, we fail fast -INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); -INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); -ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" -DETAIL: Key (id)=(275) already exists. -CONTEXT: while executing command on localhost:57638 --- Test that shards which miss a modification are marked unhealthy --- First: Connect to the second worker node -\c - - - :worker_2_port --- Second: Move aside limit_orders shard on the second worker node -ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; --- Third: Connect back to master node -\c - - - :master_port --- Fourth: Perform an INSERT on the remaining node -INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); -WARNING: relation "public.limit_orders_750000" does not exist -CONTEXT: while executing command on localhost:57638 --- Last: Verify the insert worked but the deleted placement is now unhealthy -SELECT count(*) FROM limit_orders WHERE id = 276; - count -------- - 1 -(1 row) - -SELECT count(*) -FROM pg_dist_shard_placement AS sp, - pg_dist_shard AS s -WHERE sp.shardid = s.shardid -AND sp.nodename = 'localhost' -AND sp.nodeport = :worker_2_port -AND sp.shardstate = 3 -AND s.logicalrelid = 'limit_orders'::regclass; - count -------- - 1 -(1 row) - --- Test that if all shards miss a modification, no state change occurs --- First: Connect to the first worker node -\c - - - :worker_1_port --- Second: Move aside limit_orders shard on the second worker node -ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; --- Third: Connect back to master node -\c - - - :master_port --- Fourth: Perform an INSERT on the remaining node -INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); -ERROR: relation "public.limit_orders_750000" does not exist -CONTEXT: while executing command on localhost:57637 --- Last: Verify worker is still healthy -SELECT count(*) -FROM pg_dist_shard_placement AS sp, - pg_dist_shard AS s -WHERE sp.shardid = s.shardid -AND sp.nodename = 'localhost' -AND sp.nodeport = :worker_1_port -AND sp.shardstate = 1 -AND s.logicalrelid = 'limit_orders'::regclass; - count -------- - 2 -(1 row) - --- Undo our change... --- First: Connect to the first worker node -\c - - - :worker_1_port --- Second: Move aside limit_orders shard on the second worker node -ALTER TABLE renamed_orders RENAME TO limit_orders_750000; --- Third: Connect back to master node -\c - - - :master_port --- attempting to change the partition key is unsupported -UPDATE limit_orders SET id = 0 WHERE id = 246; -ERROR: modifying the partition value of rows is not allowed -UPDATE limit_orders SET id = 0 WHERE id = 0 OR id = 246; -ERROR: modifying the partition value of rows is not allowed --- setting the partition column value to itself is allowed -UPDATE limit_orders SET id = 246 WHERE id = 246; -UPDATE limit_orders SET id = 246 WHERE id = 246 AND symbol = 'GM'; -UPDATE limit_orders SET id = limit_orders.id WHERE id = 246; --- UPDATEs with a FROM clause are unsupported -UPDATE limit_orders SET limit_price = 0.00 FROM bidders - WHERE limit_orders.id = 246 AND - limit_orders.bidder_id = bidders.id AND - bidders.name = 'Bernie Madoff'; -ERROR: relation bidders is not distributed --- should succeed with a CTE -WITH deleted_orders AS (INSERT INTO limit_orders VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43)) -UPDATE limit_orders SET symbol = 'GM'; -SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; - symbol | bidder_id ---------+----------- - GM | 30 -(1 row) - --- updates referencing just a var are supported -UPDATE limit_orders SET bidder_id = id WHERE id = 246; --- updates referencing a column are supported -UPDATE limit_orders SET bidder_id = bidder_id + 1 WHERE id = 246; --- IMMUTABLE functions are allowed -UPDATE limit_orders SET symbol = LOWER(symbol) WHERE id = 246; -SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; - symbol | bidder_id ---------+----------- - gm | 247 -(1 row) - --- IMMUTABLE functions are allowed -- even in returning -UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWER(symbol), symbol; - id | lower | symbol ------+-------+-------- - 246 | gm | GM -(1 row) - -ALTER TABLE limit_orders ADD COLUMN array_of_values integer[]; --- updates referencing STABLE functions are allowed -UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246; --- so are binary operators -UPDATE limit_orders SET array_of_values = 1 || array_of_values WHERE id = 246; -CREATE FUNCTION immutable_append(old_values int[], new_value int) -RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; -\c - - - :worker_1_port -CREATE FUNCTION immutable_append(old_values int[], new_value int) -RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; -\c - - - :worker_2_port -CREATE FUNCTION immutable_append(old_values int[], new_value int) -RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE; -\c - - - :master_port --- immutable function calls with vars are also allowed -UPDATE limit_orders -SET array_of_values = immutable_append(array_of_values, 2) WHERE id = 246; -CREATE FUNCTION stable_append(old_values int[], new_value int) -RETURNS int[] AS $$ BEGIN RETURN old_values || new_value; END; $$ -LANGUAGE plpgsql STABLE; --- but STABLE function calls with vars are not allowed -UPDATE limit_orders -SET array_of_values = stable_append(array_of_values, 3) WHERE id = 246; -ERROR: STABLE functions used in UPDATE queries cannot be called with column references -SELECT array_of_values FROM limit_orders WHERE id = 246; - array_of_values ------------------ - {1,2} -(1 row) - --- STRICT functions work as expected -CREATE FUNCTION temp_strict_func(integer,integer) RETURNS integer AS -'SELECT COALESCE($1, 2) + COALESCE($1, 3);' LANGUAGE SQL STABLE STRICT; -UPDATE limit_orders SET bidder_id = temp_strict_func(1, null) WHERE id = 246; -ERROR: null value in column "bidder_id" violates not-null constraint -DETAIL: Failing row contains (246, GM, null, 2007-07-02 16:32:15, buy, 999, {1,2}). -CONTEXT: while executing command on localhost:57637 -SELECT array_of_values FROM limit_orders WHERE id = 246; - array_of_values ------------------ - {1,2} -(1 row) - -ALTER TABLE limit_orders DROP array_of_values; --- even in RETURNING -UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW(); -ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause --- check that multi-row UPDATE/DELETEs with RETURNING work -INSERT INTO multiple_hash VALUES ('0', '1'); -INSERT INTO multiple_hash VALUES ('0', '2'); -INSERT INTO multiple_hash VALUES ('0', '3'); -INSERT INTO multiple_hash VALUES ('0', '4'); -INSERT INTO multiple_hash VALUES ('0', '5'); -INSERT INTO multiple_hash VALUES ('0', '6'); -UPDATE multiple_hash SET data = data ||'-1' WHERE category = '0' RETURNING *; - category | data -----------+------ - 0 | 1-1 - 0 | 2-1 - 0 | 3-1 - 0 | 4-1 - 0 | 5-1 - 0 | 6-1 -(6 rows) - -DELETE FROM multiple_hash WHERE category = '0' RETURNING *; - category | data -----------+------ - 0 | 1-1 - 0 | 2-1 - 0 | 3-1 - 0 | 4-1 - 0 | 5-1 - 0 | 6-1 -(6 rows) - --- ensure returned row counters are correct -\set QUIET off -INSERT INTO multiple_hash VALUES ('1', '1'); -INSERT 0 1 -INSERT INTO multiple_hash VALUES ('1', '2'); -INSERT 0 1 -INSERT INTO multiple_hash VALUES ('1', '3'); -INSERT 0 1 -INSERT INTO multiple_hash VALUES ('2', '1'); -INSERT 0 1 -INSERT INTO multiple_hash VALUES ('2', '2'); -INSERT 0 1 -INSERT INTO multiple_hash VALUES ('2', '3'); -INSERT 0 1 -INSERT INTO multiple_hash VALUES ('2', '3') RETURNING *; - category | data -----------+------ - 2 | 3 -(1 row) - -INSERT 0 1 --- check that update return the right number of rows --- one row -UPDATE multiple_hash SET data = data ||'-1' WHERE category = '1' AND data = '1'; -UPDATE 1 --- three rows -UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1'; -UPDATE 3 --- three rows, with RETURNING -UPDATE multiple_hash SET data = data ||'-2' WHERE category = '1' RETURNING category; - category ----------- - 1 - 1 - 1 -(3 rows) - -UPDATE 3 --- check -SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; - category | data -----------+--------- - 1 | 1-1-2-2 - 1 | 2-2-2 - 1 | 3-2-2 -(3 rows) - --- check that deletes return the right number of rows --- one row -DELETE FROM multiple_hash WHERE category = '2' AND data = '1'; -DELETE 1 --- two rows -DELETE FROM multiple_hash WHERE category = '2'; -DELETE 3 --- three rows, with RETURNING -DELETE FROM multiple_hash WHERE category = '1' RETURNING category; - category ----------- - 1 - 1 - 1 -(3 rows) - -DELETE 3 --- check -SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; - category | data -----------+------ -(0 rows) - -SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; - category | data -----------+------ -(0 rows) - --- verify interaction of default values, SERIAL, and RETURNING -\set QUIET on -CREATE TABLE app_analytics_events (id serial, app_id integer, name text); -SET citus.shard_count TO 4; -SELECT create_distributed_table('app_analytics_events', 'app_id', 'hash'); - create_distributed_table --------------------------- - -(1 row) - -INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; - id ----- - 1 -(1 row) - -INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; - id ----- - 2 -(1 row) - -INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; - id | app_id | name -----+--------+------ - 3 | 103 | Mynt -(1 row) - -DROP TABLE app_analytics_events; --- again with serial in the partition column -CREATE TABLE app_analytics_events (id serial, app_id integer, name text); -SELECT create_distributed_table('app_analytics_events', 'id'); - create_distributed_table --------------------------- - -(1 row) - -INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; - id ----- - 1 -(1 row) - -INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; - id ----- - 2 -(1 row) - -INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; - id | app_id | name -----+--------+------ - 3 | 103 | Mynt -(1 row) - --- Test multi-row insert with serial in the partition column -INSERT INTO app_analytics_events (app_id, name) -VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; - id | app_id | name -----+--------+------ - 5 | 105 | Mynt - 4 | 104 | Wayz -(2 rows) - -INSERT INTO app_analytics_events (id, name) -VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; - id | app_id | name ------+--------+------ - 6 | | Foo - 300 | | Wah -(2 rows) - -PREPARE prep(varchar) AS -INSERT INTO app_analytics_events (id, name) -VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; -EXECUTE prep('version-1'); - id | app_id | name ------+--------+------------- - 7 | | version-1.1 - 400 | | version-1.2 -(2 rows) - -EXECUTE prep('version-2'); - id | app_id | name ------+--------+------------- - 8 | | version-2.1 - 400 | | version-2.2 -(2 rows) - -EXECUTE prep('version-3'); - id | app_id | name ------+--------+------------- - 400 | | version-3.2 - 9 | | version-3.1 -(2 rows) - -EXECUTE prep('version-4'); - id | app_id | name ------+--------+------------- - 10 | | version-4.1 - 400 | | version-4.2 -(2 rows) - -EXECUTE prep('version-5'); - id | app_id | name ------+--------+------------- - 400 | | version-5.2 - 11 | | version-5.1 -(2 rows) - -EXECUTE prep('version-6'); - id | app_id | name ------+--------+------------- - 400 | | version-6.2 - 12 | | version-6.1 -(2 rows) - -SELECT * FROM app_analytics_events ORDER BY id, name; - id | app_id | name ------+--------+----------------- - 1 | 101 | Fauxkemon Geaux - 2 | 102 | Wayz - 3 | 103 | Mynt - 4 | 104 | Wayz - 5 | 105 | Mynt - 6 | | Foo - 7 | | version-1.1 - 8 | | version-2.1 - 9 | | version-3.1 - 10 | | version-4.1 - 11 | | version-5.1 - 12 | | version-6.1 - 300 | | Wah - 400 | | version-1.2 - 400 | | version-2.2 - 400 | | version-3.2 - 400 | | version-4.2 - 400 | | version-5.2 - 400 | | version-6.2 -(19 rows) - -TRUNCATE app_analytics_events; --- Test multi-row insert with a dropped column -ALTER TABLE app_analytics_events DROP COLUMN app_id; -INSERT INTO app_analytics_events (name) -VALUES ('Wayz'), ('Mynt') RETURNING *; - id | name -----+------ - 14 | Mynt - 13 | Wayz -(2 rows) - -SELECT * FROM app_analytics_events ORDER BY id; - id | name -----+------ - 13 | Wayz - 14 | Mynt -(2 rows) - -DROP TABLE app_analytics_events; --- Test multi-row insert with a dropped column before the partition column -CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); -SELECT create_distributed_table('app_analytics_events', 'name', colocate_with => 'none'); - create_distributed_table --------------------------- - -(1 row) - -ALTER TABLE app_analytics_events DROP COLUMN app_id; -INSERT INTO app_analytics_events (name) -VALUES ('Wayz'), ('Mynt') RETURNING *; - id | name -----+------ - 3 | Mynt - 3 | Wayz -(2 rows) - -SELECT * FROM app_analytics_events WHERE name = 'Wayz'; - id | name -----+------ - 3 | Wayz -(1 row) - -DROP TABLE app_analytics_events; --- Test multi-row insert with serial in a reference table -CREATE TABLE app_analytics_events (id serial, app_id integer, name text); -SELECT create_reference_table('app_analytics_events'); - create_reference_table ------------------------- - -(1 row) - -INSERT INTO app_analytics_events (app_id, name) -VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; - id | app_id | name -----+--------+------ - 1 | 104 | Wayz - 2 | 105 | Mynt -(2 rows) - -SELECT * FROM app_analytics_events ORDER BY id; - id | app_id | name -----+--------+------ - 1 | 104 | Wayz - 2 | 105 | Mynt -(2 rows) - -DROP TABLE app_analytics_events; --- Test multi-row insert with serial in a non-partition column -CREATE TABLE app_analytics_events (id int, app_id serial, name text); -SELECT create_distributed_table('app_analytics_events', 'id'); - create_distributed_table --------------------------- - -(1 row) - -INSERT INTO app_analytics_events (id, name) -VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; - name | app_id -------+-------- - Mynt | 2 - Wayz | 1 -(2 rows) - -SELECT * FROM app_analytics_events ORDER BY id; - id | app_id | name -----+--------+------ - 98 | 2 | Mynt - 99 | 1 | Wayz -(2 rows) - -DROP TABLE app_analytics_events; --- test UPDATE with subqueries -CREATE TABLE raw_table (id bigint, value bigint); -CREATE TABLE summary_table ( - id bigint, - min_value numeric, - average_value numeric, - count int, - uniques int); -SELECT create_distributed_table('raw_table', 'id'); - create_distributed_table --------------------------- - -(1 row) - -SELECT create_distributed_table('summary_table', 'id'); - create_distributed_table --------------------------- - -(1 row) - -INSERT INTO raw_table VALUES (1, 100); -INSERT INTO raw_table VALUES (1, 200); -INSERT INTO raw_table VALUES (1, 200); -INSERT INTO raw_table VALUES (1, 300); -INSERT INTO raw_table VALUES (2, 400); -INSERT INTO raw_table VALUES (2, 500); -INSERT INTO summary_table VALUES (1); -INSERT INTO summary_table VALUES (2); -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+---------------+-------+--------- - 1 | | | | - 2 | | | | -(2 rows) - -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 - ) average_query -WHERE id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | | 200.0000000000000000 | | - 2 | | | | -(2 rows) - --- try different syntax -UPDATE summary_table SET (min_value, average_value) = - (SELECT min(value), avg(value) FROM raw_table WHERE id = 2) -WHERE id = 2; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | | 200.0000000000000000 | | - 2 | 400 | 450.0000000000000000 | | -(2 rows) - -UPDATE summary_table SET min_value = 100 - WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value > 100) AND id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | | - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- indeed, we don't need filter on UPDATE explicitly if SELECT already prunes to one shard -UPDATE summary_table SET uniques = 2 - WHERE id IN (SELECT id FROM raw_table WHERE id = 1 and value IN (100, 200)); -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | | 2 - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- use inner results for non-partition column -UPDATE summary_table SET uniques = NULL - WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | | - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- these should not update anything -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4 - ) average_query -WHERE id = 1 AND id = 4; -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 - ) average_query -WHERE id = 1 AND id = 4; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | | - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- update with NULL value -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 4 - ) average_query -WHERE id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | | | - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- multi-shard updates with recursively planned subqueries -BEGIN; -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table) average_query; -ROLLBACK; -BEGIN; -UPDATE summary_table SET average_value = average_value + 1 WHERE id = - (SELECT id FROM raw_table WHERE value > 100 LIMIT 1); -ROLLBACK; --- test complex queries -UPDATE summary_table -SET - uniques = metrics.expensive_uniques, - count = metrics.total_count -FROM - (SELECT - id, - count(DISTINCT (CASE WHEN value > 100 then value end)) AS expensive_uniques, - count(value) AS total_count - FROM raw_table - WHERE id = 1 - GROUP BY id) metrics -WHERE - summary_table.id = metrics.id AND - summary_table.id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | | 4 | 2 - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- test joins -UPDATE summary_table SET count = count + 1 FROM raw_table - WHERE raw_table.id = summary_table.id AND summary_table.id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | | 5 | 2 - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- test with prepared statements -PREPARE prepared_update_with_subquery(int, int) AS - UPDATE summary_table SET count = count + $1 FROM raw_table - WHERE raw_table.id = summary_table.id AND summary_table.id = $2; --- execute 6 times to trigger prepared statement usage -EXECUTE prepared_update_with_subquery(10, 1); -EXECUTE prepared_update_with_subquery(10, 1); -EXECUTE prepared_update_with_subquery(10, 1); -EXECUTE prepared_update_with_subquery(10, 1); -EXECUTE prepared_update_with_subquery(10, 1); -EXECUTE prepared_update_with_subquery(10, 1); -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | | 65 | 2 - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- test with reference tables -CREATE TABLE reference_raw_table (id bigint, value bigint); -CREATE TABLE reference_summary_table ( - id bigint, - min_value numeric, - average_value numeric, - count int, - uniques int); -SELECT create_reference_table('reference_raw_table'); - create_reference_table ------------------------- - -(1 row) - -SELECT create_reference_table('reference_summary_table'); - create_reference_table ------------------------- - -(1 row) - -INSERT INTO reference_raw_table VALUES (1, 100); -INSERT INTO reference_raw_table VALUES (1, 200); -INSERT INTO reference_raw_table VALUES (1, 200); -INSERT INTO reference_raw_table VALUES (1,300), (2, 400), (2,500) RETURNING *; - id | value -----+------- - 1 | 300 - 2 | 400 - 2 | 500 -(3 rows) - -INSERT INTO reference_summary_table VALUES (1); -INSERT INTO reference_summary_table VALUES (2); -SELECT * FROM reference_summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+---------------+-------+--------- - 1 | | | | - 2 | | | | -(2 rows) - -UPDATE reference_summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 - ) average_query -WHERE id = 1; -UPDATE reference_summary_table SET average_value = average_query.average_value FROM ( - SELECT average_value FROM summary_table WHERE id = 1 FOR UPDATE - ) average_query -WHERE id = 1; -ERROR: cannot perform select on a distributed table and modify a reference table -UPDATE reference_summary_table SET (min_value, average_value) = - (SELECT min(value), avg(value) FROM reference_raw_table WHERE id = 2) -WHERE id = 2; -SELECT * FROM reference_summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | | 200.0000000000000000 | | - 2 | 400 | 450.0000000000000000 | | -(2 rows) - --- no need partition colum equalities on reference tables -UPDATE reference_summary_table SET (count) = - (SELECT id AS inner_id FROM reference_raw_table WHERE value = 500) -WHERE min_value = 400; -SELECT * FROM reference_summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | | 200.0000000000000000 | | - 2 | 400 | 450.0000000000000000 | 2 | -(2 rows) - --- can read from a reference table and update a distributed table -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM reference_raw_table WHERE id = 1 - ) average_query -WHERE id = 1; --- cannot read from a distributed table and update a reference table -UPDATE reference_summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 - ) average_query -WHERE id = 1; -ERROR: cannot perform select on a distributed table and modify a reference table -UPDATE reference_summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 AND id = 2 - ) average_query -WHERE id = 1; -ERROR: cannot perform select on a distributed table and modify a reference table --- test master_modify_multiple_shards() with subqueries and expect to fail -SELECT master_modify_multiple_shards(' - UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 1 - ) average_query - WHERE id = 1'); -ERROR: cannot run multi shard modify query with master_modify_multiple_shards when the query involves subquery or join -DETAIL: Execute the query without using master_modify_multiple_shards() --- test connection API via using COPY --- COPY on SELECT part -BEGIN; -\COPY raw_table FROM STDIN WITH CSV -INSERT INTO summary_table VALUES (3); -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 3 - ) average_query -WHERE id = 3; -COMMIT; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | 65 | 2 - 2 | 400 | 450.0000000000000000 | | - 3 | | 150.0000000000000000 | | -(3 rows) - --- COPY on UPDATE part -BEGIN; -INSERT INTO raw_table VALUES (4, 100); -INSERT INTO raw_table VALUES (4, 200); -\COPY summary_table FROM STDIN WITH CSV -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 4 - ) average_query -WHERE id = 4; -COMMIT; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | 65 | 2 - 2 | 400 | 450.0000000000000000 | | - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | -(4 rows) - --- COPY on both part -BEGIN; -\COPY raw_table FROM STDIN WITH CSV -\COPY summary_table FROM STDIN WITH CSV -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM raw_table WHERE id = 5 - ) average_query -WHERE id = 5; -COMMIT; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | 65 | 2 - 2 | 400 | 450.0000000000000000 | | - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | - 5 | | 150.0000000000000000 | | -(5 rows) - --- COPY on reference tables -BEGIN; -\COPY reference_raw_table FROM STDIN WITH CSV -\COPY summary_table FROM STDIN WITH CSV -UPDATE summary_table SET average_value = average_query.average FROM ( - SELECT avg(value) AS average FROM reference_raw_table WHERE id = 6 - ) average_query -WHERE id = 6; -COMMIT; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 1 | 100 | 200.0000000000000000 | 65 | 2 - 2 | 400 | 450.0000000000000000 | | - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | - 5 | | 150.0000000000000000 | | - 6 | | 150.0000000000000000 | | -(6 rows) - --- test DELETE queries -SELECT * FROM raw_table ORDER BY id, value; - id | value -----+------- - 1 | 100 - 1 | 200 - 1 | 200 - 1 | 300 - 2 | 400 - 2 | 500 - 3 | 100 - 3 | 200 - 4 | 100 - 4 | 200 - 5 | 100 - 5 | 200 -(12 rows) - -DELETE FROM summary_table - WHERE min_value IN (SELECT value FROM raw_table WHERE id = 1) AND id = 1; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 2 | 400 | 450.0000000000000000 | | - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | - 5 | | 150.0000000000000000 | | - 6 | | 150.0000000000000000 | | -(5 rows) - --- test with different syntax -DELETE FROM summary_table USING raw_table - WHERE summary_table.id = raw_table.id AND raw_table.id = 2; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | - 5 | | 150.0000000000000000 | | - 6 | | 150.0000000000000000 | | -(4 rows) - --- cannot read from a distributed table and delete from a reference table -DELETE FROM reference_summary_table USING raw_table - WHERE reference_summary_table.id = raw_table.id AND raw_table.id = 3; -ERROR: cannot perform select on a distributed table and modify a reference table -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | - 5 | | 150.0000000000000000 | | - 6 | | 150.0000000000000000 | | -(4 rows) - --- test connection API via using COPY with DELETEs -BEGIN; -\COPY summary_table FROM STDIN WITH CSV -DELETE FROM summary_table USING raw_table - WHERE summary_table.id = raw_table.id AND raw_table.id = 1; -DELETE FROM summary_table USING reference_raw_table - WHERE summary_table.id = reference_raw_table.id AND reference_raw_table.id = 2; -COMMIT; -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+----------------------+-------+--------- - 3 | | 150.0000000000000000 | | - 4 | | 150.0000000000000000 | | - 5 | | 150.0000000000000000 | | - 6 | | 150.0000000000000000 | | -(4 rows) - --- test DELETEs with prepared statements -PREPARE prepared_delete_with_join(int) AS - DELETE FROM summary_table USING raw_table - WHERE summary_table.id = raw_table.id AND raw_table.id = $1; -INSERT INTO raw_table VALUES (6, 100); --- execute 6 times to trigger prepared statement usage -EXECUTE prepared_delete_with_join(1); -EXECUTE prepared_delete_with_join(2); -EXECUTE prepared_delete_with_join(3); -EXECUTE prepared_delete_with_join(4); -EXECUTE prepared_delete_with_join(5); -EXECUTE prepared_delete_with_join(6); -SELECT * FROM summary_table ORDER BY id; - id | min_value | average_value | count | uniques -----+-----------+---------------+-------+--------- -(0 rows) - --- we don't support subqueries in VALUES clause -INSERT INTO summary_table (id) VALUES ((SELECT id FROM summary_table)); -ERROR: subqueries are not supported within INSERT queries -HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. -INSERT INTO summary_table (id) VALUES (5), ((SELECT id FROM summary_table)); -ERROR: subqueries are not supported within INSERT queries -HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. --- similar queries with reference tables -INSERT INTO reference_summary_table (id) VALUES ((SELECT id FROM summary_table)); -ERROR: subqueries are not supported within INSERT queries -HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. -INSERT INTO summary_table (id) VALUES ((SELECT id FROM reference_summary_table)); -ERROR: subqueries are not supported within INSERT queries -HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax. -DROP TABLE raw_table; -DROP TABLE summary_table; -DROP TABLE reference_raw_table; -DROP TABLE reference_summary_table; diff --git a/src/test/regress/expected/multi_modifications_9.out b/src/test/regress/expected/multi_modifications_9.out index 7843a3d92..8367a5b01 100644 --- a/src/test/regress/expected/multi_modifications_9.out +++ b/src/test/regress/expected/multi_modifications_9.out @@ -2,7 +2,7 @@ SET citus.shard_count TO 32; SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; -- some failure messages that comes from the worker nodes --- might change due to parallel exectuions, so supress those +-- might change due to parallel executions, so suppress those -- using \set VERBOSITY terse -- =================================================================== -- test end-to-end modification functionality @@ -102,7 +102,6 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 20.69); -- ensure the values are where we put them and query to ensure they are properly pruned SET client_min_messages TO 'DEBUG2'; -RESET citus.task_executor_type; SELECT * FROM range_partitioned WHERE id = 32743; DEBUG: Creating router plan DEBUG: Plan is router executable @@ -120,7 +119,6 @@ DEBUG: Plan is router executable (1 row) SET client_min_messages TO DEFAULT; -SET citus.task_executor_type TO DEFAULT; -- try inserting without a range-partitioned shard to receive the value INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); @@ -355,9 +353,23 @@ ERROR: relation "public.limit_orders_750000" does not exist \c - - - :worker_2_port -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; --- Connect back to master node -\c - - - :master_port -- Verify the insert failed and both placements are healthy +-- or the insert succeeded and placement marked unhealthy +\c - - - :worker_1_port +SELECT count(*) FROM limit_orders_750000 WHERE id = 276; + count +------- + 0 +(1 row) + +\c - - - :worker_2_port +SELECT count(*) FROM limit_orders_750000 WHERE id = 276; + count +------- + 0 +(1 row) + +\c - - - :master_port SELECT count(*) FROM limit_orders WHERE id = 276; count ------- diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 85e85a2e3..38247bdf0 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -3,7 +3,7 @@ SET citus.next_shard_id TO 750000; SET citus.next_placement_id TO 750000; -- some failure messages that comes from the worker nodes --- might change due to parallel exectuions, so supress those +-- might change due to parallel executions, so suppress those -- using \set VERBOSITY terse -- =================================================================== @@ -88,11 +88,9 @@ INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:5 20.69); -- ensure the values are where we put them and query to ensure they are properly pruned SET client_min_messages TO 'DEBUG2'; -RESET citus.task_executor_type; SELECT * FROM range_partitioned WHERE id = 32743; SELECT * FROM append_partitioned WHERE id = 414123; SET client_min_messages TO DEFAULT; -SET citus.task_executor_type TO DEFAULT; -- try inserting without a range-partitioned shard to receive the value INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', @@ -262,10 +260,16 @@ INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; --- Connect back to master node +-- Verify the insert failed and both placements are healthy +-- or the insert succeeded and placement marked unhealthy +\c - - - :worker_1_port +SELECT count(*) FROM limit_orders_750000 WHERE id = 276; + +\c - - - :worker_2_port +SELECT count(*) FROM limit_orders_750000 WHERE id = 276; + \c - - - :master_port --- Verify the insert failed and both placements are healthy SELECT count(*) FROM limit_orders WHERE id = 276; SELECT count(*) From fdc0ef63923e46e8a19fa53ae8c46577fc10563b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jul 2019 20:41:54 +0000 Subject: [PATCH 2/2] Adaptive executor: use 2PC when replication_factor > 1 --- .../distributed/executor/adaptive_executor.c | 27 +++---------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 49e37596c..64310c836 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -938,33 +938,14 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution) return true; } - /* - * Checking the first task's placement list is not sufficient for all purposes since - * for append/range distributed tables we might have unequal number of placements for - * shards. However, it is safe to do here, because we're searching for a reference - * table. All other cases return false for this purpose. - */ - task = (Task *) linitial(taskList); if (list_length(task->taskPlacementList) > 1) { /* - * Some tasks don't set replicationModel thus we only - * rely on the anchorShardId, not replicationModel. - * - * TODO: Do we ever need replicationModel in the Task structure? - * Can't we always rely on anchorShardId? + * Adaptive executor opts to error out on queries if a placement is unhealthy, + * not marking the placement itself unhealthy in the process. + * Use 2PC to rollback placements before the unhealthy shard failed. */ - uint64 anchorShardId = task->anchorShardId; - if (anchorShardId != INVALID_SHARD_ID && ReferenceTableShardId(anchorShardId)) - { - return true; - } - - /* - * Single DML/DDL tasks with replicated tables (non-reference) - * should not require BEGIN/COMMIT/ROLLBACK. - */ - return false; + return true; } return false;