mirror of https://github.com/citusdata/citus.git
Do not trigger 2PC for reads on local execution
Before this commit, Citus used 2PC no matter what kind of local query execution happens. For example, if the coordinator has shards (and the workers as well), even a simple SELECT query could start 2PC: ```SQL WITH cte_1 AS (SELECT * FROM test LIMIT 10) SELECT count(*) FROM cte_1; ``` In this query, the local execution of the shards (and also intermediate result reads) triggers the 2PC. To prevent that, Citus now distinguishes local reads and local writes. And, Citus switches to 2PC only if a modification happens. This may still lead to unnecessary 2PCs when there is a local modification and remote SELECTs only. Though, we handle that separately via #4587.pull/4806/head
parent
874d5fd962
commit
6a7ed7b309
|
@ -1165,23 +1165,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
|
|||
return xactProperties;
|
||||
}
|
||||
|
||||
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
|
||||
{
|
||||
/*
|
||||
* In case localExecutionHappened, we force the executor to use 2PC.
|
||||
* The primary motivation is that at this point we're definitely expanding
|
||||
* the nodes participated in the transaction. And, by re-generating the
|
||||
* remote task lists during local query execution, we might prevent the adaptive
|
||||
* executor to kick-in 2PC (or even start coordinated transaction, that's why
|
||||
* we prefer adding this check here instead of
|
||||
* Activate2PCIfModifyingTransactionExpandsToNewNode()).
|
||||
*/
|
||||
xactProperties.errorOnAnyFailure = true;
|
||||
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
|
||||
xactProperties.requires2PC = true;
|
||||
return xactProperties;
|
||||
}
|
||||
|
||||
if (DistributedExecutionRequiresRollback(taskList))
|
||||
{
|
||||
/* transaction blocks are required if the task list needs to roll back */
|
||||
|
|
|
@ -236,6 +236,17 @@ ExecuteLocalTaskListExtended(List *taskList,
|
|||
{
|
||||
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
|
||||
}
|
||||
|
||||
if (!ReadOnlyTask(task->taskType))
|
||||
{
|
||||
/*
|
||||
* Any modification on the local execution should enable 2PC. If remote
|
||||
* queries are also ReadOnly, our 2PC logic is smart enough to skip sending
|
||||
* PREPARE to those connections.
|
||||
*/
|
||||
CoordinatedTransactionUse2PC();
|
||||
}
|
||||
|
||||
LogLocalCommand(task);
|
||||
|
||||
if (isUtilityCommand)
|
||||
|
|
|
@ -18,9 +18,14 @@
|
|||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
#include "distributed/transaction_management.h"
|
||||
|
||||
|
||||
static Size MemoryContextTotalSpace(MemoryContext context);
|
||||
|
||||
PG_FUNCTION_INFO_V1(top_transaction_context_size);
|
||||
PG_FUNCTION_INFO_V1(coordinated_transaction_uses_2PC);
|
||||
|
||||
|
||||
/*
|
||||
* top_transaction_context_size returns current size of TopTransactionContext.
|
||||
|
@ -54,3 +59,20 @@ MemoryContextTotalSpace(MemoryContext context)
|
|||
|
||||
return totalSpace;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* coordinated_transaction_uses_2PC returns true if the transaction is in a
|
||||
* coordinated transaction and uses 2PC. If the transaction is nott in a
|
||||
* coordinated transaction, the function throws an error.
|
||||
*/
|
||||
Datum
|
||||
coordinated_transaction_uses_2PC(PG_FUNCTION_ARGS)
|
||||
{
|
||||
if (!InCoordinatedTransaction())
|
||||
{
|
||||
ereport(ERROR, (errmsg("The transaction is not a coordinated transaction")));
|
||||
}
|
||||
|
||||
PG_RETURN_BOOL(GetCoordinatedTransactionUses2PC());
|
||||
}
|
||||
|
|
|
@ -195,6 +195,17 @@ CoordinatedTransactionUse2PC(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetCoordinatedTransactionUses2PC is a wrapper function to read the value
|
||||
* of CoordinatedTransactionUses2PC.
|
||||
*/
|
||||
bool
|
||||
GetCoordinatedTransactionUses2PC(void)
|
||||
{
|
||||
return CoordinatedTransactionUses2PC;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
InitializeTransactionManagement(void)
|
||||
{
|
||||
|
|
|
@ -112,6 +112,7 @@ extern bool TransactionModifiedNodeMetadata;
|
|||
extern void UseCoordinatedTransaction(void);
|
||||
extern bool InCoordinatedTransaction(void);
|
||||
extern void CoordinatedTransactionUse2PC(void);
|
||||
extern bool GetCoordinatedTransactionUses2PC(void);
|
||||
extern bool IsMultiStatementTransaction(void);
|
||||
extern void EnsureDistributedTransactionId(void);
|
||||
|
||||
|
|
|
@ -895,6 +895,215 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- a helper function which return true if the coordinated trannsaction uses
|
||||
-- 2PC
|
||||
CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC()
|
||||
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
|
||||
$$coordinated_transaction_uses_2PC$$;
|
||||
-- a local SELECT followed by remote SELECTs
|
||||
-- does not trigger 2PC
|
||||
BEGIN;
|
||||
SELECT y FROM test WHERE x = 1;
|
||||
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
||||
y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- remote SELECTs followed by local SELECTs
|
||||
-- does not trigger 2PC
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT y FROM test WHERE x = 1;
|
||||
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
||||
y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- a local SELECT followed by a remote Modify
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
SELECT y FROM test WHERE x = 1;
|
||||
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
||||
y
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
UPDATE test SET y = y +1;
|
||||
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
||||
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- a local modify followed by a remote SELECT
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
||||
SELECT count(*) FROM test;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- a local modify followed by a remote MODIFY
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
||||
UPDATE test SET y = y +1;
|
||||
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
||||
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- a local modify followed by a remote single shard MODIFY
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
||||
INSERT INTO test VALUES (3,3);
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- a remote single shard modify followed by a local single
|
||||
-- shard MODIFY triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (3,3);
|
||||
INSERT INTO test VALUES (1,1);
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- a remote single shard select followed by a local single
|
||||
-- shard MODIFY triggers 2PC. But, the transaction manager
|
||||
-- is smart enough to skip sending 2PC as the remote
|
||||
-- command is read only
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test WHERE x = 3;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test VALUES (1,1);
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SET LOCAL citus.log_remote_commands TO ON;
|
||||
COMMIT;
|
||||
NOTICE: issuing COMMIT
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
-- a local single shard select followed by a remote single
|
||||
-- shard modify does not trigger 2PC
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test WHERE x = 1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test VALUES (3,3);
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
SET LOCAL citus.log_remote_commands TO ON;
|
||||
COMMIT;
|
||||
NOTICE: issuing COMMIT
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
RESET client_min_messages;
|
||||
\set VERBOSITY terse
|
||||
DROP TABLE ref_table;
|
||||
|
@ -906,7 +1115,7 @@ DROP TABLE ref;
|
|||
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
|
||||
DROP TABLE test_append_table;
|
||||
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
|
||||
NOTICE: drop cascades to 19 other objects
|
||||
NOTICE: drop cascades to 20 other objects
|
||||
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -1855,6 +1855,138 @@ NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and
|
|||
(1 row)
|
||||
|
||||
RESET citus.local_copy_flush_threshold;
|
||||
RESET citus.local_copy_flush_threshold;
|
||||
CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC()
|
||||
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
|
||||
$$coordinated_transaction_uses_2PC$$;
|
||||
-- a multi-shard/single-shard select that is failed over to local
|
||||
-- execution doesn't start a 2PC
|
||||
BEGIN;
|
||||
SELECT count(*) FROM another_schema_table;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
10001
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM another_schema_table WHERE a = 1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10)
|
||||
SELECT count(*) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10)
|
||||
SELECT count(*) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) LIMIT 10) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- same without a transaction block
|
||||
WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000),
|
||||
cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc)
|
||||
SELECT cnt, enabled_2pc FROM cte_1, cte_2;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
|
||||
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
|
||||
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint
|
||||
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint
|
||||
NOTICE: executing the command locally: SELECT cte_1.cnt, cte_2.enabled_2pc FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) cte_1, (SELECT intermediate_result.enabled_2pc FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(enabled_2pc boolean)) cte_2
|
||||
cnt | enabled_2pc
|
||||
---------------------------------------------------------------------
|
||||
10001 | f
|
||||
(1 row)
|
||||
|
||||
-- a multi-shard modification that is failed over to local
|
||||
-- execution starts a 2PC
|
||||
BEGIN;
|
||||
UPDATE another_schema_table SET b = b + 1;
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- a multi-shard modification that is failed over to local
|
||||
-- execution starts a 2PC
|
||||
BEGIN;
|
||||
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
|
||||
SELECT count(*) FROM cte_1;
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
10001
|
||||
(1 row)
|
||||
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- same without transaction block
|
||||
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
|
||||
NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- a single-shard modification that is failed over to local
|
||||
-- starts 2PC execution
|
||||
BEGIN;
|
||||
UPDATE another_schema_table SET b = b + 1 WHERE a = 1;
|
||||
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
coordinated_transaction_uses_2pc
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- same without transaction block
|
||||
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
|
||||
SELECT coordinated_transaction_uses_2PC() FROM cte_1;
|
||||
NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc FROM cte_1
|
||||
ERROR: The transaction is not a coordinated transaction
|
||||
-- if the local execution is disabled, we cannot failover to
|
||||
-- local execution and the queries would fail
|
||||
SET citus.enable_local_execution TO false;
|
||||
|
|
|
@ -372,9 +372,93 @@ inserts AS (
|
|||
RETURNING *
|
||||
) SELECT count(*) FROM inserts;
|
||||
|
||||
-- a helper function which return true if the coordinated trannsaction uses
|
||||
-- 2PC
|
||||
CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC()
|
||||
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
|
||||
$$coordinated_transaction_uses_2PC$$;
|
||||
|
||||
-- a local SELECT followed by remote SELECTs
|
||||
-- does not trigger 2PC
|
||||
BEGIN;
|
||||
SELECT y FROM test WHERE x = 1;
|
||||
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
|
||||
SELECT count(*) FROM test;
|
||||
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- remote SELECTs followed by local SELECTs
|
||||
-- does not trigger 2PC
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test;
|
||||
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
|
||||
SELECT y FROM test WHERE x = 1;
|
||||
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- a local SELECT followed by a remote Modify
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
SELECT y FROM test WHERE x = 1;
|
||||
UPDATE test SET y = y +1;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- a local modify followed by a remote SELECT
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
SELECT count(*) FROM test;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- a local modify followed by a remote MODIFY
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
UPDATE test SET y = y +1;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- a local modify followed by a remote single shard MODIFY
|
||||
-- triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
INSERT INTO test VALUES (3,3);
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- a remote single shard modify followed by a local single
|
||||
-- shard MODIFY triggers 2PC
|
||||
BEGIN;
|
||||
INSERT INTO test VALUES (3,3);
|
||||
INSERT INTO test VALUES (1,1);
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
COMMIT;
|
||||
|
||||
-- a remote single shard select followed by a local single
|
||||
-- shard MODIFY triggers 2PC. But, the transaction manager
|
||||
-- is smart enough to skip sending 2PC as the remote
|
||||
-- command is read only
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test WHERE x = 3;
|
||||
INSERT INTO test VALUES (1,1);
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
SET LOCAL citus.log_remote_commands TO ON;
|
||||
COMMIT;
|
||||
|
||||
-- a local single shard select followed by a remote single
|
||||
-- shard modify does not trigger 2PC
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test WHERE x = 1;
|
||||
INSERT INTO test VALUES (3,3);
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
SET LOCAL citus.log_remote_commands TO ON;
|
||||
COMMIT;
|
||||
|
||||
RESET client_min_messages;
|
||||
|
||||
|
||||
\set VERBOSITY terse
|
||||
DROP TABLE ref_table;
|
||||
|
||||
|
|
|
@ -906,6 +906,60 @@ WITH cte_1 AS
|
|||
SELECT bool_and(z is null) FROM cte_1;
|
||||
|
||||
RESET citus.local_copy_flush_threshold;
|
||||
|
||||
RESET citus.local_copy_flush_threshold;
|
||||
|
||||
CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC()
|
||||
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
|
||||
$$coordinated_transaction_uses_2PC$$;
|
||||
|
||||
-- a multi-shard/single-shard select that is failed over to local
|
||||
-- execution doesn't start a 2PC
|
||||
BEGIN;
|
||||
SELECT count(*) FROM another_schema_table;
|
||||
SELECT count(*) FROM another_schema_table WHERE a = 1;
|
||||
WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10)
|
||||
SELECT count(*) FROM cte_1;
|
||||
WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10)
|
||||
SELECT count(*) FROM cte_1;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
ROLLBACK;
|
||||
|
||||
-- same without a transaction block
|
||||
WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000),
|
||||
cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc)
|
||||
SELECT cnt, enabled_2pc FROM cte_1, cte_2;
|
||||
|
||||
-- a multi-shard modification that is failed over to local
|
||||
-- execution starts a 2PC
|
||||
BEGIN;
|
||||
UPDATE another_schema_table SET b = b + 1;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
ROLLBACK;
|
||||
|
||||
-- a multi-shard modification that is failed over to local
|
||||
-- execution starts a 2PC
|
||||
BEGIN;
|
||||
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
|
||||
SELECT count(*) FROM cte_1;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
ROLLBACK;
|
||||
|
||||
-- same without transaction block
|
||||
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
|
||||
-- a single-shard modification that is failed over to local
|
||||
-- starts 2PC execution
|
||||
BEGIN;
|
||||
UPDATE another_schema_table SET b = b + 1 WHERE a = 1;
|
||||
SELECT coordinated_transaction_uses_2PC();
|
||||
ROLLBACK;
|
||||
|
||||
-- same without transaction block
|
||||
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
|
||||
SELECT coordinated_transaction_uses_2PC() FROM cte_1;
|
||||
|
||||
-- if the local execution is disabled, we cannot failover to
|
||||
-- local execution and the queries would fail
|
||||
SET citus.enable_local_execution TO false;
|
||||
|
|
Loading…
Reference in New Issue