From 442449c4bf3aca89bffd41e4354dbe082e9cd783 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 28 Jan 2021 12:05:59 +0100 Subject: [PATCH] Allow Citus local tables with 2PC --- .../distributed/executor/adaptive_executor.c | 63 ++++++--- .../citus_local_tables_queries_mx.out | 76 +++++++++++ src/test/regress/expected/single_node.out | 122 ++++++++++++++++++ .../sql/citus_local_tables_queries_mx.sql | 27 ++++ src/test/regress/sql/single_node.sql | 75 +++++++++++ 5 files changed, 346 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4e4292f8f..d0ac6f998 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -618,6 +618,7 @@ static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList); static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); +static bool TaskAccessesOnlyCitusLocalTables(Task *task); static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(List *taskList); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); @@ -1165,23 +1166,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) - { - /* - * In case localExecutionHappened, we force the executor to use 2PC. - * The primary motivation is that at this point we're definitely expanding - * the nodes participated in the transaction. And, by re-generating the - * remote task lists during local query execution, we might prevent the adaptive - * executor to kick-in 2PC (or even start coordinated transaction, that's why - * we prefer adding this check here instead of - * Activate2PCIfModifyingTransactionExpandsToNewNode()). - */ - xactProperties.errorOnAnyFailure = true; - xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; - xactProperties.requires2PC = true; - return xactProperties; - } - if (DistributedExecutionRequiresRollback(taskList)) { /* transaction blocks are required if the task list needs to roll back */ @@ -1361,6 +1345,25 @@ DistributedExecutionRequiresRollback(List *taskList) Task *task = (Task *) linitial(taskList); + if (taskCount == 1 && TaskAccessesOnlyCitusLocalTables(task) && + ShouldExecuteTasksLocally(taskList)) + { + /* + * If the execution accesses Citus local tables via local execution, + * there is no need to start a coordinated transaction. This is + * especially important regarding the limitations of coordinated + * transactions such as a coordinated transaction cannot be part of + * a 2PC transaction. + * + * NB: It is not precisely accurate to rely on ShouldExecuteTasksLocally() + * at this point as remote execution may failover some tasks to local + * execution. However, we prefer this approach because it provides a more + * predictable user experience. + */ + return false; + } + + bool selectForUpdate = task->relationRowLockList != NIL; if (selectForUpdate) { @@ -1423,6 +1426,27 @@ DistributedExecutionRequiresRollback(List *taskList) } +/* + * TaskAccessesOnlyCitusLocalTable returns true if the input task + * has only accesses Citus local tables. + */ +static bool +TaskAccessesOnlyCitusLocalTables(Task *task) +{ + List *relationShardList = task->relationShardList; + RelationShard *relationShard = NULL; + foreach_ptr(relationShard, relationShardList) + { + if (!IsCitusTableType(relationShard->relationId, CITUS_LOCAL_TABLE)) + { + return false; + } + } + + return true; +} + + /* * TaskListRequires2PC determines whether the given task list requires 2PC * because the tasks provided operates on a reference table or there are multiple @@ -3199,6 +3223,11 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session) */ CoordinatedTransactionUse2PC(); } + else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) + { + /* we did local execution and are expanding to an additional node */ + CoordinatedTransactionUse2PC(); + } } diff --git a/src/test/regress/expected/citus_local_tables_queries_mx.out b/src/test/regress/expected/citus_local_tables_queries_mx.out index 322cd791e..b39cb47fa 100644 --- a/src/test/regress/expected/citus_local_tables_queries_mx.out +++ b/src/test/regress/expected/citus_local_tables_queries_mx.out @@ -1015,6 +1015,82 @@ BEGIN; (0 rows) COMMIT; +-- users are not allowed to use +-- citus local tables in prepared statements +-- from the worker nodes +BEGIN; + INSERT INTO citus_local_table VALUES (3), (4); +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +ERROR: cannot use 2PC in transactions involving multiple servers +BEGIN; + SELECT count(*) FROM citus_local_table; + count +--------------------------------------------------------------------- + 2 +(1 row) + +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +ERROR: cannot use 2PC in transactions involving multiple servers +BEGIN; + SELECT count(*) FROM reference_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM citus_local_table; + count +--------------------------------------------------------------------- + 2 +(1 row) + +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +ERROR: cannot use 2PC in transactions involving multiple servers +BEGIN; + SELECT count(*) FROM citus_local_table; + count +--------------------------------------------------------------------- + 2 +(1 row) + + SELECT count(*) FROM reference_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +ERROR: cannot use 2PC in transactions involving multiple servers +BEGIN; + SELECT count(*) FROM citus_local_table; + count +--------------------------------------------------------------------- + 2 +(1 row) + + SELECT count(*) FROM distributed_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +ERROR: cannot use 2PC in transactions involving multiple servers +BEGIN; + SELECT count(*) FROM distributed_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM citus_local_table; + count +--------------------------------------------------------------------- + 2 +(1 row) + +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +ERROR: cannot use 2PC in transactions involving multiple servers \c - - - :master_port -- cleanup at exit DROP SCHEMA citus_local_table_queries_mx CASCADE; diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 77b532c86..75decb04f 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1207,6 +1207,128 @@ SELECT citus_add_local_table_to_metadata('local_table_2', cascade_via_foreign_ke (1 row) +CREATE TABLE distributed_table_tx_blck (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_tx_blck', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_table_tx_blck (col_1 INT UNIQUE); +SELECT create_reference_table('ref_table_tx_blck'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- make sure that we can do 2PC with a citus local table +BEGIN; + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; + ROLLBACK PREPARED 'citus_local_tx'; +-- make sure that we can do 2PC with citus local tables +-- multiple statements +BEGIN; + INSERT INTO local_table_1 VALUES (1); + INSERT INTO local_table_1 VALUES (2); + PREPARE TRANSACTION 'citus_local_tx'; + ROLLBACK PREPARED 'citus_local_tx'; +-- make sure that we cannot do 2PC with citus local tables +-- when local execution is disabled +BEGIN; + SET citus.enable_local_execution TO false; + INSERT INTO local_table_1 VALUES (1); + INSERT INTO local_table_1 VALUES (2); + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers +-- make sure that we cannot do 2PC with citus local tables +-- when it is used with distributed tables on the tx block +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO local_table_1 VALUES (1); + SELECT count(*) FROM distributed_table_tx_blck; + count +--------------------------------------------------------------------- + 0 +(1 row) + + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers +-- make sure that we cannot do 2PC with citus local tables +-- when it is used with distributed tables on the tx block +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM distributed_table_tx_blck; + count +--------------------------------------------------------------------- + 0 +(1 row) + + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers +-- make sure that we cannot do 2PC with citus local tables +-- when it is used with distributed tables on the tx block +-- even if local execution is disabled +BEGIN; + SET citus.enable_local_execution TO false; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM distributed_table_tx_blck; + count +--------------------------------------------------------------------- + 0 +(1 row) + + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers + -- make sure that we cannot do 2PC with citus local + -- tables and reference tables + BEGIN; + SELECT count(*) FROM ref_table_tx_blck; + count +--------------------------------------------------------------------- + 0 +(1 row) + + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers + BEGIN; + INSERT INTO local_table_1 VALUES (1); + SELECT count(*) FROM ref_table_tx_blck; + count +--------------------------------------------------------------------- + 0 +(1 row) + + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers + BEGIN; + SELECT count(*) FROM local_table_1; + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM ref_table_tx_blck; + count +--------------------------------------------------------------------- + 0 +(1 row) + + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers + BEGIN; + INSERT INTO ref_table_tx_blck VALUES (1000); + SELECT count(*) FROM local_table_1; + count +--------------------------------------------------------------------- + 0 +(1 row) + + PREPARE TRANSACTION 'citus_local_tx'; +ERROR: cannot use 2PC in transactions involving multiple servers CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$ BEGIN INSERT INTO test (x) VALUES ($1); diff --git a/src/test/regress/sql/citus_local_tables_queries_mx.sql b/src/test/regress/sql/citus_local_tables_queries_mx.sql index b1a9e4622..5411826d2 100644 --- a/src/test/regress/sql/citus_local_tables_queries_mx.sql +++ b/src/test/regress/sql/citus_local_tables_queries_mx.sql @@ -658,6 +658,33 @@ BEGIN; SELECT * FROM reference_table ORDER BY 1,2; COMMIT; +-- users are not allowed to use +-- citus local tables in prepared statements +-- from the worker nodes +BEGIN; + INSERT INTO citus_local_table VALUES (3), (4); +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +BEGIN; + SELECT count(*) FROM citus_local_table; +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +BEGIN; + SELECT count(*) FROM reference_table; + SELECT count(*) FROM citus_local_table; +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +BEGIN; + SELECT count(*) FROM citus_local_table; + SELECT count(*) FROM reference_table; +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +BEGIN; + SELECT count(*) FROM citus_local_table; + SELECT count(*) FROM distributed_table; +PREPARE TRANSACTION 'citus_local_tx_on_mx'; +BEGIN; + SELECT count(*) FROM distributed_table; + SELECT count(*) FROM citus_local_table; +PREPARE TRANSACTION 'citus_local_tx_on_mx'; + + \c - - - :master_port -- cleanup at exit DROP SCHEMA citus_local_table_queries_mx CASCADE; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 568a956cb..f7694ceca 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -654,6 +654,81 @@ ALTER TABLE local_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES l SELECT citus_add_local_table_to_metadata('local_table_2', cascade_via_foreign_keys=>true); + +CREATE TABLE distributed_table_tx_blck (col_1 INT UNIQUE); +SELECT create_distributed_table('distributed_table_tx_blck', 'col_1'); +CREATE TABLE ref_table_tx_blck (col_1 INT UNIQUE); +SELECT create_reference_table('ref_table_tx_blck'); + +-- make sure that we can do 2PC with a citus local table +BEGIN; + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; + ROLLBACK PREPARED 'citus_local_tx'; + +-- make sure that we can do 2PC with citus local tables +-- multiple statements +BEGIN; + INSERT INTO local_table_1 VALUES (1); + INSERT INTO local_table_1 VALUES (2); + PREPARE TRANSACTION 'citus_local_tx'; + ROLLBACK PREPARED 'citus_local_tx'; + +-- make sure that we cannot do 2PC with citus local tables +-- when local execution is disabled +BEGIN; + SET citus.enable_local_execution TO false; + INSERT INTO local_table_1 VALUES (1); + INSERT INTO local_table_1 VALUES (2); + PREPARE TRANSACTION 'citus_local_tx'; + +-- make sure that we cannot do 2PC with citus local tables +-- when it is used with distributed tables on the tx block +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO local_table_1 VALUES (1); + SELECT count(*) FROM distributed_table_tx_blck; + PREPARE TRANSACTION 'citus_local_tx'; + +-- make sure that we cannot do 2PC with citus local tables +-- when it is used with distributed tables on the tx block +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM distributed_table_tx_blck; + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; + +-- make sure that we cannot do 2PC with citus local tables +-- when it is used with distributed tables on the tx block +-- even if local execution is disabled +BEGIN; + SET citus.enable_local_execution TO false; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT count(*) FROM distributed_table_tx_blck; + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; + + + -- make sure that we cannot do 2PC with citus local + -- tables and reference tables + BEGIN; + SELECT count(*) FROM ref_table_tx_blck; + INSERT INTO local_table_1 VALUES (1); + PREPARE TRANSACTION 'citus_local_tx'; + BEGIN; + INSERT INTO local_table_1 VALUES (1); + SELECT count(*) FROM ref_table_tx_blck; + PREPARE TRANSACTION 'citus_local_tx'; + BEGIN; + SELECT count(*) FROM local_table_1; + SELECT count(*) FROM ref_table_tx_blck; + PREPARE TRANSACTION 'citus_local_tx'; + BEGIN; + INSERT INTO ref_table_tx_blck VALUES (1000); + SELECT count(*) FROM local_table_1; + PREPARE TRANSACTION 'citus_local_tx'; + + CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$ BEGIN INSERT INTO test (x) VALUES ($1);