WIP: Allow router to touch multiple shards in a transaction.

This primarily for now is to be able to test distributed deadlock
detection.

Todo:
- Fix regression tests properly
pull/1447/head
Andres Freund 2017-06-11 18:39:01 -07:00
parent 283ecf17de
commit d6ac345c23
3 changed files with 26 additions and 49 deletions

View File

@ -78,8 +78,7 @@ static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool expectResults); bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * GetModifyConnections(List *taskPlacementList, bool markCritical, static List * GetModifyConnections(List *taskPlacementList, bool markCritical);
bool startedInTransaction);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults); bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults, static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@ -666,8 +665,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
char *queryString = task->queryString; char *queryString = task->queryString;
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
bool startedInTransaction =
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
{ {
@ -706,8 +703,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
* table) and start a transaction (when in a transaction). * table) and start a transaction (when in a transaction).
*/ */
connectionList = GetModifyConnections(taskPlacementList, connectionList = GetModifyConnections(taskPlacementList,
taskRequiresTwoPhaseCommit, taskRequiresTwoPhaseCommit);
startedInTransaction);
/* prevent replicas of the same shard from diverging */ /* prevent replicas of the same shard from diverging */
AcquireExecutorShardLock(task, operation); AcquireExecutorShardLock(task, operation);
@ -808,7 +804,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
* transaction in progress. * transaction in progress.
*/ */
static List * static List *
GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions) GetModifyConnections(List *taskPlacementList, bool markCritical)
{ {
ListCell *taskPlacementCell = NULL; ListCell *taskPlacementCell = NULL;
List *multiConnectionList = NIL; List *multiConnectionList = NIL;
@ -828,26 +824,6 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans
*/ */
multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL); multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);
/*
* If already in a transaction, disallow expanding set of remote
* transactions. That prevents some forms of distributed deadlocks.
*/
if (noNewTransactions)
{
RemoteTransaction *transaction = &multiConnection->remoteTransaction;
if (EnableDeadlockPrevention &&
transaction->transactionState == REMOTE_TRANS_INVALID)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
errmsg("no transaction participant matches %s:%d",
taskPlacement->nodeName, taskPlacement->nodePort),
errdetail("Transactions which modify distributed tables "
"may only target nodes affected by the "
"modification command which began the transaction.")));
}
}
if (markCritical) if (markCritical)
{ {
MarkRemoteTransactionCritical(multiConnection); MarkRemoteTransactionCritical(multiConnection);

View File

@ -136,14 +136,15 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT; COMMIT;
-- unless we disable deadlock prevention -- unless we disable deadlock prevention
BEGIN; BEGIN;
SET citus.enable_deadlock_prevention TO off; SET citus.enable_deadlock_prevention TO off;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, Leslie Lamport) already exists.
CONTEXT: while executing command on localhost:57638
ABORT; ABORT;
-- SELECTs may occur after a modification: First check that selecting -- SELECTs may occur after a modification: First check that selecting
-- from the modified node works. -- from the modified node works.
@ -152,7 +153,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6; SELECT count(*) FROM researchers WHERE lab_id = 6;
count count
------- -------
0 1
(1 row) (1 row)
ABORT; ABORT;
@ -168,7 +169,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6; SELECT count(*) FROM researchers WHERE lab_id = 6;
count count
------- -------
0 1
(1 row) (1 row)
ABORT; ABORT;
@ -193,9 +194,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::
(2 rows) (2 rows)
SELECT * FROM labs WHERE id = 6; SELECT * FROM labs WHERE id = 6;
id | name id | name
----+------ ----+-----------
(0 rows) 6 | Bell Labs
(1 row)
-- COPY can happen after single row INSERT -- COPY can happen after single row INSERT
BEGIN; BEGIN;
@ -256,9 +258,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT; COMMIT;
-- verify rollback -- verify rollback
SELECT * FROM researchers WHERE lab_id = 6; SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name id | lab_id | name
----+--------+------ ----+--------+----------------
(0 rows) 9 | 6 | Leslie Lamport
(1 row)
SELECT count(*) FROM pg_dist_transaction; SELECT count(*) FROM pg_dist_transaction;
count count
@ -283,9 +286,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT; COMMIT;
-- verify rollback -- verify rollback
SELECT * FROM researchers WHERE lab_id = 6; SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name id | lab_id | name
----+--------+------ ----+--------+----------------
(0 rows) 9 | 6 | Leslie Lamport
(1 row)
SELECT count(*) FROM pg_dist_transaction; SELECT count(*) FROM pg_dist_transaction;
count count
@ -301,9 +305,10 @@ COMMIT;
SELECT * FROM researchers WHERE lab_id = 6; SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name id | lab_id | name
----+--------+---------------------- ----+--------+----------------------
9 | 6 | Leslie Lamport
17 | 6 | 'Bjarne Stroustrup' 17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie' 18 | 6 | 'Dennis Ritchie'
(2 rows) (3 rows)
-- verify 2pc -- verify 2pc
SELECT count(*) FROM pg_dist_transaction; SELECT count(*) FROM pg_dist_transaction;
@ -360,9 +365,10 @@ ERROR: could not commit transaction on any active node
SELECT * FROM researchers WHERE lab_id = 6; SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name id | lab_id | name
----+--------+---------------------- ----+--------+----------------------
9 | 6 | Leslie Lamport
17 | 6 | 'Bjarne Stroustrup' 17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie' 18 | 6 | 'Dennis Ritchie'
(2 rows) (3 rows)
-- cleanup triggers and the function -- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s') SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
@ -400,7 +406,7 @@ ALTER TABLE labs ADD COLUMN motto text;
SELECT master_modify_multiple_shards('DELETE FROM labs'); SELECT master_modify_multiple_shards('DELETE FROM labs');
master_modify_multiple_shards master_modify_multiple_shards
------------------------------- -------------------------------
7 8
(1 row) (1 row)
ALTER TABLE labs ADD COLUMN score float; ALTER TABLE labs ADD COLUMN score float;
@ -880,7 +886,6 @@ SELECT create_distributed_table('hash_modifying_xacts', 'key');
BEGIN; BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1); INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10); INSERT INTO reference_modifying_xacts VALUES (10, 10);
ERROR: no transaction participant matches localhost:57638
COMMIT; COMMIT;
-- it is allowed when turning off deadlock prevention -- it is allowed when turning off deadlock prevention
BEGIN; BEGIN;

View File

@ -140,8 +140,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
BEGIN; BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport'); INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT; COMMIT;
-- have the same test on the other worker node -- have the same test on the other worker node
\c - - - :worker_2_port \c - - - :worker_2_port
@ -163,8 +161,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
BEGIN; BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport'); INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT; COMMIT;
-- switch back to the worker node -- switch back to the worker node
\c - - - :worker_1_port \c - - - :worker_1_port
@ -175,7 +171,7 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
SELECT count(*) FROM researchers_mx WHERE lab_id = 6; SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
count count
------- -------
0 2
(1 row) (1 row)
ABORT; ABORT;