ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 750000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 750000; -- =================================================================== -- test end-to-end modification functionality -- =================================================================== CREATE TYPE order_side AS ENUM ('buy', 'sell'); CREATE TABLE limit_orders ( id bigint PRIMARY KEY, symbol text NOT NULL, bidder_id bigint NOT NULL, placed_at timestamp NOT NULL, kind order_side NOT NULL, limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00) ); CREATE TABLE insufficient_shards ( LIKE limit_orders ); CREATE TABLE range_partitioned ( LIKE limit_orders ); CREATE TABLE append_partitioned ( LIKE limit_orders ); SELECT master_create_distributed_table('limit_orders', 'id', 'hash'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_distributed_table('insufficient_shards', 'id', 'hash'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_distributed_table('range_partitioned', 'id', 'range'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_distributed_table('append_partitioned', 'id', 'append'); master_create_distributed_table --------------------------------- (1 row) SELECT master_create_worker_shards('limit_orders', 2, 2); master_create_worker_shards ----------------------------- (1 row) -- make a single shard, then narrow its range to [0, 0] so the insert attempted later finds no shard to target SELECT master_create_worker_shards('insufficient_shards', 1, 1); master_create_worker_shards ----------------------------- (1 row) UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 0 WHERE logicalrelid = 'insufficient_shards'::regclass; -- create range-partitioned shards SELECT master_create_empty_shard('range_partitioned') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 49999 WHERE shardid = :new_shard_id; SELECT 
master_create_empty_shard('range_partitioned') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 50000, shardmaxvalue = 99999 WHERE shardid = :new_shard_id; -- create append-partitioned shards SELECT master_create_empty_shard('append_partitioned') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 0, shardmaxvalue = 500000 WHERE shardid = :new_shard_id; SELECT master_create_empty_shard('append_partitioned') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 500000, shardmaxvalue = 1000000 WHERE shardid = :new_shard_id; -- basic single-row INSERT INSERT INTO limit_orders VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 32743; count ------- 1 (1 row) -- try a single-row INSERT with no shard to receive it INSERT INTO insufficient_shards VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); ERROR: distributed modifications must target exactly one shard -- try an insert to a range-partitioned table INSERT INTO range_partitioned VALUES (32743, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); -- also insert to an append-partitioned table INSERT INTO append_partitioned VALUES (414123, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); -- ensure the values are where we put them, and check via the DEBUG2 output that shard pruning happens when querying them SET client_min_messages TO 'DEBUG2'; SET citus.task_executor_type TO 'real-time'; SELECT * FROM range_partitioned WHERE id = 32743; DEBUG: predicate pruning for shardId 750004 DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price -------+--------+-----------+--------------------------+------+------------- 32743 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 (1 row) SELECT * FROM append_partitioned WHERE id = 414123; DEBUG: predicate pruning for shardId 750006 DEBUG: Plan is router executable id | symbol | bidder_id | placed_at | kind | limit_price 
--------+--------+-----------+--------------------------+------+------------- 414123 | AAPL | 9580 | Tue Oct 19 10:23:54 2004 | buy | 20.69 (1 row) SET client_min_messages TO DEFAULT; SET citus.task_executor_type TO DEFAULT; -- try inserting without a range-partitioned shard to receive the value INSERT INTO range_partitioned VALUES (999999, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); ERROR: distributed modifications must target exactly one shard -- and insert into an append-partitioned table with a value that spans shards: INSERT INTO append_partitioned VALUES (500000, 'AAPL', 9580, '2004-10-19 10:23:54', 'buy', 20.69); ERROR: distributed modifications must target exactly one shard -- INSERT with DEFAULT in the target list INSERT INTO limit_orders VALUES (12756, 'MSFT', 10959, '2013-05-08 07:29:23', 'sell', DEFAULT); SELECT COUNT(*) FROM limit_orders WHERE id = 12756; count ------- 1 (1 row) -- INSERT with expressions in target list INSERT INTO limit_orders VALUES (430, upper('ibm'), 214, timestamp '2003-01-28 10:31:17' + interval '5 hours', 'buy', sqrt(2)); SELECT COUNT(*) FROM limit_orders WHERE id = 430; count ------- 1 (1 row) -- INSERT without partition key INSERT INTO limit_orders DEFAULT VALUES; ERROR: cannot plan INSERT using row with NULL value in partition column -- squelch WARNINGs that contain worker_port SET client_min_messages TO ERROR; -- INSERT violating NOT NULL constraint (the NULL is in the partition column, so planning fails) INSERT INTO limit_orders VALUES (NULL, 'T', 975234, DEFAULT); ERROR: cannot plan INSERT using row with NULL value in partition column -- INSERT violating column constraint INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell', -5.00); ERROR: new row for relation "limit_orders_750000" violates check constraint "limit_orders_limit_price_check" DETAIL: Failing row contains (18811, BUD, 14962, 2014-04-05 08:32:16, sell, -5.00). 
CONTEXT: while executing command on localhost:57637 -- INSERT violating primary key constraint INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58); ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" DETAIL: Key (id)=(32743) already exists. CONTEXT: while executing command on localhost:57638 SET client_min_messages TO DEFAULT; -- commands with non-constant partition values are unsupported INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45', 'sell', 0.58); ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- commands with expressions that cannot be collapsed are unsupported INSERT INTO limit_orders VALUES (2036, 'GOOG', 5634, now(), 'buy', random()); ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- commands with mutable functions in their quals DELETE FROM limit_orders WHERE id = 246 AND bidder_id = (random() * 1000); ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- commands with mutable but non-volatile functions (i.e. stable functions) in their quals DELETE FROM limit_orders WHERE id = 246 AND placed_at = current_timestamp; ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- commands with multiple rows are unsupported INSERT INTO limit_orders VALUES (DEFAULT), (DEFAULT); ERROR: cannot perform distributed planning for the given modification DETAIL: Multi-row INSERTs to distributed tables are not supported. -- INSERT ... SELECT ... FROM commands are unsupported INSERT INTO limit_orders SELECT * FROM limit_orders; ERROR: cannot perform distributed planning for the given modifications DETAIL: Subqueries are not supported in distributed modifications. 
-- commands with a RETURNING clause are unsupported INSERT INTO limit_orders VALUES (7285, 'AMZN', 3278, '2016-01-05 02:07:36', 'sell', 0.00) RETURNING *; ERROR: cannot perform distributed planning for the given modification DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *) INSERT INTO limit_orders DEFAULT VALUES; ERROR: cannot perform distributed planning for the given modification DETAIL: Common table expressions are not supported in distributed modifications. -- test simple DELETE INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; count ------- 1 (1 row) DELETE FROM limit_orders WHERE id = 246; SELECT COUNT(*) FROM limit_orders WHERE id = 246; count ------- 0 (1 row) -- DELETE with expression in WHERE clause INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); SELECT COUNT(*) FROM limit_orders WHERE id = 246; count ------- 1 (1 row) DELETE FROM limit_orders WHERE id = (2 * 123); SELECT COUNT(*) FROM limit_orders WHERE id = 246; count ------- 0 (1 row) -- commands with no constraints on the partition key are not supported DELETE FROM limit_orders WHERE bidder_id = 162; ERROR: distributed modifications must target exactly one shard -- commands with a USING clause are unsupported (bidders is a regular, non-distributed table) CREATE TABLE bidders ( name text, id bigint ); DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: cannot plan queries that include both regular and partitioned relations -- commands with a RETURNING clause are unsupported DELETE FROM limit_orders WHERE id = 246 RETURNING *; ERROR: cannot perform distributed planning for the given modification DETAIL: RETURNING clauses are not supported in distributed modifications. 
-- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) DELETE FROM limit_orders; ERROR: cannot perform distributed planning for the given modification DETAIL: Common table expressions are not supported in distributed modifications. -- cursors are not supported DELETE FROM limit_orders WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); -- simple UPDATE UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; SELECT symbol FROM limit_orders WHERE id = 246; symbol -------- GM (1 row) -- expression UPDATE UPDATE limit_orders SET bidder_id = 6 * 3 WHERE id = 246; SELECT bidder_id FROM limit_orders WHERE id = 246; bidder_id ----------- 18 (1 row) -- multi-column UPDATE UPDATE limit_orders SET (kind, limit_price) = ('buy', DEFAULT) WHERE id = 246; SELECT kind, limit_price FROM limit_orders WHERE id = 246; kind | limit_price ------+------------- buy | 0.00 (1 row) -- Test that on unique constraint violations, we fail fast INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); INSERT INTO limit_orders VALUES (275, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); ERROR: duplicate key value violates unique constraint "limit_orders_pkey_750001" DETAIL: Key (id)=(275) already exists. 
CONTEXT: while executing command on localhost:57638 -- Test that shards which miss a modification are marked unhealthy -- First: Connect to the second worker node \c - - - :worker_2_port -- Second: Move aside limit_orders shard on the second worker node ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; -- Third: Connect back to master node \c - - - :master_port -- Fourth: Perform an INSERT on the remaining node INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); WARNING: relation "limit_orders_750000" does not exist CONTEXT: while executing command on localhost:57638 -- Last: Verify the insert worked but the moved-aside placement is now unhealthy SELECT count(*) FROM limit_orders WHERE id = 276; count ------- 1 (1 row) SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.nodename = 'localhost' AND sp.nodeport = :worker_2_port AND sp.shardstate = 3 AND s.logicalrelid = 'limit_orders'::regclass; count ------- 1 (1 row) -- Test that if all shards miss a modification, no state change occurs -- First: Connect to the first worker node \c - - - :worker_1_port -- Second: Move aside limit_orders shard on the first worker node ALTER TABLE limit_orders_750000 RENAME TO renamed_orders; -- Third: Connect back to master node \c - - - :master_port -- Fourth: Perform an INSERT, which now has no working placement left to modify INSERT INTO limit_orders VALUES (276, 'ADR', 140, '2007-07-02 16:32:15', 'sell', 43.67); WARNING: relation "limit_orders_750000" does not exist CONTEXT: while executing command on localhost:57637 ERROR: could not modify any active placements -- Last: Verify worker is still healthy SELECT count(*) FROM pg_dist_shard_placement AS sp, pg_dist_shard AS s WHERE sp.shardid = s.shardid AND sp.nodename = 'localhost' AND sp.nodeport = :worker_1_port AND sp.shardstate = 1 AND s.logicalrelid = 'limit_orders'::regclass; count ------- 2 (1 row) -- Undo our change... 
-- First: Connect to the first worker node \c - - - :worker_1_port -- Second: Restore the original name of the limit_orders shard on the first worker node ALTER TABLE renamed_orders RENAME TO limit_orders_750000; -- Third: Connect back to master node \c - - - :master_port -- commands with no constraints on the partition key are not supported UPDATE limit_orders SET limit_price = 0.00; ERROR: distributed modifications must target exactly one shard -- attempting to change the partition key is unsupported UPDATE limit_orders SET id = 0 WHERE id = 246; ERROR: modifying the partition value of rows is not allowed -- UPDATEs with a FROM clause are unsupported UPDATE limit_orders SET limit_price = 0.00 FROM bidders WHERE limit_orders.id = 246 AND limit_orders.bidder_id = bidders.id AND bidders.name = 'Bernie Madoff'; ERROR: cannot plan queries that include both regular and partitioned relations -- commands with a RETURNING clause are unsupported UPDATE limit_orders SET symbol = 'GM' WHERE id = 246 RETURNING *; ERROR: cannot perform distributed planning for the given modification DETAIL: RETURNING clauses are not supported in distributed modifications. -- commands containing a CTE are unsupported WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) UPDATE limit_orders SET symbol = 'GM'; ERROR: cannot perform distributed planning for the given modification DETAIL: Common table expressions are not supported in distributed modifications. 
SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; symbol | bidder_id --------+----------- GM | 18 (1 row) -- updates referencing just a var are supported UPDATE limit_orders SET bidder_id = id WHERE id = 246; -- updates referencing a column inside an expression are supported UPDATE limit_orders SET bidder_id = bidder_id + 1 WHERE id = 246; -- IMMUTABLE functions are allowed UPDATE limit_orders SET symbol = LOWER(symbol) WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; symbol | bidder_id --------+----------- gm | 247 (1 row) -- updates referencing non-IMMUTABLE functions are unsupported UPDATE limit_orders SET placed_at = now() WHERE id = 246; ERROR: functions used in modification queries on distributed tables must be marked IMMUTABLE -- cursors are not supported UPDATE limit_orders SET symbol = 'GM' WHERE CURRENT OF cursor_name; ERROR: distributed modifications must target exactly one shard