mirror of https://github.com/citusdata/citus.git
Use 2PC when using a node connection (#4997)
parent
82f34a8d88
commit
87e3a5e24a
|
@ -846,6 +846,18 @@ ConnectionModifiedPlacement(MultiConnection *connection)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dlist_is_empty(&connection->referencedPlacements))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* When referencesPlacements are empty, it means that we come here
|
||||||
|
* from an API that uses a node connection (e.g., not placement connection),
|
||||||
|
* which doesn't set placements.
|
||||||
|
* In that case, the command sent could be either write or read, so we assume
|
||||||
|
* it is write to be on the safe side.
|
||||||
|
*/
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
dlist_foreach(placementIter, &connection->referencedPlacements)
|
dlist_foreach(placementIter, &connection->referencedPlacements)
|
||||||
{
|
{
|
||||||
ConnectionReference *connectionReference =
|
ConnectionReference *connectionReference =
|
||||||
|
|
|
@ -347,6 +347,40 @@ SELECT recover_prepared_transactions();
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shardid INTO selected_shard FROM pg_dist_shard WHERE logicalrelid='test_2pcskip'::regclass LIMIT 1;
|
||||||
|
SELECT COUNT(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.defer_drop_after_shard_move TO OFF;
|
||||||
|
SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
SELECT COUNT(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_2_port, 'localhost', :worker_1_port);
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- for the following test, ensure that 6 and 7 go to different shards on different workers
|
-- for the following test, ensure that 6 and 7 go to different shards on different workers
|
||||||
SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7));
|
SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7));
|
||||||
count
|
count
|
||||||
|
|
|
@ -193,6 +193,18 @@ SELECT create_distributed_table('test_2pcskip', 'a');
|
||||||
INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i;
|
INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i;
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
|
|
||||||
|
SELECT shardid INTO selected_shard FROM pg_dist_shard WHERE logicalrelid='test_2pcskip'::regclass LIMIT 1;
|
||||||
|
SELECT COUNT(*) FROM pg_dist_transaction;
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.defer_drop_after_shard_move TO OFF;
|
||||||
|
SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_1_port, 'localhost', :worker_2_port);
|
||||||
|
COMMIT;
|
||||||
|
SELECT COUNT(*) FROM pg_dist_transaction;
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
|
||||||
|
SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_2_port, 'localhost', :worker_1_port);
|
||||||
|
|
||||||
|
|
||||||
-- for the following test, ensure that 6 and 7 go to different shards on different workers
|
-- for the following test, ensure that 6 and 7 go to different shards on different workers
|
||||||
SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7));
|
SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7));
|
||||||
-- only two of the connections will perform a write (INSERT)
|
-- only two of the connections will perform a write (INSERT)
|
||||||
|
|
Loading…
Reference in New Issue