mirror of https://github.com/citusdata/citus.git
1174 lines
87 KiB
Plaintext
1174 lines
87 KiB
Plaintext
-- Test queries on a distributed table with shards on the coordinator
|
|
CREATE SCHEMA coordinator_shouldhaveshards;
|
|
SET search_path TO coordinator_shouldhaveshards;
|
|
SET citus.next_shard_id TO 1503000;
|
|
SET citus.next_placement_id TO 1503000;
|
|
-- idempotently add node to allow this test to run without add_coordinator
|
|
SET client_min_messages TO WARNING;
|
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
RESET client_min_messages;
|
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SET citus.shard_replication_factor TO 1;
|
|
CREATE TABLE test (x int, y int);
|
|
SELECT create_distributed_table('test','x', colocate_with := 'none');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
|
|
WHERE logicalrelid = 'test'::regclass AND groupid = 0;
|
|
count
|
|
---------------------------------------------------------------------
|
|
2
|
|
(1 row)
|
|
|
|
--- enable logging to see which tasks are executed locally
|
|
SET client_min_messages TO LOG;
|
|
SET citus.log_local_commands TO ON;
|
|
-- INSERT..SELECT with COPY under the covers
|
|
INSERT INTO test SELECT s,s FROM generate_series(2,100) s;
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
-- router queries execute locally
|
|
INSERT INTO test VALUES (1, 1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- multi-shard queries connect to localhost
|
|
SELECT count(*) FROM test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
WITH a AS (SELECT * FROM test) SELECT count(*) FROM test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
-- multi-shard queries in transaction blocks execute locally
|
|
BEGIN;
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
END;
|
|
BEGIN;
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
END;
|
|
-- INSERT..SELECT with re-partitioning after local execution
|
|
BEGIN;
|
|
INSERT INTO test VALUES (0,1000);
|
|
CREATE TABLE repart_test (x int primary key, y int);
|
|
SELECT create_distributed_table('repart_test','x', colocate_with := 'none');
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.repart_test (x integer NOT NULL, y integer) ');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test OWNER TO postgres');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test ADD CONSTRAINT repart_test_pkey PRIMARY KEY (x)')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.repart_test (x integer NOT NULL, y integer) ');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test OWNER TO postgres');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test ADD CONSTRAINT repart_test_pkey PRIMARY KEY (x)')
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO repart_test (x, y) SELECT y, x FROM test;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503000_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503000_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503000 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503003_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503003 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503004 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer)
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503007 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer)
|
|
SELECT y FROM repart_test WHERE x = 1000;
|
|
y
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
INSERT INTO repart_test (x, y) SELECT y, x FROM test ON CONFLICT (x) DO UPDATE SET y = -1;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503000_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503000_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503000 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503003_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503003 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503004 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = '-1'::integer
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503007 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = '-1'::integer
|
|
SELECT y FROM repart_test WHERE x = 1000;
|
|
y
|
|
---------------------------------------------------------------------
|
|
-1
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT..SELECT with re-partitioning in EXPLAIN ANALYZE after local execution
|
|
BEGIN;
|
|
INSERT INTO test VALUES (0,1000);
|
|
EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) INSERT INTO test (x, y) SELECT y, x FROM test;
|
|
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning
|
|
ROLLBACK;
|
|
-- DDL connects to locahost
|
|
ALTER TABLE test ADD COLUMN z int;
|
|
-- DDL after local execution
|
|
BEGIN;
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
ALTER TABLE test DROP COLUMN z;
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503000, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503003, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
|
|
ROLLBACK;
|
|
BEGIN;
|
|
ALTER TABLE test DROP COLUMN z;
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503000, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503003, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
END;
|
|
SET citus.shard_count TO 6;
|
|
SET citus.log_remote_commands TO OFF;
|
|
BEGIN;
|
|
SET citus.log_local_commands TO ON;
|
|
CREATE TABLE dist_table (a int);
|
|
INSERT INTO dist_table SELECT * FROM generate_series(1, 100);
|
|
-- trigger local execution
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- this should be run locally
|
|
SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none');
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503008, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503008, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503011, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503011, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres')
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: Copying data from local table...
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: copying the data has completed
|
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$)
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM dist_table;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503008 dist_table WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503011 dist_table WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
CREATE TABLE dist_table (a int);
|
|
INSERT INTO dist_table SELECT * FROM generate_series(1, 100);
|
|
BEGIN;
|
|
SET citus.log_local_commands TO ON;
|
|
-- trigger local execution
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- this should be run locally
|
|
SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none');
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503014, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503014, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503017, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503017, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres')
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: Copying data from local table...
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: copying the data has completed
|
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$)
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM dist_table;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503014 dist_table WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- repartition queries should work fine
|
|
SET citus.enable_repartition_joins TO ON;
|
|
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
SET citus.enable_unique_job_ids TO off;
|
|
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_25_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_25_1','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503000 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_25_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_25_4','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503003 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_26_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_26_1','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503000 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_26_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_26_4','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503003 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_1_0']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_2_0']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_3_0']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_4_0']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_1_0']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_2_0']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_3_0']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_4_0']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_1_3']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_2_3']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_3_3']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_25_4_3']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_1_3']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_2_3']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_3_3']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_26_4_3']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_25_1_0,repartition_25_2_0,repartition_25_3_0,repartition_25_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_26_1_0,repartition_26_2_0,repartition_26_3_0,repartition_26_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_25_1_3,repartition_25_2_3,repartition_25_3_3,repartition_25_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_26_1_3,repartition_26_2_3,repartition_26_3_3,repartition_26_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
END;
|
|
BEGIN;
|
|
SET citus.enable_repartition_joins TO ON;
|
|
-- trigger local execution
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_29_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_29_1','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503000 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_29_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_29_4','SELECT x AS column1 FROM coordinator_shouldhaveshards.test_1503003 t1 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_30_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_30_1','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503000 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartition_30_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_30_4','SELECT y AS column1 FROM coordinator_shouldhaveshards.test_1503003 t2 WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true,true,true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_1_2']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_2_2']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_3_2']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_4_2']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_1_2']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_2_2']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_3_2']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_4_2']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_1_5']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_2_5']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_3_5']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_29_4_5']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_1_5']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_2_5']::text[],'localhost',57637) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_3_5']::text[],'localhost',57638) bytes
|
|
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_30_4_5']::text[],'localhost',57636) bytes
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_29_1_2,repartition_29_2_2,repartition_29_3_2,repartition_29_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_30_1_2,repartition_30_2_2,repartition_30_3_2,repartition_30_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_29_1_5,repartition_29_2_5,repartition_29_3_5,repartition_29_4_5}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 integer) JOIN read_intermediate_results('{repartition_30_1_5,repartition_30_2_5,repartition_30_3_5,repartition_30_4_5}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 integer) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
CREATE TABLE ref (a int, b int);
|
|
SELECT create_reference_table('ref');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE local (x int, y int);
|
|
BEGIN;
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
SELECT * FROM ref JOIN local ON (a = x);
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
a | b | x | y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
TRUNCATE ref;
|
|
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
|
ROLLBACK;
|
|
BEGIN;
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
TRUNCATE ref;
|
|
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
|
SELECT * FROM ref JOIN local ON (a = x);
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
a | b | x | y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
ROLLBACK;
|
|
BEGIN;
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
INSERT INTO ref VALUES (1,2);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (1, 2)
|
|
INSERT INTO local VALUES (1,2);
|
|
SELECT * FROM ref JOIN local ON (a = x);
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
a | b | x | y
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1 | 2
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
BEGIN;
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
-- we wont see the modifying cte in this query because we will use local execution and
|
|
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
|
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, NULL::integer AS y FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
|
|
count | x | y | a | b | count
|
|
---------------------------------------------------------------------
|
|
100 | 3 | 2 | 3 | 2 | 0
|
|
(1 row)
|
|
|
|
TRUNCATE ref;
|
|
NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
|
SELECT * FROM ref JOIN local ON (a = x);
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
a | b | x | y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- we wont see the modifying cte in this query because we will use local execution and
|
|
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
|
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, NULL::integer AS y FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
|
|
count | x | y | a | b | count
|
|
---------------------------------------------------------------------
|
|
100 | 3 | 2 | 3 | 2 | 0
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- we wont see the modifying cte in this query because we will use local execution and
|
|
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
|
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, NULL::integer AS y FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
|
|
count | x | y | a | b | count
|
|
---------------------------------------------------------------------
|
|
100 | 3 | 2 | 3 | 2 | 0
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- we wont see the modifying cte in this query because we will use local execution and
|
|
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
|
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503020'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, NULL::integer AS y FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
|
|
count | x | y | a | b | count
|
|
---------------------------------------------------------------------
|
|
100 | 3 | 2 | 1 | 1 | 0
|
|
100 | 3 | 2 | 2 | 2 | 0
|
|
100 | 3 | 2 | 3 | 3 | 0
|
|
100 | 3 | 2 | 4 | 4 | 0
|
|
100 | 3 | 2 | 5 | 5 | 0
|
|
100 | 3 | 2 | 6 | 6 | 0
|
|
100 | 3 | 2 | 7 | 7 | 0
|
|
100 | 3 | 2 | 8 | 8 | 0
|
|
100 | 3 | 2 | 9 | 9 | 0
|
|
100 | 3 | 2 | 10 | 10 | 0
|
|
(10 rows)
|
|
|
|
ROLLBACK;
|
|
-- same local table reference table tests, but outside a transaction block
|
|
INSERT INTO ref VALUES (1,2);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (1, 2)
|
|
INSERT INTO local VALUES (1,2);
|
|
SELECT * FROM ref JOIN local ON (a = x);
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
a | b | x | y
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1 | 2
|
|
(1 row)
|
|
|
|
-- we wont see the modifying cte in this query because we will use local execution and
|
|
-- in postgres we wouldn't see this modifying cte, so it is consistent with postgres.
|
|
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, NULL::integer AS y FROM (SELECT intermediate_result.x FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(x integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
|
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
|
|
count | x | y | a | b | count
|
|
---------------------------------------------------------------------
|
|
100 | 3 | 2 | 3 | 2 | 1
|
|
(1 row)
|
|
|
|
-- joins between local tables and distributed tables are disallowed
|
|
CREATE TABLE dist_table(a int);
|
|
ERROR: relation "dist_table" already exists
|
|
SELECT create_distributed_table('dist_table', 'a');
|
|
NOTICE: Copying data from local table...
|
|
NOTICE: copying the data has completed
|
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$coordinator_shouldhaveshards.dist_table$$)
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO dist_table VALUES(1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503021 (a) VALUES (1)
|
|
SELECT * FROM local JOIN dist_table ON (a = x) ORDER BY 1,2,3;
|
|
x | y | a
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1
|
|
1 | 2 | 1
|
|
3 | 2 | 3
|
|
(3 rows)
|
|
|
|
SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1 ORDER BY 1,2,3;
|
|
NOTICE: executing the command locally: SELECT local.x, local.y, dist_table.a FROM ((SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local JOIN coordinator_shouldhaveshards.dist_table_1503021 dist_table ON ((dist_table.a OPERATOR(pg_catalog.=) local.x))) WHERE (dist_table.a OPERATOR(pg_catalog.=) 1) ORDER BY local.x, local.y, dist_table.a
|
|
x | y | a
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1
|
|
1 | 2 | 1
|
|
(2 rows)
|
|
|
|
-- intermediate results are allowed
|
|
WITH cte_1 AS (SELECT * FROM dist_table ORDER BY 1 LIMIT 1)
|
|
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
|
|
NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503021 dist_table WHERE true ORDER BY a LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503024 dist_table WHERE true ORDER BY a LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a)))
|
|
a | b | x | y | a
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1 | 2 | 1
|
|
(1 row)
|
|
|
|
-- full router query with CTE and local
|
|
WITH cte_1 AS (SELECT * FROM ref LIMIT 1)
|
|
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);
|
|
NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a, cte_1.b FROM ((coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT ref_1.a, ref_1.b FROM coordinator_shouldhaveshards.ref_1503020 ref_1 LIMIT 1) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a)))
|
|
a | b | x | y | a | b
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1 | 2 | 1 | 2
|
|
(1 row)
|
|
|
|
DROP TABLE dist_table;
|
|
-- issue #3801
|
|
SET citus.shard_replication_factor TO 2;
|
|
CREATE TABLE dist_table(a int);
|
|
SELECT create_distributed_table('dist_table', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
-- this will use perPlacementQueryStrings, make sure it works correctly with
|
|
-- copying task
|
|
INSERT INTO dist_table SELECT a + 1 FROM dist_table;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503027_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503027_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503027 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503029_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503029_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503029 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503030_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503030_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503030 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503032_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503032_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503032 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0
|
|
ROLLBACK;
|
|
SET citus.shard_replication_factor TO 1;
|
|
BEGIN;
|
|
SET citus.shard_replication_factor TO 2;
|
|
CREATE TABLE dist_table1(a int);
|
|
-- this will use queryStringList, make sure it works correctly with
|
|
-- copying task
|
|
SELECT create_distributed_table('dist_table1', 'a');
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503033, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503033, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503035, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503035, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503036, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503036, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503038, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503038, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres')
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
CREATE table ref_table(x int PRIMARY KEY, y int);
|
|
-- this will be replicated to the coordinator because of add_coordinator test
|
|
SELECT create_reference_table('ref_table');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
TRUNCATE TABLE test;
|
|
BEGIN;
|
|
INSERT INTO test SELECT *, * FROM generate_series(1, 100);
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
SELECT COUNT(*) FROM test JOIN ref_table USING(x);
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503000 test JOIN coordinator_shouldhaveshards.ref_table_1503039 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503003 test JOIN coordinator_shouldhaveshards.ref_table_1503039 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- writing to local file and remote intermediate files
|
|
-- at the same time
|
|
INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
WITH cte_1 AS (
|
|
INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *)
|
|
SELECT count(*) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT x, y FROM coordinator_shouldhaveshards.ref_table_1503039 ref_table LIMIT 10000
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_table_1503039 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_result('insert_select_XXX_1503039'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = (excluded.y OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.x, citus_table_alias.y
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) cte_1
|
|
count
|
|
---------------------------------------------------------------------
|
|
100
|
|
(1 row)
|
|
|
|
-- issue #4237: preventing empty placement creation on coordinator
|
|
CREATE TABLE test_append_table(a int);
|
|
SELECT create_distributed_table('test_append_table', 'a', 'append');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- this will fail since it will try to create an empty placement in the
|
|
-- coordinator as well
|
|
SET citus.shard_replication_factor TO 3;
|
|
SELECT master_create_empty_shard('test_append_table');
|
|
NOTICE: Creating placements for the append partitioned tables on the coordinator is not supported, skipping coordinator ...
|
|
ERROR: could only create 2 of 3 of required shard replicas
|
|
-- this will create an empty shard with replicas in the two worker nodes
|
|
SET citus.shard_replication_factor TO 2;
|
|
SELECT 1 FROM master_create_empty_shard('test_append_table');
|
|
NOTICE: Creating placements for the append partitioned tables on the coordinator is not supported, skipping coordinator ...
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- verify groupid is not 0 for each placement
|
|
SELECT COUNT(*) FROM pg_dist_placement p, pg_dist_shard s WHERE p.shardid = s.shardid AND s.logicalrelid = 'test_append_table'::regclass AND p.groupid > 0;
|
|
count
|
|
---------------------------------------------------------------------
|
|
2
|
|
(1 row)
|
|
|
|
SET citus.shard_replication_factor TO 1;
|
|
-- test partitioned index creation with long name
|
|
CREATE TABLE test_index_creation1
|
|
(
|
|
tenant_id integer NOT NULL,
|
|
timeperiod timestamp without time zone NOT NULL,
|
|
field1 integer NOT NULL,
|
|
inserted_utc timestamp without time zone NOT NULL DEFAULT now(),
|
|
PRIMARY KEY(tenant_id, timeperiod)
|
|
) PARTITION BY RANGE (timeperiod);
|
|
CREATE TABLE test_index_creation1_p2020_09_26
|
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
|
|
CREATE TABLE test_index_creation1_p2020_09_27
|
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');
|
|
select create_distributed_table('test_index_creation1', 'tenant_id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- should be able to create indexes with INCLUDE/WHERE
|
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
|
USING btree(tenant_id, timeperiod)
|
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
|
NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503042 ON coordinator_shouldhaveshards.test_index_creation1_1503042 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 ) WHERE (tenant_id = 100)
|
|
NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503045 ON coordinator_shouldhaveshards.test_index_creation1_1503045 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 ) WHERE (tenant_id = 100)
|
|
NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503048', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503048');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503049', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503049');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503050', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503050');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503051', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503051');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503052', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503052');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503053', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503053');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503054', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503054');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503055', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503055');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503056', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503056');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503057', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503057');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503058', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503058');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503042'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503059', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503059')$$)
|
|
NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503048', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503048');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503049', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503049');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503050', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503050');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503051', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503051');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503052', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503052');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_26_1503053', 'test_index_creation1_p2020_09_2_tenant_id_time_6020e8f8_1503053');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503054', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503054');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503055', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503055');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503056', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503056');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503057', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503057');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503058', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503058');SELECT worker_fix_partition_shard_index_names('coordinator_shouldhaveshards.ix_test_index_creation5_1503045'::regclass, 'coordinator_shouldhaveshards.test_index_creation1_p2020_09_27_1503059', 'test_index_creation1_p2020_09__tenant_id_timep_624f7e94_1503059')$$)
|
|
-- test if indexes are created
|
|
SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%');
|
|
created
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- test alter_distributed_table UDF
|
|
SET citus.shard_count TO 4;
|
|
CREATE TABLE adt_table (a INT, b INT);
|
|
CREATE TABLE adt_col (a INT UNIQUE, b INT);
|
|
CREATE TABLE adt_ref (a INT REFERENCES adt_col(a));
|
|
SELECT create_distributed_table('adt_table', 'a', colocate_with:='none');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('adt_col', 'a', colocate_with:='adt_table');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('adt_ref', 'a', colocate_with:='adt_table');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO adt_table VALUES (1, 2), (3, 4), (5, 6);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.adt_table_1503060 AS citus_table_alias (a, b) VALUES (1,2), (5,6)
|
|
INSERT INTO adt_col VALUES (3, 4), (5, 6), (7, 8);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.adt_col_1503064 AS citus_table_alias (a, b) VALUES (5,6)
|
|
INSERT INTO adt_ref VALUES (3), (5);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.adt_ref_1503068 AS citus_table_alias (a) VALUES (5)
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_col | distributed | a | 4
|
|
adt_ref | distributed | a | 4
|
|
adt_table | distributed | a | 4
|
|
(3 rows)
|
|
|
|
SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1;
|
|
Colocation Groups
|
|
---------------------------------------------------------------------
|
|
adt_col, adt_ref, adt_table
|
|
(1 row)
|
|
|
|
SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint
|
|
WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1;
|
|
Referencing Table | Definition
|
|
---------------------------------------------------------------------
|
|
adt_col | UNIQUE (a)
|
|
adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a)
|
|
(2 rows)
|
|
|
|
SET client_min_messages TO WARNING;
|
|
SELECT alter_distributed_table('adt_table', shard_count:=6, cascade_to_colocated:=true);
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SET client_min_messages TO DEFAULT;
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_col | distributed | a | 6
|
|
adt_ref | distributed | a | 6
|
|
adt_table | distributed | a | 6
|
|
(3 rows)
|
|
|
|
SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1;
|
|
Colocation Groups
|
|
---------------------------------------------------------------------
|
|
adt_col, adt_ref, adt_table
|
|
(1 row)
|
|
|
|
SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint
|
|
WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1;
|
|
Referencing Table | Definition
|
|
---------------------------------------------------------------------
|
|
adt_col | UNIQUE (a)
|
|
adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a)
|
|
(2 rows)
|
|
|
|
SELECT alter_distributed_table('adt_table', distribution_column:='b', colocate_with:='none');
|
|
NOTICE: creating a new table for coordinator_shouldhaveshards.adt_table
|
|
NOTICE: moving the data of coordinator_shouldhaveshards.adt_table
|
|
NOTICE: dropping the old coordinator_shouldhaveshards.adt_table
|
|
NOTICE: renaming the new table to coordinator_shouldhaveshards.adt_table
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_col | distributed | a | 6
|
|
adt_ref | distributed | a | 6
|
|
adt_table | distributed | b | 6
|
|
(3 rows)
|
|
|
|
SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1;
|
|
Colocation Groups
|
|
---------------------------------------------------------------------
|
|
adt_col, adt_ref
|
|
adt_table
|
|
(2 rows)
|
|
|
|
SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint
|
|
WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1;
|
|
Referencing Table | Definition
|
|
---------------------------------------------------------------------
|
|
adt_col | UNIQUE (a)
|
|
adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a)
|
|
(2 rows)
|
|
|
|
SELECT * FROM adt_table ORDER BY 1;
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
3 | 4
|
|
5 | 6
|
|
(3 rows)
|
|
|
|
SELECT * FROM adt_col ORDER BY 1;
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
3 | 4
|
|
5 | 6
|
|
7 | 8
|
|
(3 rows)
|
|
|
|
SELECT * FROM adt_ref ORDER BY 1;
|
|
a
|
|
---------------------------------------------------------------------
|
|
3
|
|
5
|
|
(2 rows)
|
|
|
|
SET client_min_messages TO WARNING;
|
|
BEGIN;
|
|
INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x;
|
|
SELECT alter_distributed_table('adt_table', distribution_column:='a');
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT COUNT(*) FROM adt_table;
|
|
count
|
|
---------------------------------------------------------------------
|
|
1003
|
|
(1 row)
|
|
|
|
END;
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_table | distributed | a | 6
|
|
(1 row)
|
|
|
|
SET client_min_messages TO DEFAULT;
|
|
-- issue 4508 table_1 and table_2 are used to test
|
|
-- some edge cases around intermediate result pruning
|
|
CREATE TABLE table_1 (key int, value text);
|
|
SELECT create_distributed_table('table_1', 'key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE table_2 (key int, value text);
|
|
SELECT create_distributed_table('table_2', 'key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503102 AS citus_table_alias (key, value) VALUES (1,'1'::text)
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503105 AS citus_table_alias (key, value) VALUES (2,'2'::text)
|
|
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503106 AS citus_table_alias (key, value) VALUES (1,'1'::text), (5,'5'::text)
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503109 AS citus_table_alias (key, value) VALUES (2,'2'::text)
|
|
SET citus.log_intermediate_results TO ON;
|
|
SET client_min_messages to debug1;
|
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
|
SELECT count(*),
|
|
key
|
|
FROM a JOIN table_2 USING (key)
|
|
GROUP BY key
|
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
|
DEBUG: push down of limit count: 1
|
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
|
DEBUG: Subplan XXX_1 will be written to local file
|
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
|
NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
|
count | key
|
|
---------------------------------------------------------------------
|
|
1 | 1
|
|
(1 row)
|
|
|
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
|
INSERT INTO table_1 SELECT count(*),
|
|
key
|
|
FROM a JOIN table_2 USING (key)
|
|
GROUP BY key
|
|
HAVING (max(table_2.value) >= (SELECT value FROM a));
|
|
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
|
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1
|
|
DEBUG: push down of limit count: 1
|
|
DEBUG: generating subplan XXX_2 for subquery SELECT int4(count(*)) AS auto_coerced_by_citus_0, (a.key)::text AS auto_coerced_by_citus_1 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
DEBUG: Subplan XXX_1 will be written to local file
|
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint
|
|
DEBUG: Subplan XXX_2 will be written to local file
|
|
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
|
NOTICE: executing the command locally: SELECT count(*) AS auto_coerced_by_citus_0, (worker_column_1)::text AS auto_coerced_by_citus_1, worker_column_1 AS discarded_target_item_1, max(worker_column_2) AS worker_column_4 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1
|
|
NOTICE: executing the command locally: SELECT auto_coerced_by_citus_0 AS key, auto_coerced_by_citus_1 AS value FROM (SELECT intermediate_result.auto_coerced_by_citus_0, intermediate_result.auto_coerced_by_citus_1 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 text)) citus_insert_select_subquery
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
WITH stats AS (
|
|
SELECT count(key) m FROM table_1
|
|
),
|
|
inserts AS (
|
|
INSERT INTO table_2
|
|
SELECT key, count(*)
|
|
FROM table_1
|
|
WHERE key >= (SELECT m FROM stats)
|
|
GROUP BY key
|
|
HAVING count(*) <= (SELECT m FROM stats)
|
|
LIMIT 1
|
|
RETURNING *
|
|
) SELECT count(*) FROM inserts;
|
|
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1
|
|
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
|
|
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
|
|
DEBUG: push down of limit count: 1
|
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
|
|
DEBUG: Subplan XXX_1 will be written to local file
|
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
|
NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true
|
|
NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true
|
|
DEBUG: Subplan XXX_2 will be written to local file
|
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
-- a helper function which return true if the coordinated
|
|
-- trannsaction uses 2PC
|
|
SET citus.enable_metadata_sync TO OFF;
|
|
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
|
|
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
|
|
$$coordinated_transaction_should_use_2PC$$;
|
|
RESET citus.enable_metadata_sync;
|
|
-- a local SELECT followed by remote SELECTs
|
|
-- does not trigger 2PC
|
|
BEGIN;
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
f
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- remote SELECTs followed by local SELECTs
|
|
-- does not trigger 2PC
|
|
BEGIN;
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
f
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- a local SELECT followed by a remote Modify
|
|
-- triggers 2PC
|
|
BEGIN;
|
|
SELECT y FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
UPDATE test SET y = y +1;
|
|
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
|
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- a local modify followed by a remote SELECT
|
|
-- triggers 2PC
|
|
BEGIN;
|
|
INSERT INTO test VALUES (1,1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
|
SELECT count(*) FROM test;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- a local modify followed by a remote MODIFY
|
|
-- triggers 2PC
|
|
BEGIN;
|
|
INSERT INTO test VALUES (1,1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
|
UPDATE test SET y = y +1;
|
|
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
|
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- a local modify followed by a remote single shard MODIFY
|
|
-- triggers 2PC
|
|
BEGIN;
|
|
INSERT INTO test VALUES (1,1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
|
INSERT INTO test VALUES (3,3);
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- a remote single shard modify followed by a local single
|
|
-- shard MODIFY triggers 2PC
|
|
BEGIN;
|
|
INSERT INTO test VALUES (3,3);
|
|
INSERT INTO test VALUES (1,1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
COMMIT;
|
|
-- a remote single shard select followed by a local single
|
|
-- shard MODIFY triggers 2PC. But, the transaction manager
|
|
-- is smart enough to skip sending 2PC as the remote
|
|
-- command is read only
|
|
BEGIN;
|
|
SELECT count(*) FROM test WHERE x = 3;
|
|
count
|
|
---------------------------------------------------------------------
|
|
2
|
|
(1 row)
|
|
|
|
INSERT INTO test VALUES (1,1);
|
|
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
SET LOCAL citus.log_remote_commands TO ON;
|
|
COMMIT;
|
|
NOTICE: issuing COMMIT
|
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
|
-- a local single shard select followed by a remote single
|
|
-- shard modify does not trigger 2PC
|
|
BEGIN;
|
|
SELECT count(*) FROM test WHERE x = 1;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
|
count
|
|
---------------------------------------------------------------------
|
|
5
|
|
(1 row)
|
|
|
|
INSERT INTO test VALUES (3,3);
|
|
SELECT coordinated_transaction_should_use_2PC();
|
|
coordinated_transaction_should_use_2pc
|
|
---------------------------------------------------------------------
|
|
f
|
|
(1 row)
|
|
|
|
SET LOCAL citus.log_remote_commands TO ON;
|
|
COMMIT;
|
|
NOTICE: issuing COMMIT
|
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
|
RESET client_min_messages;
|
|
\set VERBOSITY terse
|
|
DROP TABLE ref_table;
|
|
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE
|
|
DELETE FROM test;
|
|
DROP TABLE test;
|
|
DROP TABLE dist_table;
|
|
DROP TABLE ref;
|
|
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
|
DROP TABLE test_append_table;
|
|
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
|
NOTICE: drop cascades to 20 other objects
|
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|