From 3acadd73219ae886bfbccabb4374221922eeb7a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halil=20Ozan=20Akg=C3=BCl?= Date: Fri, 9 Jun 2023 15:06:46 +0300 Subject: [PATCH] Citus Clock tests with Single Shard Tables (#6938) This PR tests Citus clock with single shard tables. --- .../expected/single_shard_table_udfs.out | 91 +++++++++++++++++++ .../regress/sql/single_shard_table_udfs.sql | 66 ++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/src/test/regress/expected/single_shard_table_udfs.out b/src/test/regress/expected/single_shard_table_udfs.out index da264a1c9..78b4ee25a 100644 --- a/src/test/regress/expected/single_shard_table_udfs.out +++ b/src/test/regress/expected/single_shard_table_udfs.out @@ -1211,5 +1211,96 @@ SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shar t (1 row) +-- test citus clock +SET citus.enable_cluster_clock TO ON; +CREATE TABLE clock_single(a INT); +SELECT create_distributed_table('clock_single', NULL, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_get_node_clock() AS nc1 \gset +SELECT citus_get_node_clock() AS nc2 \gset +SELECT citus_get_node_clock() AS nc3 \gset +SELECT citus_is_clock_after(:'nc2', :'nc1'); + citus_is_clock_after +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_is_clock_after(:'nc3', :'nc2'); + citus_is_clock_after +--------------------------------------------------------------------- + t +(1 row) + +BEGIN; +SELECT citus_get_node_clock() AS nc4 \gset +COPY clock_single FROM STDIN; +SELECT citus_get_node_clock() AS nc5 \gset +END; +SELECT citus_is_clock_after(:'nc4', :'nc3'); + citus_is_clock_after +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_is_clock_after(:'nc5', :'nc4'); + citus_is_clock_after +--------------------------------------------------------------------- + t +(1 row) + +BEGIN; +SELECT citus_get_transaction_clock(); + citus_get_transaction_clock +--------------------------------------------------------------------- + (xxxxxxxxxxxxx,x) +(1 row) + +END; +-- Transaction with single shard table access +SELECT nodeport AS clock_shard_nodeport FROM citus_shards +WHERE table_name::text = 'clock_single' AND nodeport IN (:worker_1_port, :worker_2_port) \gset +BEGIN; +COPY clock_single FROM STDIN; +SELECT get_current_transaction_id() \gset tid +SET client_min_messages TO DEBUG1; +-- Capture the transaction timestamp +SELECT citus_get_transaction_clock() as txnclock \gset +DEBUG: node xxxx transaction clock xxxxxx +DEBUG: node xxxx transaction clock xxxxxx +DEBUG: final global transaction clock xxxxxx +COMMIT; +-- Check to see if the clock is persisted in the sequence. +SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) +WHERE nodeport = :clock_shard_nodeport \gset +SELECT cluster_clock_logical(:'txnclock') as txnlog \gset +SELECT :logseq = :txnlog; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +BEGIN; +COPY clock_single FROM STDIN; +SELECT get_current_transaction_id() \gset tid +SET client_min_messages TO DEBUG1; +-- Capture the transaction timestamp +SELECT citus_get_transaction_clock() as txnclock \gset +DEBUG: node xxxx transaction clock xxxxxx +DEBUG: node xxxx transaction clock xxxxxx +DEBUG: final global transaction clock xxxxxx +ROLLBACK; +SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) +WHERE nodeport = :clock_shard_nodeport \gset +SELECT cluster_clock_logical(:'txnclock') as txnlog \gset +SELECT :logseq = :txnlog; + ?column? +--------------------------------------------------------------------- + t +(1 row) + SET client_min_messages TO WARNING; DROP SCHEMA null_dist_key_udfs CASCADE; diff --git a/src/test/regress/sql/single_shard_table_udfs.sql b/src/test/regress/sql/single_shard_table_udfs.sql index 68d04890f..aac01f085 100644 --- a/src/test/regress/sql/single_shard_table_udfs.sql +++ b/src/test/regress/sql/single_shard_table_udfs.sql @@ -588,5 +588,71 @@ SELECT 1 FROM citus_update_shard_statistics(:update_shard_stat_shard); SELECT shardlength > 0 FROM pg_dist_shard_placement WHERE shardid = :update_shard_stat_shard LIMIT 1; +-- test citus clock +SET citus.enable_cluster_clock TO ON; + +CREATE TABLE clock_single(a INT); +SELECT create_distributed_table('clock_single', NULL, colocate_with:='none'); + +SELECT citus_get_node_clock() AS nc1 \gset +SELECT citus_get_node_clock() AS nc2 \gset +SELECT citus_get_node_clock() AS nc3 \gset + +SELECT citus_is_clock_after(:'nc2', :'nc1'); +SELECT citus_is_clock_after(:'nc3', :'nc2'); + +BEGIN; +SELECT citus_get_node_clock() AS nc4 \gset +COPY clock_single FROM STDIN; +1 +2 +\. +SELECT citus_get_node_clock() AS nc5 \gset +END; + +SELECT citus_is_clock_after(:'nc4', :'nc3'); +SELECT citus_is_clock_after(:'nc5', :'nc4'); + +BEGIN; +SELECT citus_get_transaction_clock(); +END; + +-- Transaction with single shard table access +SELECT nodeport AS clock_shard_nodeport FROM citus_shards +WHERE table_name::text = 'clock_single' AND nodeport IN (:worker_1_port, :worker_2_port) \gset + +BEGIN; +COPY clock_single FROM STDIN; +1 +2 +\. +SELECT get_current_transaction_id() \gset tid +SET client_min_messages TO DEBUG1; +-- Capture the transaction timestamp +SELECT citus_get_transaction_clock() as txnclock \gset +COMMIT; + +-- Check to see if the clock is persisted in the sequence. +SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) +WHERE nodeport = :clock_shard_nodeport \gset +SELECT cluster_clock_logical(:'txnclock') as txnlog \gset +SELECT :logseq = :txnlog; + +BEGIN; +COPY clock_single FROM STDIN; +1 +2 +\. +SELECT get_current_transaction_id() \gset tid +SET client_min_messages TO DEBUG1; +-- Capture the transaction timestamp +SELECT citus_get_transaction_clock() as txnclock \gset +ROLLBACK; + +SELECT result as logseq from run_command_on_workers($$SELECT last_value FROM pg_dist_clock_logical_seq$$) +WHERE nodeport = :clock_shard_nodeport \gset +SELECT cluster_clock_logical(:'txnclock') as txnlog \gset +SELECT :logseq = :txnlog; + SET client_min_messages TO WARNING; DROP SCHEMA null_dist_key_udfs CASCADE;