Allow Citus local tables with 2PC

pull/4595/head
Onder Kalaci 2021-01-28 12:05:59 +01:00
parent 07d3b4fd04
commit 442449c4bf
5 changed files with 346 additions and 17 deletions

View File

@ -618,6 +618,7 @@ static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution
static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList);
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(List *taskList);
static bool TaskAccessesOnlyCitusLocalTables(Task *task);
static bool TaskListRequires2PC(List *taskList);
static bool SelectForUpdateOnReferenceTable(List *taskList);
static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution);
@ -1165,23 +1166,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
return xactProperties;
}
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{
/*
* In case localExecutionHappened, we force the executor to use 2PC.
* The primary motivation is that at this point we're definitely expanding
* the nodes participated in the transaction. And, by re-generating the
* remote task lists during local query execution, we might prevent the adaptive
* executor to kick-in 2PC (or even start coordinated transaction, that's why
* we prefer adding this check here instead of
* Activate2PCIfModifyingTransactionExpandsToNewNode()).
*/
xactProperties.errorOnAnyFailure = true;
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
xactProperties.requires2PC = true;
return xactProperties;
}
if (DistributedExecutionRequiresRollback(taskList))
{
/* transaction blocks are required if the task list needs to roll back */
@ -1361,6 +1345,25 @@ DistributedExecutionRequiresRollback(List *taskList)
Task *task = (Task *) linitial(taskList);
if (taskCount == 1 && TaskAccessesOnlyCitusLocalTables(task) &&
ShouldExecuteTasksLocally(taskList))
{
/*
* If the execution accesses Citus local tables via local execution,
* there is no need to start a coordinated transaction. This is
* especially important regarding the limitations of coordinated
* transactions such as a coordinated transaction cannot be part of
* a 2PC transaction.
*
* NB: It is not precisely accurate to rely on ShouldExecuteTasksLocally()
* at this point as remote execution may failover some tasks to local
* execution. However, we prefer this approach because it provides a more
* predictable user experience.
*/
return false;
}
bool selectForUpdate = task->relationRowLockList != NIL;
if (selectForUpdate)
{
@ -1423,6 +1426,27 @@ DistributedExecutionRequiresRollback(List *taskList)
}
/*
* TaskAccessesOnlyCitusLocalTable returns true if the input task
* has only accesses Citus local tables.
*/
static bool
TaskAccessesOnlyCitusLocalTables(Task *task)
{
List *relationShardList = task->relationShardList;
RelationShard *relationShard = NULL;
foreach_ptr(relationShard, relationShardList)
{
if (!IsCitusTableType(relationShard->relationId, CITUS_LOCAL_TABLE))
{
return false;
}
}
return true;
}
/*
* TaskListRequires2PC determines whether the given task list requires 2PC
* because the tasks provided operates on a reference table or there are multiple
@ -3199,6 +3223,11 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session)
*/
CoordinatedTransactionUse2PC();
}
else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{
/* we did local execution and are expanding to an additional node */
CoordinatedTransactionUse2PC();
}
}

View File

@ -1015,6 +1015,82 @@ BEGIN;
(0 rows)
COMMIT;
-- users are not allowed to use
-- citus local tables in prepared statements
-- from the worker nodes
BEGIN;
INSERT INTO citus_local_table VALUES (3), (4);
PREPARE TRANSACTION 'citus_local_tx_on_mx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
2
(1 row)
PREPARE TRANSACTION 'citus_local_tx_on_mx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
SELECT count(*) FROM reference_table;
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
2
(1 row)
PREPARE TRANSACTION 'citus_local_tx_on_mx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM reference_table;
count
---------------------------------------------------------------------
0
(1 row)
PREPARE TRANSACTION 'citus_local_tx_on_mx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM distributed_table;
count
---------------------------------------------------------------------
0
(1 row)
PREPARE TRANSACTION 'citus_local_tx_on_mx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
SELECT count(*) FROM distributed_table;
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM citus_local_table;
count
---------------------------------------------------------------------
2
(1 row)
PREPARE TRANSACTION 'citus_local_tx_on_mx';
ERROR: cannot use 2PC in transactions involving multiple servers
\c - - - :master_port
-- cleanup at exit
DROP SCHEMA citus_local_table_queries_mx CASCADE;

View File

@ -1207,6 +1207,128 @@ SELECT citus_add_local_table_to_metadata('local_table_2', cascade_via_foreign_ke
(1 row)
CREATE TABLE distributed_table_tx_blck (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_tx_blck', 'col_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_table_tx_blck (col_1 INT UNIQUE);
SELECT create_reference_table('ref_table_tx_blck');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- make sure that we can do 2PC with a citus local table
BEGIN;
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
ROLLBACK PREPARED 'citus_local_tx';
-- make sure that we can do 2PC with citus local tables
-- multiple statements
BEGIN;
INSERT INTO local_table_1 VALUES (1);
INSERT INTO local_table_1 VALUES (2);
PREPARE TRANSACTION 'citus_local_tx';
ROLLBACK PREPARED 'citus_local_tx';
-- make sure that we cannot do 2PC with citus local tables
-- when local execution is disabled
BEGIN;
SET citus.enable_local_execution TO false;
INSERT INTO local_table_1 VALUES (1);
INSERT INTO local_table_1 VALUES (2);
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
-- make sure that we cannot do 2PC with citus local tables
-- when it is used with distributed tables on the tx block
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
INSERT INTO local_table_1 VALUES (1);
SELECT count(*) FROM distributed_table_tx_blck;
count
---------------------------------------------------------------------
0
(1 row)
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
-- make sure that we cannot do 2PC with citus local tables
-- when it is used with distributed tables on the tx block
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM distributed_table_tx_blck;
count
---------------------------------------------------------------------
0
(1 row)
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
-- make sure that we cannot do 2PC with citus local tables
-- when it is used with distributed tables on the tx block
-- even if local execution is disabled
BEGIN;
SET citus.enable_local_execution TO false;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM distributed_table_tx_blck;
count
---------------------------------------------------------------------
0
(1 row)
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
-- make sure that we cannot do 2PC with citus local
-- tables and reference tables
BEGIN;
SELECT count(*) FROM ref_table_tx_blck;
count
---------------------------------------------------------------------
0
(1 row)
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
INSERT INTO local_table_1 VALUES (1);
SELECT count(*) FROM ref_table_tx_blck;
count
---------------------------------------------------------------------
0
(1 row)
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
SELECT count(*) FROM local_table_1;
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM ref_table_tx_blck;
count
---------------------------------------------------------------------
0
(1 row)
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
BEGIN;
INSERT INTO ref_table_tx_blck VALUES (1000);
SELECT count(*) FROM local_table_1;
count
---------------------------------------------------------------------
0
(1 row)
PREPARE TRANSACTION 'citus_local_tx';
ERROR: cannot use 2PC in transactions involving multiple servers
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO test (x) VALUES ($1);

View File

@ -658,6 +658,33 @@ BEGIN;
SELECT * FROM reference_table ORDER BY 1,2;
COMMIT;
-- users are not allowed to use
-- citus local tables in prepared statements
-- from the worker nodes
BEGIN;
INSERT INTO citus_local_table VALUES (3), (4);
PREPARE TRANSACTION 'citus_local_tx_on_mx';
BEGIN;
SELECT count(*) FROM citus_local_table;
PREPARE TRANSACTION 'citus_local_tx_on_mx';
BEGIN;
SELECT count(*) FROM reference_table;
SELECT count(*) FROM citus_local_table;
PREPARE TRANSACTION 'citus_local_tx_on_mx';
BEGIN;
SELECT count(*) FROM citus_local_table;
SELECT count(*) FROM reference_table;
PREPARE TRANSACTION 'citus_local_tx_on_mx';
BEGIN;
SELECT count(*) FROM citus_local_table;
SELECT count(*) FROM distributed_table;
PREPARE TRANSACTION 'citus_local_tx_on_mx';
BEGIN;
SELECT count(*) FROM distributed_table;
SELECT count(*) FROM citus_local_table;
PREPARE TRANSACTION 'citus_local_tx_on_mx';
\c - - - :master_port
-- cleanup at exit
DROP SCHEMA citus_local_table_queries_mx CASCADE;

View File

@ -654,6 +654,81 @@ ALTER TABLE local_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES l
SELECT citus_add_local_table_to_metadata('local_table_2', cascade_via_foreign_keys=>true);
CREATE TABLE distributed_table_tx_blck (col_1 INT UNIQUE);
SELECT create_distributed_table('distributed_table_tx_blck', 'col_1');
CREATE TABLE ref_table_tx_blck (col_1 INT UNIQUE);
SELECT create_reference_table('ref_table_tx_blck');
-- make sure that we can do 2PC with a citus local table
BEGIN;
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
ROLLBACK PREPARED 'citus_local_tx';
-- make sure that we can do 2PC with citus local tables
-- multiple statements
BEGIN;
INSERT INTO local_table_1 VALUES (1);
INSERT INTO local_table_1 VALUES (2);
PREPARE TRANSACTION 'citus_local_tx';
ROLLBACK PREPARED 'citus_local_tx';
-- make sure that we cannot do 2PC with citus local tables
-- when local execution is disabled
BEGIN;
SET citus.enable_local_execution TO false;
INSERT INTO local_table_1 VALUES (1);
INSERT INTO local_table_1 VALUES (2);
PREPARE TRANSACTION 'citus_local_tx';
-- make sure that we cannot do 2PC with citus local tables
-- when it is used with distributed tables on the tx block
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
INSERT INTO local_table_1 VALUES (1);
SELECT count(*) FROM distributed_table_tx_blck;
PREPARE TRANSACTION 'citus_local_tx';
-- make sure that we cannot do 2PC with citus local tables
-- when it is used with distributed tables on the tx block
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM distributed_table_tx_blck;
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
-- make sure that we cannot do 2PC with citus local tables
-- when it is used with distributed tables on the tx block
-- even if local execution is disabled
BEGIN;
SET citus.enable_local_execution TO false;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM distributed_table_tx_blck;
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
-- make sure that we cannot do 2PC with citus local
-- tables and reference tables
BEGIN;
SELECT count(*) FROM ref_table_tx_blck;
INSERT INTO local_table_1 VALUES (1);
PREPARE TRANSACTION 'citus_local_tx';
BEGIN;
INSERT INTO local_table_1 VALUES (1);
SELECT count(*) FROM ref_table_tx_blck;
PREPARE TRANSACTION 'citus_local_tx';
BEGIN;
SELECT count(*) FROM local_table_1;
SELECT count(*) FROM ref_table_tx_blck;
PREPARE TRANSACTION 'citus_local_tx';
BEGIN;
INSERT INTO ref_table_tx_blck VALUES (1000);
SELECT count(*) FROM local_table_1;
PREPARE TRANSACTION 'citus_local_tx';
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO test (x) VALUES ($1);