CREATE SCHEMA local_shard_copy; SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); DEBUG: schema "public" already exists, skipping DETAIL: from localhost:xxxxx DEBUG: extension "plpgsql" already exists, skipping DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_test_schema" already exists, skipping DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping DETAIL: from localhost:xxxxx DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping DETAIL: from localhost:xxxxx master_add_node --------------------------------------------------------------------- 32 (1 row) SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; CREATE TABLE reference_table (key int PRIMARY KEY); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" DEBUG: building index "reference_table_pkey" on table "reference_table" serially SELECT create_reference_table('reference_table'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially SELECT create_distributed_table('distributed_table','key'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); DEBUG: distributed INSERT ... 
SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator INSERT INTO reference_table SELECT * FROM generate_series(1, 10); DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator CREATE TABLE local_table (key int PRIMARY KEY); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" DEBUG: building index "local_table_pkey" on table "local_table" serially INSERT INTO local_table SELECT * from generate_series(1, 10); -- partitioned table CREATE TABLE collections_list ( key bigserial, collection_id integer ) PARTITION BY LIST (collection_id ); DEBUG: CREATE TABLE will create implicit sequence "collections_list_key_seq" for serial column "collections_list.key" SELECT create_distributed_table('collections_list', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE collections_list_0 PARTITION OF collections_list (key, collection_id) FOR VALUES IN ( 0 ); CREATE TABLE collections_list_1 PARTITION OF collections_list (key, collection_id) FOR VALUES IN ( 1 ); -- connect to the worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_copy; SET citus.log_local_commands TO ON; -- returns true if, given the distribution key filter -- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard -- placement which is local to this node CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ DECLARE shard_is_local BOOLEAN := FALSE; BEGIN WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) SELECT true INTO shard_is_local FROM local_shard_ids WHERE 
get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); IF shard_is_local IS NULL THEN shard_is_local = FALSE; END IF; RETURN shard_is_local; END; $$ LANGUAGE plpgsql; -- pick some example values that reside on the shards locally and remote -- distribution key values of 1,6, 500 and 701 are LOCAL to shards, -- we'll use these values in the tests SELECT shard_of_distribution_column_is_local(1); shard_of_distribution_column_is_local --------------------------------------------------------------------- t (1 row) SELECT shard_of_distribution_column_is_local(6); shard_of_distribution_column_is_local --------------------------------------------------------------------- t (1 row) SELECT shard_of_distribution_column_is_local(500); shard_of_distribution_column_is_local --------------------------------------------------------------------- t (1 row) SELECT shard_of_distribution_column_is_local(701); shard_of_distribution_column_is_local --------------------------------------------------------------------- t (1 row) -- distribution key values of 11 and 12 are REMOTE to shards SELECT shard_of_distribution_column_is_local(11); shard_of_distribution_column_is_local --------------------------------------------------------------------- f (1 row) SELECT shard_of_distribution_column_is_local(12); shard_of_distribution_column_is_local --------------------------------------------------------------------- f (1 row) BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the 
command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 (1 row) -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 (1 row) ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 (1 row) -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify the put ages. 
SELECT * FROM distributed_table; NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true key | age --------------------------------------------------------------------- 20 | 20 24 | 24 25 | 25 26 | 26 31 | 31 33 | 33 35 | 35 1 | 100 5 | 500 21 | 21 28 | 28 34 | 34 38 | 38 39 | 39 36 | 36 37 | 37 40 | 40 3 | 300 4 | 400 22 | 22 23 | 23 27 | 27 29 | 29 30 | 30 32 | 32 2 | 200 (26 rows) ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 (1 row) -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. 
SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 (1 row) ROLLBACK; BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- (0 rows) SELECT count(*) FROM collections_list; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true count --------------------------------------------------------------------- 0 (1 row) -- the local placements should be executed locally COPY collections_list FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY collections_list, line 1: "1, 0" -- verify that the copy is successful. 
SELECT count(*) FROM collections_list; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true count --------------------------------------------------------------------- 5 (1 row) ROLLBACK; BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- (0 rows) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 (1 row) -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 (1 row) ROLLBACK; BEGIN; -- Since we are in a transaction, the copy should be locally executed. 
COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" ROLLBACK; -- Since we are not in a transaction, the copy should not be locally executed. COPY distributed_table FROM STDIN WITH delimiter ','; BEGIN; -- Since we are in a transaction, the copy should be locally executed. But -- we are putting a duplicate key, so it should error. COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" DETAIL: Key (key)=(1) already exists. CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; BEGIN; -- insert a lot of data ( around 8MB), -- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx ROLLBACK; COPY distributed_table FROM STDIN WITH delimiter ','; ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). BEGIN; -- Since we are in a transaction, the execution will be local, however we are putting an invalid age. -- The constraints should give an error COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,9" ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). 
CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; -- different delimiters BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) -- initial size SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 0 (1 row) COPY distributed_table FROM STDIN WITH delimiter '|'; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1|10" -- new size SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 3 (1 row) ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) -- initial size SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 
distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 0 (1 row) COPY distributed_table FROM STDIN WITH delimiter '['; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1[10" -- new size SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 3 (1 row) ROLLBACK; -- multiple local copies BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "10,15" COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "100,15" NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 2: "200,20" ROLLBACK; -- local copy followed by local copy should see the changes -- and error since it is a duplicate primary key. BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,16" ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" DETAIL: Key (key)=(1) already exists. 
CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; -- local copy followed by local copy should see the changes BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" -- select should see the change SELECT key FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) key --------------------------------------------------------------------- 1 (1 row) ROLLBACK; \c - - - :master_port SET search_path TO local_shard_copy; SET citus.log_local_commands TO ON; TRUNCATE TABLE reference_table; NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE TRUNCATE TABLE local_table; SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; count --------------------------------------------------------------------- 0 (1 row) SET citus.enable_local_execution = 'on'; BEGIN; -- copy should be executed locally COPY reference_table FROM STDIN; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY reference_table, line 1: "1" ROLLBACK; SET citus.enable_local_execution = 'off'; BEGIN; -- copy should not be executed locally as citus.enable_local_execution = off COPY reference_table FROM STDIN; ROLLBACK; SET citus.enable_local_execution = 'on'; CREATE TABLE ref_table(a int); INSERT INTO ref_table VALUES(1); BEGIN; -- trigger local execution SELECT COUNT(*) FROM reference_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table count --------------------------------------------------------------------- 0 (1 row) -- shard creation should be done locally SELECT create_reference_table('ref_table'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 
'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$local_shard_copy.ref_table$$) create_reference_table --------------------------------------------------------------------- (1 row) INSERT INTO ref_table VALUES(2); NOTICE: executing the command locally: INSERT INTO local_shard_copy.ref_table_1330000 (a) VALUES (2) -- verify that it worked. SELECT COUNT(*) FROM ref_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.ref_table_1330000 ref_table count --------------------------------------------------------------------- 2 (1 row) ROLLBACK; SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE;