mirror of https://github.com/citusdata/citus.git
add tests for local copy execution
parent
f9c4431885
commit
39bbec0f30
|
@ -335,12 +335,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
Oid shardOid = GetShardLocalTableOid(relationShard->relationId, relationShard->shardId);
|
|
||||||
=======
|
|
||||||
Oid shardOid = GetShardLocalTableOid(relationShard->relationId,
|
Oid shardOid = GetShardLocalTableOid(relationShard->relationId,
|
||||||
relationShard->shardId);
|
relationShard->shardId);
|
||||||
>>>>>>> add the support to execute copy locally
|
|
||||||
|
|
||||||
newRte->relid = shardOid;
|
newRte->relid = shardOid;
|
||||||
|
|
||||||
|
|
|
@ -1484,4 +1484,4 @@
|
||||||
14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against
|
14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against
|
||||||
14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after
|
14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after
|
||||||
14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
|
14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
|
||||||
14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
|
14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
|
|
@ -0,0 +1,418 @@
|
||||||
|
CREATE SCHEMA local_shard_copy;
|
||||||
|
SET search_path TO local_shard_copy;
|
||||||
|
SET client_min_messages TO DEBUG;
|
||||||
|
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
|
||||||
|
DEBUG: schema "public" already exists, skipping
|
||||||
|
DETAIL: NOTICE from localhost:xxxxx
|
||||||
|
DEBUG: extension "plpgsql" already exists, skipping
|
||||||
|
DETAIL: NOTICE from localhost:xxxxx
|
||||||
|
DEBUG: schema "citus_mx_test_schema" already exists, skipping
|
||||||
|
DETAIL: NOTICE from localhost:xxxxx
|
||||||
|
DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping
|
||||||
|
DETAIL: NOTICE from localhost:xxxxx
|
||||||
|
DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping
|
||||||
|
DETAIL: NOTICE from localhost:xxxxx
|
||||||
|
DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping
|
||||||
|
DETAIL: NOTICE from localhost:xxxxx
|
||||||
|
NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx
|
||||||
|
NOTICE: Replicating reference table "nation_mx" to the node localhost:xxxxx
|
||||||
|
NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx
|
||||||
|
NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx
|
||||||
|
master_add_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
32
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
CREATE TABLE reference_table (key int PRIMARY KEY);
|
||||||
|
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table"
|
||||||
|
DEBUG: building index "reference_table_pkey" on table "reference_table" serially
|
||||||
|
SELECT create_reference_table('reference_table');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10));
|
||||||
|
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table"
|
||||||
|
DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially
|
||||||
|
SELECT create_distributed_table('distributed_table','key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40);
|
||||||
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
INSERT INTO reference_table SELECT * FROM generate_series(1, 10);
|
||||||
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
CREATE TABLE local_table (key int PRIMARY KEY);
|
||||||
|
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table"
|
||||||
|
DEBUG: building index "local_table_pkey" on table "local_table" serially
|
||||||
|
INSERT INTO local_table SELECT * from generate_series(1, 10);
|
||||||
|
-- connection worker and get ready for the tests
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO local_shard_copy;
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
-- returns true of the distribution key filter
|
||||||
|
-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard
|
||||||
|
-- placement which is local to this not
|
||||||
|
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$
|
||||||
|
|
||||||
|
DECLARE shard_is_local BOOLEAN := FALSE;
|
||||||
|
|
||||||
|
BEGIN
|
||||||
|
|
||||||
|
WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)),
|
||||||
|
all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group))
|
||||||
|
SELECT
|
||||||
|
true INTO shard_is_local
|
||||||
|
FROM
|
||||||
|
local_shard_ids
|
||||||
|
WHERE
|
||||||
|
get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node);
|
||||||
|
|
||||||
|
IF shard_is_local IS NULL THEN
|
||||||
|
shard_is_local = FALSE;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN shard_is_local;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
-- pick some example values that reside on the shards locally and remote
|
||||||
|
-- distribution key values of 1,6, 500 and 701 are LOCAL to shards,
|
||||||
|
-- we'll use these values in the tests
|
||||||
|
SELECT shard_of_distribution_column_is_local(1);
|
||||||
|
shard_of_distribution_column_is_local
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shard_of_distribution_column_is_local(6);
|
||||||
|
shard_of_distribution_column_is_local
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shard_of_distribution_column_is_local(500);
|
||||||
|
shard_of_distribution_column_is_local
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shard_of_distribution_column_is_local(701);
|
||||||
|
shard_of_distribution_column_is_local
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- distribution key values of 11 and 12 are REMOTE to shards
|
||||||
|
SELECT shard_of_distribution_column_is_local(11);
|
||||||
|
shard_of_distribution_column_is_local
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shard_of_distribution_column_is_local(12);
|
||||||
|
shard_of_distribution_column_is_local
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
21
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1, 100"
|
||||||
|
-- verify that the copy is successful.
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
26
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
21
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1, 100"
|
||||||
|
-- verify the put ages.
|
||||||
|
SELECT * FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
key | age
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
20 | 20
|
||||||
|
24 | 24
|
||||||
|
25 | 25
|
||||||
|
26 | 26
|
||||||
|
31 | 31
|
||||||
|
33 | 33
|
||||||
|
35 | 35
|
||||||
|
1 | 100
|
||||||
|
5 | 500
|
||||||
|
21 | 21
|
||||||
|
28 | 28
|
||||||
|
34 | 34
|
||||||
|
38 | 38
|
||||||
|
39 | 39
|
||||||
|
36 | 36
|
||||||
|
37 | 37
|
||||||
|
40 | 40
|
||||||
|
3 | 300
|
||||||
|
4 | 400
|
||||||
|
22 | 22
|
||||||
|
23 | 23
|
||||||
|
27 | 27
|
||||||
|
29 | 29
|
||||||
|
30 | 30
|
||||||
|
32 | 32
|
||||||
|
2 | 200
|
||||||
|
(26 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
21
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1, 100"
|
||||||
|
-- verify that the copy is successful.
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
26
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1, 100"
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT age FROM distributed_table WHERE key = 1;
|
||||||
|
NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
age
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
26
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify that the copy is successful.
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
26
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
-- Since we are in a transaction, the copy should be locally executed.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1, 100"
|
||||||
|
ROLLBACK;
|
||||||
|
-- Since we are not in a transaction, the copy should not be locally executed.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
BEGIN;
|
||||||
|
-- Since we are in a transaction, the copy should be locally executed. But
|
||||||
|
-- we are putting duplicate key, so it should error.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1, 100"
|
||||||
|
ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001"
|
||||||
|
DETAIL: Key (key)=(1) already exists.
|
||||||
|
CONTEXT: COPY distributed_table_1330001, line 1
|
||||||
|
ROLLBACK;
|
||||||
|
TRUNCATE distributed_table;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check"
|
||||||
|
DETAIL: Failing row contains (1, 9).
|
||||||
|
BEGIN;
|
||||||
|
-- Since we are in a transaction, the execution will be local, however we are putting invalid age.
|
||||||
|
-- The constaints should give an error
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1,9"
|
||||||
|
ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check"
|
||||||
|
DETAIL: Failing row contains (1, 9).
|
||||||
|
CONTEXT: COPY distributed_table_1330001, line 1
|
||||||
|
ROLLBACK;
|
||||||
|
TRUNCATE distributed_table;
|
||||||
|
-- different delimiters
|
||||||
|
BEGIN;
|
||||||
|
-- initial size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter '|';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1|10"
|
||||||
|
-- new size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
-- initial size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter '[';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1[10"
|
||||||
|
-- new size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
|
||||||
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- multiple local copies
|
||||||
|
BEGIN;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1,15"
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "10,15"
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "100,15"
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 2: "200,20"
|
||||||
|
ROLLBACK;
|
||||||
|
-- local copy followed by local copy should see the changes
|
||||||
|
-- and error since it is a duplicate primary key.
|
||||||
|
BEGIN;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1,15"
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1,16"
|
||||||
|
ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001"
|
||||||
|
DETAIL: Key (key)=(1) already exists.
|
||||||
|
CONTEXT: COPY distributed_table_1330001, line 1
|
||||||
|
ROLLBACK;
|
||||||
|
-- local copy followed by local copy should see the changes
|
||||||
|
BEGIN;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY distributed_table, line 1: "1,15"
|
||||||
|
-- select should see the change
|
||||||
|
SELECT key FROM distributed_table WHERE key = 1;
|
||||||
|
NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO local_shard_copy;
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
TRUNCATE TABLE reference_table;
|
||||||
|
TRUNCATE TABLE local_table;
|
||||||
|
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_local_execution = 'on';
|
||||||
|
BEGIN;
|
||||||
|
-- copy should be executed locally
|
||||||
|
COPY reference_table FROM STDIN;
|
||||||
|
NOTICE: executing the copy locally for shard
|
||||||
|
CONTEXT: COPY reference_table, line 1: "1"
|
||||||
|
ROLLBACK;
|
||||||
|
SET citus.enable_local_execution = 'off';
|
||||||
|
BEGIN;
|
||||||
|
-- copy should not be executed locally as citus.enable_local_execution = off
|
||||||
|
COPY reference_table FROM STDIN;
|
||||||
|
ROLLBACK;
|
||||||
|
SET citus.enable_local_execution = 'on';
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
SET search_path TO public;
|
||||||
|
DROP SCHEMA local_shard_copy CASCADE;
|
|
@ -618,25 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- even no need to supply any data
|
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i;
|
||||||
COPY distributed_table FROM STDIN WITH CSV;
|
|
||||||
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
|
|
||||||
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
|
|
||||||
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
|
|
||||||
ROLLBACK;
|
|
||||||
-- a local query is followed by a command that cannot be executed locally
|
|
||||||
BEGIN;
|
|
||||||
SELECT count(*) FROM distributed_table WHERE key = 1;
|
|
||||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
|
|
||||||
count
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
0
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i;
|
|
||||||
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
|
|
||||||
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
|
|
||||||
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- a local query is followed by a command that cannot be executed locally
|
-- a local query is followed by a command that cannot be executed locally
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
|
@ -35,6 +35,7 @@ SELECT create_reference_table('ref_table');
|
||||||
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
||||||
INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
||||||
INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
||||||
|
NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text)
|
||||||
-- prevent PG 11 - PG 12 outputs to diverge
|
-- prevent PG 11 - PG 12 outputs to diverge
|
||||||
-- and have a lot more CTEs recursively planned for the
|
-- and have a lot more CTEs recursively planned for the
|
||||||
-- sake of increasing the test coverage
|
-- sake of increasing the test coverage
|
||||||
|
@ -270,6 +271,7 @@ key
|
||||||
FROM a JOIN ref_table USING (key)
|
FROM a JOIN ref_table USING (key)
|
||||||
GROUP BY key
|
GROUP BY key
|
||||||
HAVING (max(ref_table.value) <= (SELECT value FROM a));
|
HAVING (max(ref_table.value) <= (SELECT value FROM a));
|
||||||
|
NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1))
|
||||||
count | key
|
count | key
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1 | 6
|
1 | 6
|
||||||
|
@ -328,7 +330,9 @@ DEBUG: Subplan XXX_2 will be written to local file
|
||||||
NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1
|
NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1
|
||||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2
|
NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2
|
||||||
|
NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key))
|
||||||
key | value
|
key | value
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
4 | 4
|
4 | 4
|
||||||
|
@ -767,6 +771,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, re
|
||||||
DEBUG: Subplan XXX_1 will be written to local file
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
DEBUG: Subplan XXX_2 will be written to local file
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
key | value
|
key | value
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -499,11 +499,13 @@ SELECT create_reference_table('replicate_reference_table_copy');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_local_execution = 'off';
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY replicate_reference_table_copy FROM STDIN;
|
COPY replicate_reference_table_copy FROM STDIN;
|
||||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||||
ERROR: cannot open new connections after the first modification command within a transaction
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
RESET citus.enable_local_execution;
|
||||||
DROP TABLE replicate_reference_table_copy;
|
DROP TABLE replicate_reference_table_copy;
|
||||||
-- test executing DDL command then adding a new node in a transaction
|
-- test executing DDL command then adding a new node in a transaction
|
||||||
CREATE TABLE replicate_reference_table_ddl(column1 int);
|
CREATE TABLE replicate_reference_table_ddl(column1 int);
|
||||||
|
|
|
@ -39,7 +39,7 @@ test: multi_mx_metadata
|
||||||
test: master_evaluation master_evaluation_modify master_evaluation_select
|
test: master_evaluation master_evaluation_modify master_evaluation_select
|
||||||
test: multi_mx_call
|
test: multi_mx_call
|
||||||
test: multi_mx_function_call_delegation
|
test: multi_mx_function_call_delegation
|
||||||
test: multi_mx_modifications local_shard_execution
|
test: multi_mx_modifications local_shard_execution local_shard_copy
|
||||||
test: multi_mx_transaction_recovery
|
test: multi_mx_transaction_recovery
|
||||||
test: multi_mx_modifying_xacts
|
test: multi_mx_modifying_xacts
|
||||||
test: multi_mx_explain
|
test: multi_mx_explain
|
||||||
|
|
|
@ -0,0 +1,298 @@
|
||||||
|
CREATE SCHEMA local_shard_copy;
|
||||||
|
SET search_path TO local_shard_copy;
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG;
|
||||||
|
|
||||||
|
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE reference_table (key int PRIMARY KEY);
|
||||||
|
SELECT create_reference_table('reference_table');
|
||||||
|
|
||||||
|
CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10));
|
||||||
|
SELECT create_distributed_table('distributed_table','key');
|
||||||
|
|
||||||
|
INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40);
|
||||||
|
INSERT INTO reference_table SELECT * FROM generate_series(1, 10);
|
||||||
|
|
||||||
|
CREATE TABLE local_table (key int PRIMARY KEY);
|
||||||
|
INSERT INTO local_table SELECT * from generate_series(1, 10);
|
||||||
|
|
||||||
|
|
||||||
|
-- connection worker and get ready for the tests
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
|
||||||
|
SET search_path TO local_shard_copy;
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
|
||||||
|
-- returns true of the distribution key filter
|
||||||
|
-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard
|
||||||
|
-- placement which is local to this not
|
||||||
|
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$
|
||||||
|
|
||||||
|
DECLARE shard_is_local BOOLEAN := FALSE;
|
||||||
|
|
||||||
|
BEGIN
|
||||||
|
|
||||||
|
WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)),
|
||||||
|
all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group))
|
||||||
|
SELECT
|
||||||
|
true INTO shard_is_local
|
||||||
|
FROM
|
||||||
|
local_shard_ids
|
||||||
|
WHERE
|
||||||
|
get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node);
|
||||||
|
|
||||||
|
IF shard_is_local IS NULL THEN
|
||||||
|
shard_is_local = FALSE;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN shard_is_local;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- pick some example values that reside on the shards locally and remote
|
||||||
|
|
||||||
|
-- distribution key values of 1,6, 500 and 701 are LOCAL to shards,
|
||||||
|
-- we'll use these values in the tests
|
||||||
|
SELECT shard_of_distribution_column_is_local(1);
|
||||||
|
SELECT shard_of_distribution_column_is_local(6);
|
||||||
|
SELECT shard_of_distribution_column_is_local(500);
|
||||||
|
SELECT shard_of_distribution_column_is_local(701);
|
||||||
|
|
||||||
|
-- distribution key values of 11 and 12 are REMOTE to shards
|
||||||
|
SELECT shard_of_distribution_column_is_local(11);
|
||||||
|
SELECT shard_of_distribution_column_is_local(12);
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
-- verify that the copy is successful.
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
-- verify the put ages.
|
||||||
|
SELECT * FROM distributed_table;
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
-- verify that the copy is successful.
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- the local placements should be executed locally
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
-- run select with local execution
|
||||||
|
SELECT age FROM distributed_table WHERE key = 1;
|
||||||
|
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
|
||||||
|
-- verify that the copy is successful.
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- Since we are in a transaction, the copy should be locally executed.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Since we are not in a transaction, the copy should not be locally executed.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- Since we are in a transaction, the copy should be locally executed. But
|
||||||
|
-- we are putting duplicate key, so it should error.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 100
|
||||||
|
2, 200
|
||||||
|
3, 300
|
||||||
|
4, 400
|
||||||
|
5, 500
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
TRUNCATE distributed_table;
|
||||||
|
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1, 9
|
||||||
|
\.
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- Since we are in a transaction, the execution will be local, however we are putting invalid age.
|
||||||
|
-- The constaints should give an error
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1,9
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
TRUNCATE distributed_table;
|
||||||
|
|
||||||
|
|
||||||
|
-- different delimiters
|
||||||
|
BEGIN;
|
||||||
|
-- initial size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter '|';
|
||||||
|
1|10
|
||||||
|
2|30
|
||||||
|
3|40
|
||||||
|
\.
|
||||||
|
-- new size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- initial size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter '[';
|
||||||
|
1[10
|
||||||
|
2[30
|
||||||
|
3[40
|
||||||
|
\.
|
||||||
|
-- new size
|
||||||
|
SELECT count(*) FROM distributed_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
-- multiple local copies
|
||||||
|
BEGIN;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1,15
|
||||||
|
2,20
|
||||||
|
3,30
|
||||||
|
\.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
10,15
|
||||||
|
20,20
|
||||||
|
30,30
|
||||||
|
\.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
100,15
|
||||||
|
200,20
|
||||||
|
300,30
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- local copy followed by local copy should see the changes
|
||||||
|
-- and error since it is a duplicate primary key.
|
||||||
|
BEGIN;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1,15
|
||||||
|
\.
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1,16
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
-- local copy followed by local copy should see the changes
|
||||||
|
BEGIN;
|
||||||
|
COPY distributed_table FROM STDIN WITH delimiter ',';
|
||||||
|
1,15
|
||||||
|
\.
|
||||||
|
-- select should see the change
|
||||||
|
SELECT key FROM distributed_table WHERE key = 1;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
SET search_path TO local_shard_copy;
|
||||||
|
SET citus.log_local_commands TO ON;
|
||||||
|
|
||||||
|
TRUNCATE TABLE reference_table;
|
||||||
|
TRUNCATE TABLE local_table;
|
||||||
|
|
||||||
|
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;
|
||||||
|
|
||||||
|
SET citus.enable_local_execution = 'on';
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- copy should be executed locally
|
||||||
|
COPY reference_table FROM STDIN;
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SET citus.enable_local_execution = 'off';
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- copy should not be executed locally as citus.enable_local_execution = off
|
||||||
|
COPY reference_table FROM STDIN;
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
\.
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SET citus.enable_local_execution = 'on';
|
||||||
|
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
SET search_path TO public;
|
||||||
|
DROP SCHEMA local_shard_copy CASCADE;
|
|
@ -356,15 +356,7 @@ ROLLBACK;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT count(*) FROM distributed_table WHERE key = 1;
|
SELECT count(*) FROM distributed_table WHERE key = 1;
|
||||||
|
|
||||||
-- even no need to supply any data
|
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i;
|
||||||
COPY distributed_table FROM STDIN WITH CSV;
|
|
||||||
ROLLBACK;
|
|
||||||
|
|
||||||
-- a local query is followed by a command that cannot be executed locally
|
|
||||||
BEGIN;
|
|
||||||
SELECT count(*) FROM distributed_table WHERE key = 1;
|
|
||||||
|
|
||||||
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i;
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- a local query is followed by a command that cannot be executed locally
|
-- a local query is followed by a command that cannot be executed locally
|
||||||
|
|
|
@ -323,6 +323,7 @@ DROP TABLE replicate_reference_table_insert;
|
||||||
CREATE TABLE replicate_reference_table_copy(column1 int);
|
CREATE TABLE replicate_reference_table_copy(column1 int);
|
||||||
SELECT create_reference_table('replicate_reference_table_copy');
|
SELECT create_reference_table('replicate_reference_table_copy');
|
||||||
|
|
||||||
|
SET citus.enable_local_execution = 'off';
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY replicate_reference_table_copy FROM STDIN;
|
COPY replicate_reference_table_copy FROM STDIN;
|
||||||
1
|
1
|
||||||
|
@ -334,6 +335,8 @@ COPY replicate_reference_table_copy FROM STDIN;
|
||||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
RESET citus.enable_local_execution;
|
||||||
|
|
||||||
DROP TABLE replicate_reference_table_copy;
|
DROP TABLE replicate_reference_table_copy;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue