mirror of https://github.com/citusdata/citus.git
Use IsMultiStatementTransaction for SELECT .. FOR UPDATE queries (#6288)
* Use IsMultiStatementTransaction instead of IsTransaction for row-locking operations. * Add regression test for SELECT..FOR UPDATE statementniupre/TestDeferredDropAndCleanup
parent
6f06ff78cc
commit
ac96370ddf
|
@ -1421,7 +1421,7 @@ DistributedExecutionRequiresRollback(List *taskList)
|
||||||
* Do not check SelectOpensTransactionBlock, always open transaction block
|
* Do not check SelectOpensTransactionBlock, always open transaction block
|
||||||
* if SELECT FOR UPDATE is executed inside a distributed transaction.
|
* if SELECT FOR UPDATE is executed inside a distributed transaction.
|
||||||
*/
|
*/
|
||||||
return IsTransactionBlock();
|
return IsMultiStatementTransaction();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ReadOnlyTask(task->taskType))
|
if (ReadOnlyTask(task->taskType))
|
||||||
|
|
|
@ -78,5 +78,36 @@ $$);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
END;
|
END;
|
||||||
|
CREATE OR REPLACE FUNCTION select_for_update()
|
||||||
|
RETURNS void
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
my int;
|
||||||
|
BEGIN
|
||||||
|
SELECT y INTO my FROM test WHERE x = 1 FOR UPDATE;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
-- so that we can prove that we open a transaction block by logging below:
|
||||||
|
-- "NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;..."
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
SELECT select_for_update();
|
||||||
|
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
CONTEXT: SQL statement "SELECT y FROM test WHERE x = 1 FOR UPDATE"
|
||||||
|
PL/pgSQL function select_for_update() line XX at SQL statement
|
||||||
|
NOTICE: issuing SELECT y FROM adaptive_executor.test_801009000 test WHERE (x OPERATOR(pg_catalog.=) 1) FOR UPDATE OF test
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
CONTEXT: SQL statement "SELECT y FROM test WHERE x = 1 FOR UPDATE"
|
||||||
|
PL/pgSQL function select_for_update() line XX at SQL statement
|
||||||
|
NOTICE: issuing COMMIT
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
select_for_update
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.log_remote_commands TO off;
|
||||||
DROP SCHEMA adaptive_executor CASCADE;
|
DROP SCHEMA adaptive_executor CASCADE;
|
||||||
NOTICE: drop cascades to table test
|
NOTICE: drop cascades to 2 other objects
|
||||||
|
DETAIL: drop cascades to table test
|
||||||
|
drop cascades to function select_for_update()
|
||||||
|
|
|
@ -40,4 +40,22 @@ SELECT sum(result::bigint) FROM run_command_on_workers($$
|
||||||
$$);
|
$$);
|
||||||
END;
|
END;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION select_for_update()
|
||||||
|
RETURNS void
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
my int;
|
||||||
|
BEGIN
|
||||||
|
SELECT y INTO my FROM test WHERE x = 1 FOR UPDATE;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- so that we can prove that we open a transaction block by logging below:
|
||||||
|
-- "NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;..."
|
||||||
|
SET citus.log_remote_commands TO on;
|
||||||
|
|
||||||
|
SELECT select_for_update();
|
||||||
|
|
||||||
|
SET citus.log_remote_commands TO off;
|
||||||
|
|
||||||
DROP SCHEMA adaptive_executor CASCADE;
|
DROP SCHEMA adaptive_executor CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue