diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 9d26a8702..3d76f25c0 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -335,12 +335,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) return true; } -<<<<<<< HEAD - Oid shardOid = GetShardLocalTableOid(relationShard->relationId, relationShard->shardId); -======= Oid shardOid = GetShardLocalTableOid(relationShard->relationId, relationShard->shardId); ->>>>>>> add the support to execute copy locally newRte->relid = shardOid; diff --git a/src/test/regress/data/orders.2.data b/src/test/regress/data/orders.2.data index 264c368df..43dbde8de 100644 --- a/src/test/regress/data/orders.2.data +++ b/src/test/regress/data/orders.2.data @@ -1484,4 +1484,4 @@ 14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against 14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after 14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro -14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro +14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro \ No newline at end of file diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out new file mode 100644 index 000000000..b7521f9da --- /dev/null +++ b/src/test/regress/expected/local_shard_copy.out @@ -0,0 +1,418 @@ +CREATE SCHEMA local_shard_copy; +SET search_path TO local_shard_copy; +SET client_min_messages TO DEBUG; +SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); +DEBUG: schema "public" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: extension "plpgsql" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "nation_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx + master_add_node +--------------------------------------------------------------------- + 32 +(1 row) + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +CREATE TABLE reference_table (key int PRIMARY KEY); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" +DEBUG: building index "reference_table_pkey" on table "reference_table" serially +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" +DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially +SELECT create_distributed_table('distributed_table','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT * FROM generate_series(1, 10); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE TABLE local_table (key int PRIMARY KEY); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" +DEBUG: building index "local_table_pkey" on table "local_table" serially +INSERT INTO local_table SELECT * from generate_series(1, 10); +-- connection worker and get ready for the tests +\c - - - :worker_1_port +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; +-- returns true of the distribution key filter +-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard +-- placement which is local to this not +CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ + + DECLARE shard_is_local BOOLEAN := FALSE; + + BEGIN + + WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), + all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) + SELECT + true INTO shard_is_local + FROM + local_shard_ids + WHERE + get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); + + IF shard_is_local IS NULL THEN + shard_is_local = FALSE; + END IF; + + RETURN shard_is_local; + END; +$$ LANGUAGE plpgsql; +-- pick some example values that reside on the shards locally and remote +-- distribution key values of 1,6, 500 and 701 are LOCAL to shards, +-- we'll use these values in the tests +SELECT shard_of_distribution_column_is_local(1); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(6); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(500); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(701); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +-- distribution key values of 11 and 12 are REMOTE to shards +SELECT shard_of_distribution_column_is_local(11); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + f +(1 row) + +SELECT shard_of_distribution_column_is_local(12); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + f +(1 row) + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify the put ages. + SELECT * FROM distributed_table; +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + key | age +--------------------------------------------------------------------- + 20 | 20 + 24 | 24 + 25 | 25 + 26 | 26 + 31 | 31 + 33 | 33 + 35 | 35 + 1 | 100 + 5 | 500 + 21 | 21 + 28 | 28 + 34 | 34 + 38 | 38 + 39 | 39 + 36 | 36 + 37 | 37 + 40 | 40 + 3 | 300 + 4 | 400 + 22 | 22 + 23 | 23 + 27 | 27 + 29 | 29 + 30 | 30 + 32 | 32 + 2 | 200 +(26 rows) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + age +--------------------------------------------------------------------- + 100 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" +ROLLBACK; +-- Since we are not in a transaction, the copy should not be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. But +-- we are putting duplicate key, so it should error. +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +DETAIL: Key (key)=(1) already exists. +CONTEXT: COPY distributed_table_1330001, line 1 +ROLLBACK; +TRUNCATE distributed_table; +COPY distributed_table FROM STDIN WITH delimiter ','; +ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +DETAIL: Failing row contains (1, 9). +BEGIN; +-- Since we are in a transaction, the execution will be local, however we are putting invalid age. +-- The constaints should give an error +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,9" +ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +DETAIL: Failing row contains (1, 9). +CONTEXT: COPY distributed_table_1330001, line 1 +ROLLBACK; +TRUNCATE distributed_table; +-- different delimiters +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +COPY distributed_table FROM STDIN WITH delimiter '|'; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1|10" +-- new size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 3 +(1 row) + +ROLLBACK; +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +COPY distributed_table FROM STDIN WITH delimiter '['; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1[10" +-- new size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 3 +(1 row) + +ROLLBACK; +-- multiple local copies +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "10,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "100,15" +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 2: "200,20" +ROLLBACK; +-- local copy followed by local copy should see the changes +-- and error since it is a duplicate primary key. +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,16" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +DETAIL: Key (key)=(1) already exists. +CONTEXT: COPY distributed_table_1330001, line 1 +ROLLBACK; +-- local copy followed by local copy should see the changes +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,15" +-- select should see the change +SELECT key FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + key +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; +\c - - - :master_port +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; +TRUNCATE TABLE reference_table; +TRUNCATE TABLE local_table; +SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.enable_local_execution = 'on'; +BEGIN; +-- copy should be executed locally +COPY reference_table FROM STDIN; +NOTICE: executing the copy locally for shard +CONTEXT: COPY reference_table, line 1: "1" +ROLLBACK; +SET citus.enable_local_execution = 'off'; +BEGIN; +-- copy should not be executed locally as citus.enable_local_execution = off +COPY reference_table FROM STDIN; +ROLLBACK; +SET citus.enable_local_execution = 'on'; +SET client_min_messages TO ERROR; +SET search_path TO public; +DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e926dd31e..8d3b35ca6 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -618,25 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - -- even no need to supply any data - COPY distributed_table FROM STDIN WITH CSV; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" -ROLLBACK; --- a local query is followed by a command that cannot be executed locally -BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out b/src/test/regress/expected/locally_execute_intermediate_results.out index d245c324b..c235ed8ef 100644 --- a/src/test/regress/expected/locally_execute_intermediate_results.out +++ b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -35,6 +35,7 @@ SELECT create_reference_table('ref_table'); INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) -- prevent PG 11 - PG 12 outputs to diverge -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage @@ -270,6 +271,7 @@ key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); +NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 @@ -328,7 +330,9 @@ DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 @@ -767,6 +771,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, re DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 714e8353e..e23342e00 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -499,11 +499,13 @@ SELECT create_reference_table('replicate_reference_table_copy'); (1 row) +SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; +RESET citus.enable_local_execution; DROP TABLE replicate_reference_table_copy; -- test executing DDL command then adding a new node in a transaction CREATE TABLE replicate_reference_table_ddl(column1 int); diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index d3083367f..6c9a6c01b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -39,7 +39,7 @@ test: multi_mx_metadata test: master_evaluation master_evaluation_modify master_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution +test: multi_mx_modifications local_shard_execution local_shard_copy test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql new file mode 100644 index 000000000..e9b548a94 --- /dev/null +++ b/src/test/regress/sql/local_shard_copy.sql @@ -0,0 +1,298 @@ +CREATE SCHEMA local_shard_copy; +SET search_path TO local_shard_copy; + +SET client_min_messages TO DEBUG; + +SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + + +CREATE TABLE reference_table (key int PRIMARY KEY); +SELECT create_reference_table('reference_table'); + +CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); +SELECT create_distributed_table('distributed_table','key'); + +INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); +INSERT INTO reference_table SELECT * FROM generate_series(1, 10); + +CREATE TABLE local_table (key int PRIMARY KEY); +INSERT INTO local_table SELECT * from generate_series(1, 10); + + +-- connection worker and get ready for the tests +\c - - - :worker_1_port + +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; + +-- returns true of the distribution key filter +-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard +-- placement which is local to this not +CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ + + DECLARE shard_is_local BOOLEAN := FALSE; + + BEGIN + + WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), + all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) + SELECT + true INTO shard_is_local + FROM + local_shard_ids + WHERE + get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); + + IF shard_is_local IS NULL THEN + shard_is_local = FALSE; + END IF; + + RETURN shard_is_local; + END; +$$ LANGUAGE plpgsql; + +-- pick some example values that reside on the shards locally and remote + +-- distribution key values of 1,6, 500 and 701 are LOCAL to shards, +-- we'll use these values in the tests +SELECT shard_of_distribution_column_is_local(1); +SELECT shard_of_distribution_column_is_local(6); +SELECT shard_of_distribution_column_is_local(500); +SELECT shard_of_distribution_column_is_local(701); + +-- distribution key values of 11 and 12 are REMOTE to shards +SELECT shard_of_distribution_column_is_local(11); +SELECT shard_of_distribution_column_is_local(12); + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify the put ages. + SELECT * FROM distributed_table; + +ROLLBACK; + + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. +ROLLBACK; + +-- Since we are not in a transaction, the copy should not be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. But +-- we are putting duplicate key, so it should error. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. +ROLLBACK; + +TRUNCATE distributed_table; + +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 9 +\. + +BEGIN; +-- Since we are in a transaction, the execution will be local, however we are putting invalid age. +-- The constaints should give an error +COPY distributed_table FROM STDIN WITH delimiter ','; +1,9 +\. +ROLLBACK; + +TRUNCATE distributed_table; + + +-- different delimiters +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; +COPY distributed_table FROM STDIN WITH delimiter '|'; +1|10 +2|30 +3|40 +\. +-- new size +SELECT count(*) FROM distributed_table; +ROLLBACK; + +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; +COPY distributed_table FROM STDIN WITH delimiter '['; +1[10 +2[30 +3[40 +\. +-- new size +SELECT count(*) FROM distributed_table; +ROLLBACK; + + +-- multiple local copies +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +2,20 +3,30 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +10,15 +20,20 +30,30 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +100,15 +200,20 +300,30 +\. +ROLLBACK; + +-- local copy followed by local copy should see the changes +-- and error since it is a duplicate primary key. +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +1,16 +\. +ROLLBACK; + + +-- local copy followed by local copy should see the changes +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +\. +-- select should see the change +SELECT key FROM distributed_table WHERE key = 1; +ROLLBACK; + +\c - - - :master_port + +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; + +TRUNCATE TABLE reference_table; +TRUNCATE TABLE local_table; + +SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; + +SET citus.enable_local_execution = 'on'; + +BEGIN; +-- copy should be executed locally +COPY reference_table FROM STDIN; +1 +2 +3 +4 +\. +ROLLBACK; + +SET citus.enable_local_execution = 'off'; + +BEGIN; +-- copy should not be executed locally as citus.enable_local_execution = off +COPY reference_table FROM STDIN; +1 +2 +3 +4 +\. +ROLLBACK; + +SET citus.enable_local_execution = 'on'; + +SET client_min_messages TO ERROR; +SET search_path TO public; +DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b5ba49caa..e3688bd9a 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -356,15 +356,7 @@ ROLLBACK; BEGIN; SELECT count(*) FROM distributed_table WHERE key = 1; - -- even no need to supply any data - COPY distributed_table FROM STDIN WITH CSV; -ROLLBACK; - --- a local query is followed by a command that cannot be executed locally -BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; - - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index ec476e706..8edfa40b7 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -323,6 +323,7 @@ DROP TABLE replicate_reference_table_insert; CREATE TABLE replicate_reference_table_copy(column1 int); SELECT create_reference_table('replicate_reference_table_copy'); +SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; 1 @@ -334,6 +335,8 @@ COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ROLLBACK; +RESET citus.enable_local_execution; + DROP TABLE replicate_reference_table_copy;