Make 8 more tests runnable multiple times via run_test.py (#6791)

Soon I will be doing some changes related to #692 in router planner
and those changes require updating ~5/6 tests related to router
planning. And to make those test files runnable by run_test.py
multiple times, we need to make some other tests (that they're
run in parallel / they badly depend on) ready for run_test.py too.
pull/6792/head
Onur Tirtir 2023-03-27 12:19:06 +03:00 committed by GitHub
parent da7db53c87
commit 372a93b529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 218 additions and 99 deletions

View File

@ -109,6 +109,10 @@ if __name__ == "__main__":
"multi_mx_function_table_reference",
],
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
"multi_simple_queries": TestDeps("base_schedule"),
}
if not (test_file_name or test_file_path):

View File

@ -3,6 +3,14 @@
-- create, distribute, INSERT, SELECT and UPDATE
-- ===================================================================
SET citus.next_shard_id TO 530000;
-- Given that other test files depend on the existence of types created in this file,
-- we cannot drop them at the end. Instead, we drop them at the beginning of the test
-- to make this file runnable multiple times via run_test.py.
BEGIN;
SET LOCAL client_min_messages TO WARNING;
DROP TYPE IF EXISTS test_composite_type, other_composite_type, bug_status CASCADE;
DROP OPERATOR FAMILY IF EXISTS cats_op_fam USING hash;
COMMIT;
-- create a custom type...
CREATE TYPE test_composite_type AS (
i integer,

View File

@ -1,5 +1,7 @@
SET citus.next_shard_id TO 1200000;
SET citus.next_placement_id TO 1200000;
CREATE SCHEMA multi_modifying_xacts;
SET search_path TO multi_modifying_xacts;
-- ===================================================================
-- test end-to-end modification functionality
-- ===================================================================
@ -190,7 +192,7 @@ ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- but the DDL should correctly roll back
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='multi_modifying_xacts.labs'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
id | bigint | not null
@ -339,7 +341,7 @@ CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
END;
$rli$ LANGUAGE plpgsql;
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
SELECT * FROM run_command_on_placements('multi_modifying_xacts.researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE multi_modifying_xacts.reject_large_id()')
ORDER BY nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
@ -498,6 +500,7 @@ AND s.logicalrelid = 'objects'::regclass;
-- create trigger on one worker to reject certain values
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
@ -514,6 +517,7 @@ AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- test partial failure; worker_1 succeeds, 2 fails
-- in this case, we expect the transaction to abort
\set VERBOSITY terse
@ -551,6 +555,7 @@ DELETE FROM objects;
-- there cannot be errors on different shards at different times
-- because the first failure will fail the whole transaction
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
@ -567,6 +572,7 @@ AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
INSERT INTO objects VALUES (2, 'BAD');
@ -602,12 +608,14 @@ AND (s.logicalrelid = 'objects'::regclass OR
-- what if the failures happen at COMMIT time?
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON objects_1200003;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON objects_1200003
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- should be the same story as before, just at COMMIT time
-- as we use 2PC, the transaction is rollbacked
BEGIN;
@ -644,12 +652,14 @@ WHERE sp.shardid = s.shardid
AND s.logicalrelid = 'objects'::regclass;
-- what if all nodes have failures at COMMIT time?
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON labs_1200002;
CREATE CONSTRAINT TRIGGER reject_bad
AFTER INSERT ON labs_1200002
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- reduce the log level for differences between PG14 and PG15
-- in PGconn->errorMessage
-- relevant PG commit b15f254466aefbabcbed001929f6e09db59fd158
@ -688,8 +698,10 @@ AND (s.logicalrelid = 'objects'::regclass OR
-- what if one shard (objects) succeeds but another (labs) completely fails?
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON objects_1200003;
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200004;
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
@ -833,6 +845,7 @@ SELECT * FROM reference_modifying_xacts;
-- lets fail on of the workers at before the commit time
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
@ -849,6 +862,7 @@ AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
@ -860,12 +874,14 @@ ERROR: illegal value
COMMIT;
-- lets fail one of the workers at COMMIT time
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
@ -890,8 +906,10 @@ ORDER BY s.logicalrelid, sp.shardstate;
-- for the time-being drop the constraint
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
-- distributed table
@ -923,6 +941,7 @@ INSERT INTO hash_modifying_xacts VALUES (2, 2);
ABORT;
-- lets fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
@ -939,6 +958,7 @@ AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
@ -955,6 +975,7 @@ SELECT * FROM reference_modifying_xacts WHERE key = 55;
-- now lets fail on of the workers for the hash distributed table table
-- when there is a reference table involved
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- the trigger is on execution time
CREATE CONSTRAINT TRIGGER reject_bad_hash
@ -962,6 +983,7 @@ AFTER INSERT ON hash_modifying_xacts_1200007
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- the transaction as a whole should fail
BEGIN;
@ -994,11 +1016,13 @@ ORDER BY s.logicalrelid, sp.shardstate;
-- and ensure that hash distributed table's
-- change is rollbacked as well
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
BEGIN;
-- to expand participant to include all worker nodes
@ -1127,8 +1151,10 @@ SELECT count(*) FROM pg_dist_transaction;
-- in which we'll make the remote host unavailable
-- first create the new user on all nodes
CREATE USER test_user;
GRANT ALL ON SCHEMA multi_modifying_xacts TO test_user;
-- now connect back to the master with the new user
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200015;
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
@ -1148,21 +1174,24 @@ SELECT create_distributed_table('numbers_hash_failure_test', 'key');
-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.override_table_visibility TO false;
\dt reference_failure_test_1200015
List of relations
Schema | Name | Type | Owner
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
public | reference_failure_test_1200015 | table | test_user
multi_modifying_xacts | reference_failure_test_1200015 | table | test_user
(1 row)
-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
SET search_path TO multi_modifying_xacts;
ALTER USER test_user RENAME TO test_user_new;
NOTICE: not propagating ALTER ROLE ... RENAME TO commands to worker nodes
-- connect back to master and query the reference table
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
@ -1277,14 +1306,17 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin
-- break the other node as well
\c - :default_user - :worker_2_port
SET search_path TO multi_modifying_xacts;
ALTER USER test_user RENAME TO test_user_new;
NOTICE: not propagating ALTER ROLE ... RENAME TO commands to worker nodes
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
-- fails on all shard placements
INSERT INTO numbers_hash_failure_test VALUES (2,2);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" does not exist
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200020;
SET citus.next_placement_id TO 1200033;
-- unbreak both nodes by renaming the user back to the original name
@ -1297,6 +1329,7 @@ SELECT * FROM run_command_on_workers('ALTER USER test_user_new RENAME TO test_us
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
reference_failure_test, numbers_hash_failure_test;
REVOKE ALL ON SCHEMA multi_modifying_xacts FROM test_user;
DROP USER test_user;
-- set up foreign keys to test transactions with co-located and reference tables
BEGIN;
@ -1322,7 +1355,9 @@ SELECT create_reference_table('itemgroups');
(1 row)
SET client_min_messages TO WARNING;
DROP TABLE IF EXISTS users ;
RESET client_min_messages;
CREATE TABLE users (
id int PRIMARY KEY,
name text,
@ -1354,18 +1389,18 @@ JOIN
USING (shardid)
ORDER BY
id;
id | shard_name | nodename | nodeport
id | shard_name | nodename | nodeport
---------------------------------------------------------------------
1 | users_1200022 | localhost | 57637
2 | users_1200025 | localhost | 57638
3 | users_1200023 | localhost | 57638
4 | users_1200023 | localhost | 57638
5 | users_1200022 | localhost | 57637
6 | users_1200024 | localhost | 57637
7 | users_1200023 | localhost | 57638
8 | users_1200022 | localhost | 57637
9 | users_1200025 | localhost | 57638
10 | users_1200022 | localhost | 57637
1 | multi_modifying_xacts.users_1200022 | localhost | 57637
2 | multi_modifying_xacts.users_1200025 | localhost | 57638
3 | multi_modifying_xacts.users_1200023 | localhost | 57638
4 | multi_modifying_xacts.users_1200023 | localhost | 57638
5 | multi_modifying_xacts.users_1200022 | localhost | 57637
6 | multi_modifying_xacts.users_1200024 | localhost | 57637
7 | multi_modifying_xacts.users_1200023 | localhost | 57638
8 | multi_modifying_xacts.users_1200022 | localhost | 57637
9 | multi_modifying_xacts.users_1200025 | localhost | 57638
10 | multi_modifying_xacts.users_1200022 | localhost | 57637
(10 rows)
END;
@ -1546,5 +1581,5 @@ SELECT name FROM labs WHERE id = 1001;
(1 row)
RESET citus.function_opens_transaction_block;
DROP FUNCTION insert_abort();
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifying_xacts CASCADE;

View File

@ -1,6 +1,10 @@
--
-- MULTI_MX_COPY_DATA
--
-- We truncate them to make this test runnable multiple times.
-- Note that we cannot do that at the end of the test because
-- we need to keep the data for the other tests.
TRUNCATE lineitem_mx, orders_mx;
\set nation_data_file :abs_srcdir '/data/nation.data'
\set client_side_copy_command '\\copy nation_hash FROM ' :'nation_data_file' ' with delimiter '''|''';'
:client_side_copy_command
@ -161,3 +165,4 @@ SET search_path TO public;
:client_side_copy_command
\set client_side_copy_command '\\copy supplier_mx FROM ' :'supplier_data_file' ' with delimiter '''|''';'
:client_side_copy_command
DROP TABLE citus_mx_test_schema.nation_hash_replicated;

View File

@ -406,3 +406,6 @@ SELECT * FROM labs_mx WHERE id = 8;
---------------------------------------------------------------------
(0 rows)
TRUNCATE objects_mx, labs_mx, researchers_mx;
DROP TRIGGER reject_bad_mx ON labs_mx_1220102;
DROP FUNCTION reject_bad_mx;

View File

@ -1460,3 +1460,7 @@ DEBUG: query has a single distribution column value: 1
51
(6 rows)
SET client_min_messages to WARNING;
TRUNCATE articles_hash_mx, company_employees_mx, articles_single_shard_hash_mx;
DROP MATERIALIZED VIEW mv_articles_hash_mx_error;
DROP TABLE authors_hash_mx;

View File

@ -6,6 +6,8 @@ SET citus.next_shard_id TO 840000;
-- router planner, so we're disabling it in this file. We've bunch of
-- other tests that triggers fast-path-router planner
SET citus.enable_fast_path_router_planner TO false;
CREATE SCHEMA multi_router_planner;
SET search_path TO multi_router_planner;
CREATE TABLE articles_hash (
id bigint NOT NULL,
author_id bigint NOT NULL,
@ -290,10 +292,10 @@ WITH first_author AS MATERIALIZED (
UPDATE articles_hash SET title = first_author.name
FROM first_author WHERE articles_hash.author_id = 2 AND articles_hash.id = first_author.id;
DEBUG: Router planner doesn't support SELECT FOR UPDATE in common table expressions involving reference tables.
DEBUG: generating subplan XXX_1 for CTE first_author: SELECT articles_hash.id, auref.name FROM public.articles_hash, public.authors_reference auref WHERE ((articles_hash.author_id OPERATOR(pg_catalog.=) 2) AND (auref.id OPERATOR(pg_catalog.=) articles_hash.author_id)) FOR UPDATE OF articles_hash FOR UPDATE OF auref
DEBUG: generating subplan XXX_1 for CTE first_author: SELECT articles_hash.id, auref.name FROM multi_router_planner.articles_hash, multi_router_planner.authors_reference auref WHERE ((articles_hash.author_id OPERATOR(pg_catalog.=) 2) AND (auref.id OPERATOR(pg_catalog.=) articles_hash.author_id)) FOR UPDATE OF articles_hash FOR UPDATE OF auref
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE public.articles_hash SET title = first_author.name FROM (SELECT intermediate_result.id, intermediate_result.name FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name character varying(20))) first_author WHERE ((articles_hash.author_id OPERATOR(pg_catalog.=) 2) AND (articles_hash.id OPERATOR(pg_catalog.=) first_author.id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE multi_router_planner.articles_hash SET title = first_author.name FROM (SELECT intermediate_result.id, intermediate_result.name FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name character varying(20))) first_author WHERE ((articles_hash.author_id OPERATOR(pg_catalog.=) 2) AND (articles_hash.id OPERATOR(pg_catalog.=) first_author.id))
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
WITH first_author AS MATERIALIZED (
@ -356,10 +358,10 @@ WITH id_author AS MATERIALIZED ( SELECT id, author_id FROM articles_hash WHERE a
id_title AS MATERIALIZED (SELECT id, title from articles_hash WHERE author_id = 2)
SELECT * FROM id_author, id_title WHERE id_author.id = id_title.id;
DEBUG: cannot run command which targets multiple shards
DEBUG: generating subplan XXX_1 for CTE id_author: SELECT id, author_id FROM public.articles_hash WHERE (author_id OPERATOR(pg_catalog.=) 1)
DEBUG: generating subplan XXX_1 for CTE id_author: SELECT id, author_id FROM multi_router_planner.articles_hash WHERE (author_id OPERATOR(pg_catalog.=) 1)
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: generating subplan XXX_2 for CTE id_title: SELECT id, title FROM public.articles_hash WHERE (author_id OPERATOR(pg_catalog.=) 2)
DEBUG: generating subplan XXX_2 for CTE id_title: SELECT id, title FROM multi_router_planner.articles_hash WHERE (author_id OPERATOR(pg_catalog.=) 2)
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id_author.id, id_author.author_id, id_title.id, id_title.title FROM (SELECT intermediate_result.id, intermediate_result.author_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint)) id_author, (SELECT intermediate_result.id, intermediate_result.title FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, title character varying(20))) id_title WHERE (id_author.id OPERATOR(pg_catalog.=) id_title.id)
@ -456,7 +458,7 @@ WITH new_article AS MATERIALIZED(
)
SELECT * FROM new_article;
DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned
DEBUG: generating subplan XXX_1 for CTE new_article: INSERT INTO public.articles_hash (id, author_id, title, word_count) VALUES (1, 1, 'arsenous'::character varying, 9) RETURNING id, author_id, title, word_count
DEBUG: generating subplan XXX_1 for CTE new_article: INSERT INTO multi_router_planner.articles_hash (id, author_id, title, word_count) VALUES (1, 1, 'arsenous'::character varying, 9) RETURNING id, author_id, title, word_count
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id, author_id, title, word_count FROM (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) new_article
@ -471,7 +473,7 @@ WITH update_article AS MATERIALIZED(
)
SELECT * FROM update_article;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.articles_hash SET word_count = 10 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 9)) RETURNING id, author_id, title, word_count
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE multi_router_planner.articles_hash SET word_count = 10 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 9)) RETURNING id, author_id, title, word_count
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id, author_id, title, word_count FROM (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) update_article
DEBUG: Creating router plan
@ -485,7 +487,7 @@ WITH update_article AS MATERIALIZED (
)
SELECT coalesce(1,random());
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.articles_hash SET word_count = 11 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE multi_router_planner.articles_hash SET word_count = 11 WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce"
DEBUG: Creating router plan
@ -510,7 +512,7 @@ WITH update_article AS MATERIALIZED (
)
SELECT coalesce(1,random());
DEBUG: cannot router plan modification of a non-distributed table
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE public.authors_reference SET name = ''::character varying WHERE (id OPERATOR(pg_catalog.=) 0) RETURNING name, id
DEBUG: generating subplan XXX_1 for CTE update_article: UPDATE multi_router_planner.authors_reference SET name = ''::character varying WHERE (id OPERATOR(pg_catalog.=) 0) RETURNING name, id
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT COALESCE((1)::double precision, random()) AS "coalesce"
DEBUG: Creating router plan
@ -524,7 +526,7 @@ WITH delete_article AS MATERIALIZED (
)
SELECT * FROM delete_article;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for CTE delete_article: DELETE FROM public.articles_hash WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count
DEBUG: generating subplan XXX_1 for CTE delete_article: DELETE FROM multi_router_planner.articles_hash WHERE ((id OPERATOR(pg_catalog.=) 1) AND (word_count OPERATOR(pg_catalog.=) 10)) RETURNING id, author_id, title, word_count
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id, author_id, title, word_count FROM (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) delete_article
DEBUG: Creating router plan
@ -653,8 +655,8 @@ FROM articles_hash, (SELECT id, word_count FROM articles_hash) AS test WHERE tes
ORDER BY test.word_count DESC, articles_hash.id LIMIT 5;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for subquery SELECT id, word_count FROM public.articles_hash
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM public.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE (test.id OPERATOR(pg_catalog.=) articles_hash.id) ORDER BY test.word_count DESC, articles_hash.id LIMIT 5
DEBUG: generating subplan XXX_1 for subquery SELECT id, word_count FROM multi_router_planner.articles_hash
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM multi_router_planner.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE (test.id OPERATOR(pg_catalog.=) articles_hash.id) ORDER BY test.word_count DESC, articles_hash.id LIMIT 5
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: push down of limit count: 5
id | word_count
@ -672,8 +674,8 @@ WHERE test.id = articles_hash.id and articles_hash.author_id = 1
ORDER BY articles_hash.id;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for subquery SELECT id, word_count FROM public.articles_hash
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM public.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE ((test.id OPERATOR(pg_catalog.=) articles_hash.id) AND (articles_hash.author_id OPERATOR(pg_catalog.=) 1)) ORDER BY articles_hash.id
DEBUG: generating subplan XXX_1 for subquery SELECT id, word_count FROM multi_router_planner.articles_hash
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT articles_hash.id, test.word_count FROM multi_router_planner.articles_hash, (SELECT intermediate_result.id, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, word_count integer)) test WHERE ((test.id OPERATOR(pg_catalog.=) articles_hash.id) AND (articles_hash.author_id OPERATOR(pg_catalog.=) 1)) ORDER BY articles_hash.id
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
id | word_count
@ -788,9 +790,9 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
WHERE a.author_id = 2 and a.author_id = b.author_id
LIMIT 3;
DEBUG: found no worker with all shard placements
DEBUG: generating subplan XXX_1 for CTE single_shard: SELECT id, author_id, title, word_count FROM public.articles_single_shard_hash
DEBUG: generating subplan XXX_1 for CTE single_shard: SELECT id, author_id, title, word_count FROM multi_router_planner.articles_single_shard_hash
DEBUG: Creating router plan
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a.author_id AS first_author, b.word_count AS second_word_count FROM public.articles_hash a, (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) b WHERE ((a.author_id OPERATOR(pg_catalog.=) 2) AND (a.author_id OPERATOR(pg_catalog.=) b.author_id)) LIMIT 3
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a.author_id AS first_author, b.word_count AS second_word_count FROM multi_router_planner.articles_hash a, (SELECT intermediate_result.id, intermediate_result.author_id, intermediate_result.title, intermediate_result.word_count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, author_id bigint, title character varying(20), word_count integer)) b WHERE ((a.author_id OPERATOR(pg_catalog.=) 2) AND (a.author_id OPERATOR(pg_catalog.=) b.author_id)) LIMIT 3
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
first_author | second_word_count
@ -1575,10 +1577,10 @@ SELECT 1 FROM authors_reference r JOIN (
) num_db ON (r.id = num_db.datid) LIMIT 1;
DEBUG: found no worker with all shard placements
DEBUG: function does not have co-located tables
DEBUG: generating subplan XXX_1 for subquery SELECT datid FROM public.number1() s(datid)
DEBUG: generating subplan XXX_1 for subquery SELECT datid FROM multi_router_planner.number1() s(datid)
DEBUG: Creating router plan
DEBUG: generating subplan XXX_2 for subquery SELECT s.datid FROM ((SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) s LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (public.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) num_db ON ((r.id OPERATOR(pg_catalog.=) num_db.datid))) LIMIT 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (multi_router_planner.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) num_db ON ((r.id OPERATOR(pg_catalog.=) num_db.datid))) LIMIT 1
DEBUG: Creating router plan
?column?
---------------------------------------------------------------------
@ -1590,10 +1592,10 @@ SELECT s.datid FROM number1() s LEFT JOIN pg_database d ON s.datid = d.oid;
SELECT 1 FROM authors_reference r JOIN num_db ON (r.id = num_db.datid) LIMIT 1;
DEBUG: found no worker with all shard placements
DEBUG: function does not have co-located tables
DEBUG: generating subplan XXX_1 for subquery SELECT datid FROM public.number1() s(datid)
DEBUG: generating subplan XXX_1 for subquery SELECT datid FROM multi_router_planner.number1() s(datid)
DEBUG: Creating router plan
DEBUG: generating subplan XXX_2 for subquery SELECT s.datid FROM ((SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) s LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (public.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) num_db ON ((r.id OPERATOR(pg_catalog.=) num_db.datid))) LIMIT 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (multi_router_planner.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) num_db ON ((r.id OPERATOR(pg_catalog.=) num_db.datid))) LIMIT 1
DEBUG: Creating router plan
?column?
---------------------------------------------------------------------
@ -1603,8 +1605,8 @@ DEBUG: Creating router plan
WITH cte AS MATERIALIZED (SELECT * FROM num_db)
SELECT 1 FROM authors_reference r JOIN cte ON (r.id = cte.datid) LIMIT 1;
DEBUG: found no worker with all shard placements
DEBUG: generating subplan XXX_1 for CTE cte: SELECT datid FROM (SELECT s.datid FROM (public.number1() s(datid) LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))) num_db
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (public.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) cte ON ((r.id OPERATOR(pg_catalog.=) cte.datid))) LIMIT 1
DEBUG: generating subplan XXX_1 for CTE cte: SELECT datid FROM (SELECT s.datid FROM (multi_router_planner.number1() s(datid) LEFT JOIN pg_database d ON (((s.datid)::oid OPERATOR(pg_catalog.=) d.oid)))) num_db
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT 1 FROM (multi_router_planner.authors_reference r JOIN (SELECT intermediate_result.datid FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(datid integer)) cte ON ((r.id OPERATOR(pg_catalog.=) cte.datid))) LIMIT 1
DEBUG: Creating router plan
?column?
---------------------------------------------------------------------
@ -1769,7 +1771,7 @@ SET citus.log_remote_commands TO on;
-- single shard select queries are router plannable
SELECT * FROM articles_range where author_id = 1;
DEBUG: Creating router plan
NOTICE: issuing SELECT id, author_id, title, word_count FROM public.articles_range_840012 articles_range WHERE (author_id OPERATOR(pg_catalog.=) 1)
NOTICE: issuing SELECT id, author_id, title, word_count FROM multi_router_planner.articles_range_840012 articles_range WHERE (author_id OPERATOR(pg_catalog.=) 1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
id | author_id | title | word_count
---------------------------------------------------------------------
@ -1777,7 +1779,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SELECT * FROM articles_range where author_id = 1 or author_id = 5;
DEBUG: Creating router plan
NOTICE: issuing SELECT id, author_id, title, word_count FROM public.articles_range_840012 articles_range WHERE ((author_id OPERATOR(pg_catalog.=) 1) OR (author_id OPERATOR(pg_catalog.=) 5))
NOTICE: issuing SELECT id, author_id, title, word_count FROM multi_router_planner.articles_range_840012 articles_range WHERE ((author_id OPERATOR(pg_catalog.=) 1) OR (author_id OPERATOR(pg_catalog.=) 5))
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
id | author_id | title | word_count
---------------------------------------------------------------------
@ -1795,7 +1797,7 @@ NOTICE: executing the command locally: SELECT id, author_id, title, word_count
SELECT * FROM articles_range ar join authors_range au on (ar.author_id = au.id)
WHERE ar.author_id = 1;
DEBUG: Creating router plan
NOTICE: issuing SELECT ar.id, ar.author_id, ar.title, ar.word_count, au.name, au.id FROM (public.articles_range_840012 ar JOIN public.authors_range_840008 au ON ((ar.author_id OPERATOR(pg_catalog.=) au.id))) WHERE (ar.author_id OPERATOR(pg_catalog.=) 1)
NOTICE: issuing SELECT ar.id, ar.author_id, ar.title, ar.word_count, au.name, au.id FROM (multi_router_planner.articles_range_840012 ar JOIN multi_router_planner.authors_range_840008 au ON ((ar.author_id OPERATOR(pg_catalog.=) au.id))) WHERE (ar.author_id OPERATOR(pg_catalog.=) 1)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
id | author_id | title | word_count | name | id
---------------------------------------------------------------------
@ -2433,12 +2435,15 @@ SELECT create_distributed_table('failure_test', 'a', 'hash');
SET citus.enable_ddl_propagation TO off;
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
GRANT USAGE ON SCHEMA multi_router_planner TO router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA multi_router_planner TO router_user;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
GRANT USAGE ON SCHEMA multi_router_planner TO router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA multi_router_planner TO router_user;
\c - router_user - :master_port
SET search_path TO multi_router_planner;
-- we will fail to connect to worker 2, since the user does not exist
-- still, we never mark placements inactive. Instead, fail the transaction
BEGIN;
@ -2452,7 +2457,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
ORDER BY shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------------------------------------------------------------------
840017 | 1 | localhost | 57637
@ -2471,18 +2476,5 @@ DROP USER router_user;
\c - - - :master_port
DROP OWNED BY router_user;
DROP USER router_user;
DROP TABLE failure_test;
DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count();
DROP MATERIALIZED VIEW mv_articles_hash_empty;
DROP MATERIALIZED VIEW mv_articles_hash_data;
DROP VIEW num_db;
DROP FUNCTION number1();
DROP TABLE articles_hash;
DROP TABLE articles_single_shard_hash;
DROP TABLE authors_hash;
DROP TABLE authors_range;
DROP TABLE authors_reference;
DROP TABLE company_employees;
DROP TABLE articles_range;
DROP TABLE articles_append;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_router_planner CASCADE;

View File

@ -7,6 +7,8 @@ SET citus.coordinator_aggregation_strategy TO 'disabled';
-- ===================================================================
-- test end-to-end query functionality
-- ===================================================================
CREATE SCHEMA simple_queries_test;
SET search_path TO simple_queries_test;
CREATE TABLE articles (
id bigint NOT NULL,
author_id bigint NOT NULL,
@ -382,7 +384,7 @@ SELECT author_id FROM articles
8
(3 rows)
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM public.orders
GROUP BY o_orderstatus
HAVING count(*) > 1450 OR avg(o_totalprice) > 150000
ORDER BY o_orderstatus;
@ -392,7 +394,7 @@ SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders
P | 75 | 164847.914533333333
(2 rows)
SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM public.lineitem, public.orders
WHERE l_orderkey = o_orderkey AND l_orderkey > 9030
GROUP BY o_orderstatus
HAVING sum(l_linenumber) > 1000
@ -541,7 +543,7 @@ DEBUG: query has a single distribution column value: 2
-- error out on unsupported aggregate
SET client_min_messages to 'NOTICE';
CREATE AGGREGATE public.invalid(int) (
CREATE AGGREGATE invalid(int) (
sfunc = int4pl,
stype = int
);
@ -812,10 +814,11 @@ SELECT * FROM (SELECT nextval('query_seq') FROM articles LIMIT 3) vals;
(3 rows)
-- but not elsewhere
SELECT sum(nextval('query_seq')) FROM articles;
ERROR: relation "public.query_seq" does not exist
SELECT sum(nextval('simple_queries_test.query_seq')) FROM articles;
ERROR: relation "simple_queries_test.query_seq" does not exist
CONTEXT: while executing command on localhost:xxxxx
SELECT n FROM (SELECT nextval('query_seq') n, random() FROM articles) vals;
ERROR: relation "public.query_seq" does not exist
SELECT n FROM (SELECT nextval('simple_queries_test.query_seq') n, random() FROM articles) vals;
ERROR: relation "simple_queries_test.query_seq" does not exist
CONTEXT: while executing command on localhost:xxxxx
DROP SEQUENCE query_seq;
SET client_min_messages TO WARNING;
DROP SCHEMA simple_queries_test CASCADE;

View File

@ -1,5 +1,7 @@
-- this test file aims to test UPSERT feature on Citus
SET citus.next_shard_id TO 980000;
CREATE SCHEMA upsert_test;
SET search_path TO upsert_test;
CREATE TABLE upsert_test
(
part_key int UNIQUE,
@ -244,3 +246,5 @@ ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET part_key = 15;
ERROR: modifying the partition value of rows is not allowed
SET client_min_messages TO WARNING;
DROP SCHEMA upsert_test CASCADE;

View File

@ -6,6 +6,15 @@
SET citus.next_shard_id TO 530000;
-- Given that other test files depend on the existence of types created in this file,
-- we cannot drop them at the end. Instead, we drop them at the beginning of the test
-- to make this file runnable multiple times via run_test.py.
BEGIN;
SET LOCAL client_min_messages TO WARNING;
DROP TYPE IF EXISTS test_composite_type, other_composite_type, bug_status CASCADE;
DROP OPERATOR FAMILY IF EXISTS cats_op_fam USING hash;
COMMIT;
-- create a custom type...
CREATE TYPE test_composite_type AS (
i integer,

View File

@ -1,6 +1,9 @@
SET citus.next_shard_id TO 1200000;
SET citus.next_placement_id TO 1200000;
CREATE SCHEMA multi_modifying_xacts;
SET search_path TO multi_modifying_xacts;
-- ===================================================================
-- test end-to-end modification functionality
-- ===================================================================
@ -169,7 +172,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- but the DDL should correctly roll back
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='multi_modifying_xacts.labs'::regclass;
SELECT * FROM labs WHERE id = 6;
-- COPY can happen after single row INSERT
@ -294,7 +297,7 @@ CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
$rli$ LANGUAGE plpgsql;
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
SELECT * FROM run_command_on_placements('multi_modifying_xacts.researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE multi_modifying_xacts.reject_large_id()')
ORDER BY nodeport, shardid;
-- hide postgresql version dependend messages for next test only
@ -418,6 +421,7 @@ AND s.logicalrelid = 'objects'::regclass;
-- create trigger on one worker to reject certain values
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
@ -437,6 +441,7 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- test partial failure; worker_1 succeeds, 2 fails
-- in this case, we expect the transaction to abort
@ -465,6 +470,7 @@ DELETE FROM objects;
-- there cannot be errors on different shards at different times
-- because the first failure will fail the whole transaction
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad() RETURNS trigger AS $rb$
BEGIN
@ -483,6 +489,7 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
@ -506,6 +513,7 @@ AND (s.logicalrelid = 'objects'::regclass OR
-- what if the failures happen at COMMIT time?
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON objects_1200003;
@ -515,6 +523,7 @@ DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- should be the same story as before, just at COMMIT time
-- as we use 2PC, the transaction is rollbacked
@ -547,6 +556,7 @@ AND s.logicalrelid = 'objects'::regclass;
-- what if all nodes have failures at COMMIT time?
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON labs_1200002;
@ -556,6 +566,7 @@ DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- reduce the log level for differences between PG14 and PG15
-- in PGconn->errorMessage
@ -586,10 +597,12 @@ AND (s.logicalrelid = 'objects'::regclass OR
-- what if one shard (objects) succeeds but another (labs) completely fails?
\c - - - :worker_2_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad ON objects_1200003;
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200004;
BEGIN;
INSERT INTO objects VALUES (1, 'apple');
@ -682,6 +695,7 @@ SELECT * FROM reference_modifying_xacts;
-- lets fail on of the workers at before the commit time
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad_reference() RETURNS trigger AS $rb$
BEGIN
@ -700,6 +714,7 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- try without wrapping inside a transaction
INSERT INTO reference_modifying_xacts VALUES (999, 3);
@ -711,6 +726,7 @@ COMMIT;
-- lets fail one of the workers at COMMIT time
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
CREATE CONSTRAINT TRIGGER reject_bad_reference
@ -719,6 +735,7 @@ DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- try without wrapping inside a transaction
@ -740,10 +757,12 @@ ORDER BY s.logicalrelid, sp.shardstate;
-- for the time-being drop the constraint
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_reference ON reference_modifying_xacts_1200006;
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
-- now create a hash distributed table and run tests
-- including both the reference table and the hash
@ -777,6 +796,7 @@ ABORT;
-- lets fail one of the workers before COMMIT time for the hash table
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.enable_metadata_sync TO OFF;
CREATE FUNCTION reject_bad_hash() RETURNS trigger AS $rb$
BEGIN
@ -795,6 +815,7 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- the transaction as a whole should fail
@ -809,6 +830,7 @@ SELECT * FROM reference_modifying_xacts WHERE key = 55;
-- now lets fail on of the workers for the hash distributed table table
-- when there is a reference table involved
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
DROP TRIGGER reject_bad_hash ON hash_modifying_xacts_1200007;
-- the trigger is on execution time
@ -818,6 +840,7 @@ DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW EXECUTE PROCEDURE reject_bad_hash();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
-- the transaction as a whole should fail
@ -844,6 +867,7 @@ ORDER BY s.logicalrelid, sp.shardstate;
-- change is rollbacked as well
\c - - - :worker_1_port
SET search_path TO multi_modifying_xacts;
CREATE CONSTRAINT TRIGGER reject_bad_reference
AFTER INSERT ON reference_modifying_xacts_1200006
@ -851,6 +875,7 @@ DEFERRABLE INITIALLY IMMEDIATE
FOR EACH ROW EXECUTE PROCEDURE reject_bad_reference();
\c - - - :master_port
SET search_path TO multi_modifying_xacts;
\set VERBOSITY terse
BEGIN;
@ -920,9 +945,11 @@ SELECT count(*) FROM pg_dist_transaction;
-- first create the new user on all nodes
CREATE USER test_user;
GRANT ALL ON SCHEMA multi_modifying_xacts TO test_user;
-- now connect back to the master with the new user
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200015;
CREATE TABLE reference_failure_test (key int, value int);
SELECT create_reference_table('reference_failure_test');
@ -934,16 +961,19 @@ SELECT create_distributed_table('numbers_hash_failure_test', 'key');
-- ensure that the shard is created for this user
\c - test_user - :worker_1_port
SET search_path TO multi_modifying_xacts;
SET citus.override_table_visibility TO false;
\dt reference_failure_test_1200015
-- now connect with the default user,
-- and rename the existing user
\c - :default_user - :worker_1_port
SET search_path TO multi_modifying_xacts;
ALTER USER test_user RENAME TO test_user_new;
-- connect back to master and query the reference table
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
-- should fail since the worker doesn't have test_user anymore
INSERT INTO reference_failure_test VALUES (1, '1');
@ -1007,15 +1037,18 @@ SELECT count(*) FROM numbers_hash_failure_test;
-- break the other node as well
\c - :default_user - :worker_2_port
SET search_path TO multi_modifying_xacts;
ALTER USER test_user RENAME TO test_user_new;
\c - test_user - :master_port
SET search_path TO multi_modifying_xacts;
-- fails on all shard placements
INSERT INTO numbers_hash_failure_test VALUES (2,2);
-- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port
SET search_path TO multi_modifying_xacts;
SET citus.next_shard_id TO 1200020;
SET citus.next_placement_id TO 1200033;
-- unbreak both nodes by renaming the user back to the original name
@ -1024,6 +1057,7 @@ SELECT * FROM run_command_on_workers('ALTER USER test_user_new RENAME TO test_us
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second,
reference_failure_test, numbers_hash_failure_test;
REVOKE ALL ON SCHEMA multi_modifying_xacts FROM test_user;
DROP USER test_user;
-- set up foreign keys to test transactions with co-located and reference tables
@ -1043,7 +1077,10 @@ CREATE TABLE itemgroups (
);
SELECT create_reference_table('itemgroups');
SET client_min_messages TO WARNING;
DROP TABLE IF EXISTS users ;
RESET client_min_messages;
CREATE TABLE users (
id int PRIMARY KEY,
name text,
@ -1199,5 +1236,5 @@ SELECT insert_abort();
SELECT name FROM labs WHERE id = 1001;
RESET citus.function_opens_transaction_block;
DROP FUNCTION insert_abort();
DROP TABLE items, users, itemgroups, usergroups, researchers, labs;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_modifying_xacts CASCADE;

View File

@ -2,6 +2,11 @@
-- MULTI_MX_COPY_DATA
--
-- We truncate them to make this test runnable multiple times.
-- Note that we cannot do that at the end of the test because
-- we need to keep the data for the other tests.
TRUNCATE lineitem_mx, orders_mx;
\set nation_data_file :abs_srcdir '/data/nation.data'
\set client_side_copy_command '\\copy nation_hash FROM ' :'nation_data_file' ' with delimiter '''|''';'
:client_side_copy_command
@ -96,3 +101,5 @@ SET search_path TO public;
:client_side_copy_command
\set client_side_copy_command '\\copy supplier_mx FROM ' :'supplier_data_file' ' with delimiter '''|''';'
:client_side_copy_command
DROP TABLE citus_mx_test_schema.nation_hash_replicated;

View File

@ -331,3 +331,7 @@ COMMIT;
-- no data should persists
SELECT * FROM objects_mx WHERE id = 1;
SELECT * FROM labs_mx WHERE id = 8;
TRUNCATE objects_mx, labs_mx, researchers_mx;
DROP TRIGGER reject_bad_mx ON labs_mx_1220102;
DROP FUNCTION reject_bad_mx;

View File

@ -657,3 +657,8 @@ INSERT INTO articles_hash_mx VALUES (51, 1, 'amateus', 1814);
SELECT id
FROM articles_hash_mx
WHERE author_id = 1;
SET client_min_messages to WARNING;
TRUNCATE articles_hash_mx, company_employees_mx, articles_single_shard_hash_mx;
DROP MATERIALIZED VIEW mv_articles_hash_mx_error;
DROP TABLE authors_hash_mx;

View File

@ -10,6 +10,9 @@ SET citus.next_shard_id TO 840000;
-- other tests that triggers fast-path-router planner
SET citus.enable_fast_path_router_planner TO false;
CREATE SCHEMA multi_router_planner;
SET search_path TO multi_router_planner;
CREATE TABLE articles_hash (
id bigint NOT NULL,
author_id bigint NOT NULL,
@ -1182,12 +1185,15 @@ SELECT create_distributed_table('failure_test', 'a', 'hash');
SET citus.enable_ddl_propagation TO off;
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
GRANT USAGE ON SCHEMA multi_router_planner TO router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA multi_router_planner TO router_user;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO off;
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
GRANT USAGE ON SCHEMA multi_router_planner TO router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA multi_router_planner TO router_user;
\c - router_user - :master_port
SET search_path TO multi_router_planner;
-- we will fail to connect to worker 2, since the user does not exist
-- still, we never mark placements inactive. Instead, fail the transaction
BEGIN;
@ -1199,29 +1205,13 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'failure_test'::regclass
)
ORDER BY placementid;
ORDER BY shardid, nodeport;
\c - postgres - :worker_1_port
DROP OWNED BY router_user;
DROP USER router_user;
\c - - - :master_port
DROP OWNED BY router_user;
DROP USER router_user;
DROP TABLE failure_test;
DROP FUNCTION author_articles_max_id();
DROP FUNCTION author_articles_id_word_count();
DROP MATERIALIZED VIEW mv_articles_hash_empty;
DROP MATERIALIZED VIEW mv_articles_hash_data;
DROP VIEW num_db;
DROP FUNCTION number1();
DROP TABLE articles_hash;
DROP TABLE articles_single_shard_hash;
DROP TABLE authors_hash;
DROP TABLE authors_range;
DROP TABLE authors_reference;
DROP TABLE company_employees;
DROP TABLE articles_range;
DROP TABLE articles_append;
SET client_min_messages TO WARNING;
DROP SCHEMA multi_router_planner CASCADE;

View File

@ -11,6 +11,9 @@ SET citus.coordinator_aggregation_strategy TO 'disabled';
-- test end-to-end query functionality
-- ===================================================================
CREATE SCHEMA simple_queries_test;
SET search_path TO simple_queries_test;
CREATE TABLE articles (
id bigint NOT NULL,
author_id bigint NOT NULL,
@ -203,12 +206,12 @@ SELECT author_id FROM articles
HAVING author_id <= 2 OR author_id = 8
ORDER BY author_id;
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM orders
SELECT o_orderstatus, count(*), avg(o_totalprice) FROM public.orders
GROUP BY o_orderstatus
HAVING count(*) > 1450 OR avg(o_totalprice) > 150000
ORDER BY o_orderstatus;
SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM lineitem, orders
SELECT o_orderstatus, sum(l_linenumber), avg(l_linenumber) FROM public.lineitem, public.orders
WHERE l_orderkey = o_orderkey AND l_orderkey > 9030
GROUP BY o_orderstatus
HAVING sum(l_linenumber) > 1000
@ -277,7 +280,7 @@ SELECT avg(word_count)
-- error out on unsupported aggregate
SET client_min_messages to 'NOTICE';
CREATE AGGREGATE public.invalid(int) (
CREATE AGGREGATE invalid(int) (
sfunc = int4pl,
stype = int
);
@ -355,7 +358,8 @@ SELECT nextval('query_seq')*2 FROM articles LIMIT 3;
SELECT * FROM (SELECT nextval('query_seq') FROM articles LIMIT 3) vals;
-- but not elsewhere
SELECT sum(nextval('query_seq')) FROM articles;
SELECT n FROM (SELECT nextval('query_seq') n, random() FROM articles) vals;
SELECT sum(nextval('simple_queries_test.query_seq')) FROM articles;
SELECT n FROM (SELECT nextval('simple_queries_test.query_seq') n, random() FROM articles) vals;
DROP SEQUENCE query_seq;
SET client_min_messages TO WARNING;
DROP SCHEMA simple_queries_test CASCADE;

View File

@ -3,6 +3,8 @@
SET citus.next_shard_id TO 980000;
CREATE SCHEMA upsert_test;
SET search_path TO upsert_test;
CREATE TABLE upsert_test
(
@ -207,3 +209,6 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_ke
-- error out on attempt to update the partition key
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO
UPDATE SET part_key = 15;
SET client_min_messages TO WARNING;
DROP SCHEMA upsert_test CASCADE;