mirror of https://github.com/citusdata/citus.git
163 lines
7.3 KiB
Plaintext
163 lines
7.3 KiB
Plaintext
--
|
|
-- MULTI_DISTRIBUTED_TRANSACTION_ID
|
|
--
|
|
-- Unit tests for distributed transaction id functionality
|
|
--
|
|
-- get the current transaction id, which should be uninitialized
|
|
-- note that we skip printing the databaseId, which might change
|
|
-- per run
|
|
-- set timezone to a specific value to prevent
|
|
-- different values on different servers
|
|
SET TIME ZONE 'PST8PDT';
|
|
-- should return uninitialized values if not in a transaction
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp
|
|
---------------------------------------------------------------------
|
|
0 | 0 |
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
-- we should still see the uninitialized values
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
0 | 0 | | t
|
|
(1 row)
|
|
|
|
-- now assign a value
|
|
SELECT assign_distributed_transaction_id(50, 50, '2016-01-01 00:00:00+0');
|
|
assign_distributed_transaction_id
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- see the assigned value
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
50 | 50 | Thu Dec 31 16:00:00 2015 PST | t
|
|
(1 row)
|
|
|
|
-- a backend cannot be assigned another tx id if already assigned
|
|
SELECT assign_distributed_transaction_id(51, 51, '2017-01-01 00:00:00+0');
|
|
ERROR: the backend has already been assigned a transaction id
|
|
ROLLBACK;
|
|
-- since the transaction finished, we should see the uninitialized values
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
0 | 0 | | t
|
|
(1 row)
|
|
|
|
-- also see that a failed transaction (rolled back even though COMMIT is issued) clears the shared memory
|
|
BEGIN;
|
|
-- we should still see the uninitialized values
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
0 | 0 | | t
|
|
(1 row)
|
|
|
|
-- now assign a value
|
|
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
|
|
assign_distributed_transaction_id
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT 5 / 0;
|
|
ERROR: division by zero
|
|
COMMIT;
|
|
-- since the transaction errored, we should see the uninitialized values again
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
0 | 0 | | t
|
|
(1 row)
|
|
|
|
-- we should also see that a new connection means an uninitialized transaction id
|
|
BEGIN;
|
|
SELECT assign_distributed_transaction_id(52, 52, '2015-01-01 00:00:00+0');
|
|
assign_distributed_transaction_id
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
52 | 52 | Wed Dec 31 16:00:00 2014 PST | t
|
|
(1 row)
|
|
|
|
\c - - - :master_port
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
0 | 0 | | t
|
|
(1 row)
|
|
|
|
-- now show that PREPARE resets the distributed transaction id
|
|
BEGIN;
|
|
SELECT assign_distributed_transaction_id(120, 120, '2015-01-01 00:00:00+0');
|
|
assign_distributed_transaction_id
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
120 | 120 | Wed Dec 31 16:00:00 2014 PST | t
|
|
(1 row)
|
|
|
|
PREPARE TRANSACTION 'dist_xact_id_test';
|
|
-- after the prepare we should see that the transaction id is cleared
|
|
SELECT initiator_node_identifier, transaction_number, transaction_stamp, (process_id = pg_backend_pid()) FROM get_current_transaction_id();
|
|
initiator_node_identifier | transaction_number | transaction_stamp | ?column?
|
|
---------------------------------------------------------------------
|
|
0 | 0 | | t
|
|
(1 row)
|
|
|
|
-- clean up the prepared transaction
|
|
ROLLBACK PREPARED 'dist_xact_id_test';
|
|
-- set back to the original zone
|
|
SET TIME ZONE DEFAULT;
|
|
-- parallel-safe wrapper for getting the current transaction number
|
|
-- Returns the transaction_number column reported by get_current_transaction_id()
-- as a bigint. It is declared PARALLEL SAFE so that the planner is allowed to
-- evaluate it inside parallel workers; the plan further below shows it used as
-- the filter of a Parallel Seq Scan.
CREATE OR REPLACE FUNCTION parallel_worker_transaction_id_test()
|
|
RETURNS bigint STRICT VOLATILE PARALLEL SAFE AS $$
|
|
SELECT transaction_number FROM get_current_transaction_id();
|
|
$$ LANGUAGE sql;
|
|
-- force the transaction ID to be used in a parallel plan
|
|
BEGIN;
|
|
SELECT assign_distributed_transaction_id(50, 1234567, '2016-01-01 00:00:00+0');
|
|
assign_distributed_transaction_id
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- create a >8MB table (large enough for the planner to consider a parallel scan)
|
|
CREATE UNLOGGED TABLE parallel_id_test AS
|
|
SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
|
|
INSERT INTO parallel_id_test VALUES (1234567), (1234567), (1234568), (1234568);
|
|
ANALYSE parallel_id_test;
|
|
-- allow up to two parallel workers per Gather node; SET LOCAL limits this to the current transaction
SET LOCAL max_parallel_workers_per_gather TO 2;
|
|
-- make passing tuples from workers to the leader free in the planner's cost model,
-- so that a parallel plan is preferred for the queries below
SET LOCAL parallel_tuple_cost TO 0;
|
|
EXPLAIN (COSTS OFF)
|
|
SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test();
|
|
QUERY PLAN
|
|
---------------------------------------------------------------------
|
|
Gather
|
|
Workers Planned: 1
|
|
-> Parallel Seq Scan on parallel_id_test
|
|
Filter: (a = parallel_worker_transaction_id_test())
|
|
(4 rows)
|
|
|
|
SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test();
|
|
a
|
|
---------------------------------------------------------------------
|
|
1234567
|
|
1234567
|
|
(2 rows)
|
|
|
|
ROLLBACK;
|