mirror of https://github.com/citusdata/citus.git
Citus Clock tests with Single Shard Tables (#6938)
This PR tests Citus clock with single shard tables.pull/6990/head
parent
2ba3bffe1e
commit
3acadd7321
|
@ -1211,5 +1211,96 @@ SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shar
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- test citus clock
|
||||||
|
SET citus.enable_cluster_clock TO ON;
|
||||||
|
CREATE TABLE clock_single(a INT);
|
||||||
|
SELECT create_distributed_table('clock_single', NULL, colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_get_node_clock() AS nc1 \gset
|
||||||
|
SELECT citus_get_node_clock() AS nc2 \gset
|
||||||
|
SELECT citus_get_node_clock() AS nc3 \gset
|
||||||
|
SELECT citus_is_clock_after(:'nc2', :'nc1');
|
||||||
|
citus_is_clock_after
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_is_clock_after(:'nc3', :'nc2');
|
||||||
|
citus_is_clock_after
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_get_node_clock() AS nc4 \gset
|
||||||
|
COPY clock_single FROM STDIN;
|
||||||
|
SELECT citus_get_node_clock() AS nc5 \gset
|
||||||
|
END;
|
||||||
|
SELECT citus_is_clock_after(:'nc4', :'nc3');
|
||||||
|
citus_is_clock_after
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_is_clock_after(:'nc5', :'nc4');
|
||||||
|
citus_is_clock_after
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_get_transaction_clock();
|
||||||
|
citus_get_transaction_clock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(xxxxxxxxxxxxx,x)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- Transaction with single shard table access
|
||||||
|
SELECT nodeport AS clock_shard_nodeport FROM citus_shards
|
||||||
|
WHERE table_name::text = 'clock_single' AND nodeport IN (:worker_1_port, :worker_2_port) \gset
|
||||||
|
BEGIN;
|
||||||
|
COPY clock_single FROM STDIN;
|
||||||
|
SELECT get_current_transaction_id() \gset tid
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
-- Capture the transaction timestamp
|
||||||
|
SELECT citus_get_transaction_clock() as txnclock \gset
|
||||||
|
DEBUG: node xxxx transaction clock xxxxxx
|
||||||
|
DEBUG: node xxxx transaction clock xxxxxx
|
||||||
|
DEBUG: final global transaction clock xxxxxx
|
||||||
|
COMMIT;
|
||||||
|
-- Check to see if the clock is persisted in the sequence.
|
||||||
|
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$)
|
||||||
|
WHERE nodeport = :clock_shard_nodeport \gset
|
||||||
|
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
|
||||||
|
SELECT :logseq = :txnlog;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY clock_single FROM STDIN;
|
||||||
|
SELECT get_current_transaction_id() \gset tid
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
-- Capture the transaction timestamp
|
||||||
|
SELECT citus_get_transaction_clock() as txnclock \gset
|
||||||
|
DEBUG: node xxxx transaction clock xxxxxx
|
||||||
|
DEBUG: node xxxx transaction clock xxxxxx
|
||||||
|
DEBUG: final global transaction clock xxxxxx
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$)
|
||||||
|
WHERE nodeport = :clock_shard_nodeport \gset
|
||||||
|
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
|
||||||
|
SELECT :logseq = :txnlog;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA null_dist_key_udfs CASCADE;
|
DROP SCHEMA null_dist_key_udfs CASCADE;
|
||||||
|
|
|
@ -588,5 +588,71 @@ SELECT 1 FROM citus_update_shard_statistics(:update_shard_stat_shard);
|
||||||
|
|
||||||
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
|
SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1;
|
||||||
|
|
||||||
|
-- test citus clock
|
||||||
|
SET citus.enable_cluster_clock TO ON;
|
||||||
|
|
||||||
|
CREATE TABLE clock_single(a INT);
|
||||||
|
SELECT create_distributed_table('clock_single', NULL, colocate_with:='none');
|
||||||
|
|
||||||
|
SELECT citus_get_node_clock() AS nc1 \gset
|
||||||
|
SELECT citus_get_node_clock() AS nc2 \gset
|
||||||
|
SELECT citus_get_node_clock() AS nc3 \gset
|
||||||
|
|
||||||
|
SELECT citus_is_clock_after(:'nc2', :'nc1');
|
||||||
|
SELECT citus_is_clock_after(:'nc3', :'nc2');
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_get_node_clock() AS nc4 \gset
|
||||||
|
COPY clock_single FROM STDIN;
|
||||||
|
1
|
||||||
|
2
|
||||||
|
\.
|
||||||
|
SELECT citus_get_node_clock() AS nc5 \gset
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT citus_is_clock_after(:'nc4', :'nc3');
|
||||||
|
SELECT citus_is_clock_after(:'nc5', :'nc4');
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT citus_get_transaction_clock();
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- Transaction with single shard table access
|
||||||
|
SELECT nodeport AS clock_shard_nodeport FROM citus_shards
|
||||||
|
WHERE table_name::text = 'clock_single' AND nodeport IN (:worker_1_port, :worker_2_port) \gset
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY clock_single FROM STDIN;
|
||||||
|
1
|
||||||
|
2
|
||||||
|
\.
|
||||||
|
SELECT get_current_transaction_id() \gset tid
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
-- Capture the transaction timestamp
|
||||||
|
SELECT citus_get_transaction_clock() as txnclock \gset
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- Check to see if the clock is persisted in the sequence.
|
||||||
|
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$)
|
||||||
|
WHERE nodeport = :clock_shard_nodeport \gset
|
||||||
|
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
|
||||||
|
SELECT :logseq = :txnlog;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
COPY clock_single FROM STDIN;
|
||||||
|
1
|
||||||
|
2
|
||||||
|
\.
|
||||||
|
SELECT get_current_transaction_id() \gset tid
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
-- Capture the transaction timestamp
|
||||||
|
SELECT citus_get_transaction_clock() as txnclock \gset
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$)
|
||||||
|
WHERE nodeport = :clock_shard_nodeport \gset
|
||||||
|
SELECT cluster_clock_logical(:'txnclock') as txnlog \gset
|
||||||
|
SELECT :logseq = :txnlog;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA null_dist_key_udfs CASCADE;
|
DROP SCHEMA null_dist_key_udfs CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue