diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 74c42d95d..b5d1b260c 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -846,6 +846,18 @@ ConnectionModifiedPlacement(MultiConnection *connection) return false; } + if (dlist_is_empty(&connection->referencedPlacements)) + { + /* + * When referencesPlacements are empty, it means that we come here + * from an API that uses a node connection (e.g., not placement connection), + * which doesn't set placements. + * In that case, the command sent could be either write or read, so we assume + * it is write to be on the safe side. + */ + return true; + } + dlist_foreach(placementIter, &connection->referencedPlacements) { ConnectionReference *connectionReference = diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 575e62068..6b862cddc 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -347,6 +347,40 @@ SELECT recover_prepared_transactions(); 0 (1 row) +SELECT shardid INTO selected_shard FROM pg_dist_shard WHERE logicalrelid='test_2pcskip'::regclass LIMIT 1; +SELECT COUNT(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +BEGIN; +SET LOCAL citus.defer_drop_after_shard_move TO OFF; +SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_1_port, 'localhost', :worker_2_port); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +SELECT COUNT(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_2_port, 'localhost', :worker_1_port); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + -- for the following test, ensure that 6 and 7 go to different shards on different workers SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7)); count diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index 8aeb97ce6..054b85931 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -193,6 +193,18 @@ SELECT create_distributed_table('test_2pcskip', 'a'); INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i; SELECT recover_prepared_transactions(); +SELECT shardid INTO selected_shard FROM pg_dist_shard WHERE logicalrelid='test_2pcskip'::regclass LIMIT 1; +SELECT COUNT(*) FROM pg_dist_transaction; +BEGIN; +SET LOCAL citus.defer_drop_after_shard_move TO OFF; +SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_1_port, 'localhost', :worker_2_port); +COMMIT; +SELECT COUNT(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +SELECT citus_move_shard_placement((SELECT * FROM selected_shard), 'localhost', :worker_2_port, 'localhost', :worker_1_port); + + -- for the following test, ensure that 6 and 7 go to different shards on different workers SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7)); -- only two of the connections will perform a write (INSERT)