From ac96370ddf06ad8c850f5a8d146a6b22f3ecc095 Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Tue, 6 Sep 2022 17:38:41 +0300 Subject: [PATCH] Use IsMultiStatementTransaction for SELECT .. FOR UPDATE queries (#6288) * Use IsMultiStatementTransaction instead of IsTransaction for row-locking operations. * Add regression test for SELECT..FOR UPDATE statement --- .../distributed/executor/adaptive_executor.c | 2 +- .../regress/expected/adaptive_executor.out | 33 ++++++++++++++++++- src/test/regress/sql/adaptive_executor.sql | 18 ++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a0e16876a..948d65791 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1421,7 +1421,7 @@ DistributedExecutionRequiresRollback(List *taskList) * Do not check SelectOpensTransactionBlock, always open transaction block * if SELECT FOR UPDATE is executed inside a distributed transaction. */ - return IsTransactionBlock(); + return IsMultiStatementTransaction(); } if (ReadOnlyTask(task->taskType)) diff --git a/src/test/regress/expected/adaptive_executor.out b/src/test/regress/expected/adaptive_executor.out index aeaa553f2..4372747d6 100644 --- a/src/test/regress/expected/adaptive_executor.out +++ b/src/test/regress/expected/adaptive_executor.out @@ -78,5 +78,36 @@ $$); (1 row) END; +CREATE OR REPLACE FUNCTION select_for_update() +RETURNS void +AS $$ +DECLARE + my int; +BEGIN + SELECT y INTO my FROM test WHERE x = 1 FOR UPDATE; +END; +$$ LANGUAGE plpgsql; +-- so that we can prove that we open a transaction block by logging below: +-- "NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;..." +SET citus.log_remote_commands TO on; +SELECT select_for_update(); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: SQL statement "SELECT y FROM test WHERE x = 1 FOR UPDATE" +PL/pgSQL function select_for_update() line XX at SQL statement +NOTICE: issuing SELECT y FROM adaptive_executor.test_801009000 test WHERE (x OPERATOR(pg_catalog.=) 1) FOR UPDATE OF test +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +CONTEXT: SQL statement "SELECT y FROM test WHERE x = 1 FOR UPDATE" +PL/pgSQL function select_for_update() line XX at SQL statement +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + select_for_update +--------------------------------------------------------------------- + +(1 row) + +SET citus.log_remote_commands TO off; DROP SCHEMA adaptive_executor CASCADE; -NOTICE: drop cascades to table test +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test +drop cascades to function select_for_update() diff --git a/src/test/regress/sql/adaptive_executor.sql b/src/test/regress/sql/adaptive_executor.sql index f7d6c6f1e..5dd14e6f8 100644 --- a/src/test/regress/sql/adaptive_executor.sql +++ b/src/test/regress/sql/adaptive_executor.sql @@ -40,4 +40,22 @@ SELECT sum(result::bigint) FROM run_command_on_workers($$ $$); END; +CREATE OR REPLACE FUNCTION select_for_update() +RETURNS void +AS $$ +DECLARE + my int; +BEGIN + SELECT y INTO my FROM test WHERE x = 1 FOR UPDATE; +END; +$$ LANGUAGE plpgsql; + +-- so that we can prove that we open a transaction block by logging below: +-- "NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;..." +SET citus.log_remote_commands TO on; + +SELECT select_for_update(); + +SET citus.log_remote_commands TO off; + DROP SCHEMA adaptive_executor CASCADE;