Run MX tests with replication = 2

allow_hash_replicated_on_mx_fix
Onder Kalaci 2021-10-19 11:05:13 +02:00
parent cb5034f91e
commit c8dcb9aa2b
12 changed files with 4087 additions and 35 deletions


@@ -316,7 +316,7 @@ class CitusSingleNodeSingleShardClusterConfig(CitusDefaultClusterConfig):
common.coordinator_should_haveshards(self.bindir, self.coordinator_port())
class CitusShardReplicationFactorClusterConfig(CitusDefaultClusterConfig):
class CitusShardReplicationFactorClusterConfig(CitusMXBaseClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)
self.new_settings = {"citus.shard_replication_factor": 2}

File diff suppressed because it is too large


@@ -21,6 +21,16 @@ SELECT create_distributed_table('on_update_fkey_table', 'id');
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES "refer'ence_table"(id) ON UPDATE CASCADE;
INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- also have one replicated table
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('replicated_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator works as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
@@ -29,8 +39,24 @@ SELECT count(*) FROM on_update_fkey_table;
0
(1 row)
-- fill the table again
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
RESET citus.task_assignment_policy;
-- fill the tables again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the coordinator
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
@@ -59,6 +85,16 @@ BEGIN;
0
(1 row)
ROLLBACK;
BEGIN;
ALTER TABLE replicated_table ADD COLUMN x INT;
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
@@ -70,8 +106,17 @@ SELECT count(*) FROM on_update_fkey_table;
0
(1 row)
-- make sure that TRUNCATE works as expected from the worker node
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the worker
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
@@ -95,6 +140,10 @@ BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE replicated_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
@@ -112,6 +161,11 @@ NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE replicated_table CASCADE;
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
@@ -136,6 +190,26 @@ BEGIN;
0
(1 row)
ROLLBACK;
-- make sure that DMLs and SELECTs work fine along with TRUNCATE
TRUNCATE replicated_table;
BEGIN;
-- we can only enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
1001
(1 row)
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
@@ -162,6 +236,16 @@ BEGIN;
t
(1 row)
ROLLBACK;
BEGIN;
-- should work since the schema is in the search path
SET search_path TO 'truncate_from_workers';
SELECT lock_relation_if_exists('replicated_table', 'ACCESS SHARE');
lock_relation_if_exists
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
BEGIN;
-- should return false since there is no such table
@@ -249,7 +333,8 @@ BEGIN;
COMMIT;
DROP SCHEMA truncate_from_workers CASCADE;
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table truncate_from_workers."refer'ence_table"
drop cascades to table truncate_from_workers.on_update_fkey_table
drop cascades to table truncate_from_workers.replicated_table
SET search_path TO public;


@@ -33,6 +33,23 @@ SELECT create_distributed_table('table_2', 'key', colocate_with := 'none');
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
SET citus.shard_replication_factor to 2;
CREATE TABLE table_1_rep (key int, value text);
SELECT create_distributed_table('table_1_rep', 'key', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table_2_rep (key int, value text);
SELECT create_distributed_table('table_2_rep', 'key', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table_1_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
set citus.log_intermediate_results TO ON;
set client_min_messages to debug1;
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
@@ -96,6 +113,67 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
0
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count | key
---------------------------------------------------------------------
1 | 1
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1_rep
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2_rep (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1_rep WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2_rep.key, table_2_rep.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :worker_1_port
SET search_path TO mx_coordinator_shouldhaveshards;
set citus.log_intermediate_results TO ON;
@@ -161,6 +239,67 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
0
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count | key
---------------------------------------------------------------------
1 | 1
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1_rep
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2_rep (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1_rep WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2_rep.key, table_2_rep.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :master_port
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column?


@@ -91,6 +91,42 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
(1 row)
VACUUM (FREEZE, ANALYZE) events_2021_jan;
-- add some replicated tables
SET citus.shard_replication_factor TO 2;
-- test for hybrid partitioned table (columnar+heap)
CREATE TABLE events_replicated(ts timestamptz, i int, n numeric, s text)
PARTITION BY RANGE (ts);
CREATE TABLE events_replicated_2021_jan PARTITION OF events_replicated
FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');
CREATE TABLE events_replicated_2021_feb PARTITION OF events_replicated
FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
INSERT INTO events_replicated SELECT
'2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
g,
g*pi(),
'number: ' || g::text
FROM generate_series(1,1000) g;
VACUUM (FREEZE, ANALYZE) events_replicated_2021_feb;
SELECT create_distributed_table('events_replicated', 'ts');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT alter_table_set_access_method('events_replicated_2021_jan', 'columnar');
alter_table_set_access_method
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_replicated_1(col int unique, b tt2);
SELECT create_distributed_table('distributed_table_replicated_1', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX indrep1 ON distributed_table_replicated_1(b);
-- sync metadata
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
@@ -120,12 +156,24 @@ SELECT * FROM test_matview;
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_jan | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_replicated | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390013 | c | f
events_replicated_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390013 | c | f
events_replicated_2021_jan | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390013 | c | f
(6 rows)
SELECT count(*) > 0 FROM pg_dist_node;
?column?
@@ -542,6 +590,24 @@ BEGIN;
(1 row)
ROLLBACK;
-- this is safe because start_metadata_sync_to_node already switches to
-- sequential execution
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table_rep(a int);
SELECT create_distributed_table('test_table_rep', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as start_metadata_sync_to_node is executed
@@ -584,6 +650,49 @@ BEGIN;
(1 row)
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as start_metadata_sync_to_node is executed
-- when it is OK to switch to sequential execution
BEGIN;
-- sync at the start of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET citus.multi_shard_modify_mode TO sequential;
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE test_table ADD COLUMN B INT;
INSERT INTO test_table SELECT i,i FROM generate_series(0,100) i;
SELECT count(*) FROM test_table;
count
---------------------------------------------------------------------
101
(1 row)
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
count
---------------------------------------------------------------------
1
(1 row)
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- cleanup
\c - - - :master_port


@@ -10,6 +10,11 @@ SET search_path TO citus_mx_test_schema;
\COPY citus_mx_test_schema_join_1.nation_hash_2 FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\COPY citus_mx_test_schema_join_2.nation_hash FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
SET citus.shard_replication_factor TO 2;
CREATE TABLE citus_mx_test_schema.nation_hash_replicated AS SELECT * FROM citus_mx_test_schema.nation_hash;
SELECT create_distributed_table('citus_mx_test_schema.nation_hash_replicated', 'n_nationkey');
\COPY nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- now try loading data from worker node
\c - - - :worker_1_port
SET search_path TO public;
@@ -17,10 +22,13 @@ SET search_path TO public;
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\c - - - :worker_2_port
-- and use second worker as well
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- get ready for the next test
TRUNCATE orders_mx;
@@ -35,6 +43,8 @@ show citus.local_shared_pool_size;
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- set it back
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();


@@ -45,7 +45,7 @@ test: coordinator_evaluation_modify
test: coordinator_evaluation_select
test: multi_mx_call
test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution
test: multi_mx_modifications local_shard_execution local_shard_execution_replicated
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy
test: undistribute_table_cascade_mx


@@ -7,15 +7,30 @@ SET search_path TO citus_mx_test_schema;
\COPY citus_mx_test_schema_join_1.nation_hash FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\COPY citus_mx_test_schema_join_1.nation_hash_2 FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\COPY citus_mx_test_schema_join_2.nation_hash FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
SET citus.shard_replication_factor TO 2;
CREATE TABLE citus_mx_test_schema.nation_hash_replicated AS SELECT * FROM citus_mx_test_schema.nation_hash;
SELECT create_distributed_table('citus_mx_test_schema.nation_hash_replicated', 'n_nationkey');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$citus_mx_test_schema.nation_hash_replicated$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
\COPY nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- now try loading data from worker node
\c - - - :worker_1_port
SET search_path TO public;
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\c - - - :worker_2_port
-- and use second worker as well
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- get ready for the next test
TRUNCATE orders_mx;
\c - - - :worker_2_port
@@ -23,74 +38,83 @@ SET citus.log_local_commands TO ON;
-- simulate the case where there are no connection slots available
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
pg_reload_conf
----------------
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
----------
pg_sleep
---------------------------------------------------------------------
(1 row)
show citus.local_shared_pool_size;
citus.local_shared_pool_size
------------------------------
citus.local_shared_pool_size
---------------------------------------------------------------------
-1
(1 row)
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
NOTICE: executing the copy locally for shard 1220075
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 3: "3|1234|F|205654.30|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular id..."
NOTICE: executing the copy locally for shard 1220071
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 5: "5|445|F|105367.67|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages us..."
NOTICE: executing the copy locally for shard 1220069
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 9: "33|670|F|146567.24|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request"
NOTICE: executing the copy locally for shard 1220079
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 15: "39|818|O|326565.37|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir"
NOTICE: executing the copy locally for shard 1220083
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 19: "67|568|O|182481.16|1996-12-19|4-NOT SPECIFIED|Clerk#000000547|0|symptotes haggle slyly around the fu..."
NOTICE: executing the copy locally for shard 1220073
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 24: "96|1078|F|64364.30|1994-04-17|2-HIGH|Clerk#000000395|0|oost furiously. pinto"
NOTICE: executing the copy locally for shard 1220077
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 25: "97|211|F|100572.55|1993-01-29|3-MEDIUM|Clerk#000000547|0|hang blithely along the regular accounts. f..."
NOTICE: executing the copy locally for shard 1220081
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 38: "134|62|F|208201.46|1992-05-01|4-NOT SPECIFIED|Clerk#000000711|0|lar theodolites boos"
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
NOTICE: executing the copy locally for shard 1220079
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 2: "8998|80|F|147264.16|1993-01-04|5-LOW|Clerk#000000733|0| fluffily pending sauternes cajo"
NOTICE: executing the copy locally for shard 1220077
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 4: "9024|1469|F|298241.36|1992-06-03|3-MEDIUM|Clerk#000000901|0|ar the theodolites. fluffily stealthy re..."
NOTICE: executing the copy locally for shard 1220073
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 6: "9026|677|O|63256.87|1996-07-24|5-LOW|Clerk#000000320|0|ironic escapades would wake carefully "
NOTICE: executing the copy locally for shard 1220071
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 9: "9029|1213|F|78703.86|1992-11-20|3-MEDIUM|Clerk#000000965|0| excuses nag quickly carefully unusual ex..."
NOTICE: executing the copy locally for shard 1220083
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 14: "9058|403|F|63464.13|1993-06-29|2-HIGH|Clerk#000000376|0|ealthily special deposits. quickly regular r..."
NOTICE: executing the copy locally for shard 1220081
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 16: "9060|463|O|45295.71|1996-06-09|1-URGENT|Clerk#000000438|0|iously. slyly regular dol"
NOTICE: executing the copy locally for shard 1220075
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 43: "9159|1135|O|99594.61|1995-07-26|1-URGENT|Clerk#000000892|0|xcuses. quickly ironic deposits wake alon..."
NOTICE: executing the copy locally for shard 1220069
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 69: "9281|904|F|173278.28|1992-02-24|1-URGENT|Clerk#000000530|0|eep furiously according to the requests; ..."
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 1: "0|ALGERIA|0| haggle. carefully final deposits detect slyly agai"
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 2: "1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon"
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 3: "2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forg..."
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 7: "6|FRANCE|3|refully final requests. regular, ironi"
-- set it back
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
----------------
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
----------
pg_sleep
---------------------------------------------------------------------
(1 row)
show citus.local_shared_pool_size;
citus.local_shared_pool_size
------------------------------
citus.local_shared_pool_size
---------------------------------------------------------------------
50
(1 row)

File diff suppressed because it is too large


@@ -18,12 +18,25 @@ ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERE
INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- also have one replicated table
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('replicated_table', 'id');
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator works as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- fill the table again
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM replicated_table;
RESET citus.task_assignment_policy;
-- fill the tables again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the coordinator
TRUNCATE "refer'ence_table" CASCADE;
@@ -41,6 +54,12 @@ SELECT count(*) FROM on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
BEGIN;
ALTER TABLE replicated_table ADD COLUMN x INT;
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
@@ -49,8 +68,13 @@ SET search_path TO 'truncate_from_workers';
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- make sure that TRUNCATE works as expected from the worker node
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works as expected from the worker
TRUNCATE "refer'ence_table" CASCADE;
@@ -62,6 +86,11 @@ BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE replicated_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
@@ -74,6 +103,13 @@ BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE replicated_table CASCADE;
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
@@ -92,6 +128,17 @@ BEGIN;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
-- make sure that DMLs and SELECTs work fine along with TRUNCATE
TRUNCATE replicated_table;
BEGIN;
-- we can only enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM replicated_table;
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
@@ -113,6 +160,13 @@ BEGIN;
SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
ROLLBACK;
BEGIN;
-- should work since the schema is in the search path
SET search_path TO 'truncate_from_workers';
SELECT lock_relation_if_exists('replicated_table', 'ACCESS SHARE');
ROLLBACK;
BEGIN;
-- should return false since there is no such table
SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');


@@ -19,6 +19,18 @@ SELECT create_distributed_table('table_2', 'key', colocate_with := 'none');
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
SET citus.shard_replication_factor to 2;
CREATE TABLE table_1_rep (key int, value text);
SELECT create_distributed_table('table_1_rep', 'key', colocate_with := 'none');
CREATE TABLE table_2_rep (key int, value text);
SELECT create_distributed_table('table_2_rep', 'key', colocate_with := 'none');
INSERT INTO table_1_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
set citus.log_intermediate_results TO ON;
set client_min_messages to debug1;
@@ -50,6 +62,34 @@ inserts AS (
RETURNING *
) SELECT count(*) FROM inserts;
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
\c - - - :worker_1_port
SET search_path TO mx_coordinator_shouldhaveshards;
@@ -84,6 +124,34 @@ inserts AS (
RETURNING *
) SELECT count(*) FROM inserts;
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
\c - - - :master_port
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);


@@ -68,6 +68,35 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
VACUUM (FREEZE, ANALYZE) events_2021_jan;
-- add some replicated tables
SET citus.shard_replication_factor TO 2;
-- test for hybrid partitioned table (columnar+heap)
CREATE TABLE events_replicated(ts timestamptz, i int, n numeric, s text)
PARTITION BY RANGE (ts);
CREATE TABLE events_replicated_2021_jan PARTITION OF events_replicated
FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');
CREATE TABLE events_replicated_2021_feb PARTITION OF events_replicated
FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
INSERT INTO events_replicated SELECT
'2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
g,
g*pi(),
'number: ' || g::text
FROM generate_series(1,1000) g;
VACUUM (FREEZE, ANALYZE) events_replicated_2021_feb;
SELECT create_distributed_table('events_replicated', 'ts');
SELECT alter_table_set_access_method('events_replicated_2021_jan', 'columnar');
CREATE TABLE distributed_table_replicated_1(col int unique, b tt2);
SELECT create_distributed_table('distributed_table_replicated_1', 'col');
CREATE INDEX indrep1 ON distributed_table_replicated_1(b);
-- sync metadata
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
@@ -230,6 +259,15 @@ BEGIN;
SELECT create_distributed_table('test_table', 'a');
ROLLBACK;
-- this is safe because start_metadata_sync_to_node already switches to
-- sequential execution
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table_rep(a int);
SELECT create_distributed_table('test_table_rep', 'a');
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as start_metadata_sync_to_node is executed
-- when it is OK to switch to sequential execution
@@ -248,6 +286,26 @@ BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as start_metadata_sync_to_node is executed
-- when it is OK to switch to sequential execution
BEGIN;
-- sync at the start of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET citus.multi_shard_modify_mode TO sequential;
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
ALTER TABLE test_table ADD COLUMN B INT;
INSERT INTO test_table SELECT i,i FROM generate_series(0,100) i;
SELECT count(*) FROM test_table;
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- cleanup
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";