Convert multi copy to use new connection api

This enables proper transactional behaviour for copy and relaxes some
restrictions like combining COPY with single-row modifications. It
also provides the basis for relaxing restrictions further, and for
optionally allowing connection caching.
pull/1133/head
Murat Tuncer 2017-01-13 20:25:28 +03:00 committed by Andres Freund
parent cc594e00ca
commit d76f781ae4
8 changed files with 620 additions and 593 deletions

File diff suppressed because it is too large Load Diff

View File

@ -202,12 +202,12 @@ GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFo
/* /*
* ConnectionList flattens the connection hash to a list of placement connections. * ShardConnectionList returns the list of ShardConnections in connectionHash.
*/ */
List * List *
ConnectionList(HTAB *connectionHash) ShardConnectionList(HTAB *connectionHash)
{ {
List *connectionList = NIL; List *shardConnectionsList = NIL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
@ -221,13 +221,12 @@ ConnectionList(HTAB *connectionHash)
shardConnections = (ShardConnections *) hash_seq_search(&status); shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL) while (shardConnections != NULL)
{ {
List *shardConnectionsList = list_copy(shardConnections->connectionList); shardConnectionsList = lappend(shardConnectionsList, shardConnections);
connectionList = list_concat(connectionList, shardConnectionsList);
shardConnections = (ShardConnections *) hash_seq_search(&status); shardConnections = (ShardConnections *) hash_seq_search(&status);
} }
return connectionList; return shardConnectionsList;
} }

View File

@ -22,11 +22,7 @@ typedef struct ShardConnections
{ {
int64 shardId; int64 shardId;
/* /* list of MultiConnection structs */
* XXX: this list contains MultiConnection for multi-shard transactions
* or TransactionConnection for COPY, the latter should be converted to
* use MultiConnection as well.
*/
List *connectionList; List *connectionList;
} ShardConnections; } ShardConnections;
@ -36,7 +32,7 @@ extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound); extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound); bool *connectionsFound);
extern List * ConnectionList(HTAB *connectionHash); extern List * ShardConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList); extern void CloseConnections(List *connectionList);
extern void ResetShardPlacementTransactionState(void); extern void ResetShardPlacementTransactionState(void);

View File

@ -1456,17 +1456,15 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
ROLLBACK; ROLLBACK;
-- Insert after copy is currently disallowed because of the way the -- Insert after copy is currently disallowed because of the way the
-- transaction modification state is currently handled. Copy still -- transaction modification state is currently handled. Copy is also
-- goes through despite rollback. -- rolled back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second; INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
ROLLBACK; ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation. -- Insert after copy is currently allowed for single-shard operation.
-- Since the COPY commits immediately, the result is visible in the -- Both insert and copy are rolled back successfully.
-- next operation. Copy goes through despite rollback, while insert
-- rolls back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101; INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101;
@ -1477,9 +1475,7 @@ SELECT user_id FROM raw_events_first WHERE user_id = 101;
(1 row) (1 row)
ROLLBACK; ROLLBACK;
-- Copy after insert is disallowed since the insert is not immediately -- Copy after insert is currently disallowed.
-- committed and the copy uses different connections that will not yet
-- see the result of the insert.
BEGIN; BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second; INSERT INTO raw_events_first SELECT * FROM raw_events_second;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
@ -1489,8 +1485,6 @@ ROLLBACK;
BEGIN; BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100; INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY raw_events_first, line 1: "103,103"
ROLLBACK; ROLLBACK;
-- selecting from views works -- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first; CREATE VIEW test_view AS SELECT * FROM raw_events_first;
@ -1499,7 +1493,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
------- -------
11 9
(1 row) (1 row)
INSERT INTO raw_events_second SELECT * FROM test_view; INSERT INTO raw_events_second SELECT * FROM test_view;
@ -1509,7 +1503,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
------- -------
13 11
(1 row) (1 row)
-- inserting into views does not -- inserting into views does not

View File

@ -192,37 +192,178 @@ SELECT * FROM labs WHERE id = 6;
----+------ ----+------
(0 rows) (0 rows)
-- COPY can't happen second, -- COPY can happen after single row INSERT
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
COMMIT; COMMIT;
-- though it will work if before any modifications -- COPY can happen before single row INSERT
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
SELECT name FROM labs WHERE id = 10; SELECT name FROM labs WHERE id = 10;
name name
---------------- ----------------
Weyland-Yutani Weyland-Yutani
(1 row) Weyland-Yutani
(2 rows)
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT; COMMIT;
-- but a double-copy isn't allowed (the first will persist) -- two consecutive COPYs in a transaction are allowed
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "12,fsociety"
COMMIT; COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id;
name name
---------------- ----------------
Planet Express Planet Express
fsociety
(2 rows)
-- 1pc failure test
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row) (1 row)
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+------
(0 rows)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- 2pc failure and success tests
SET citus.multi_shard_commit_protocol TO '2pc';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+------
(0 rows)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
BEGIN;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
COMMIT;
-- verify success
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+----------------------
17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie'
(2 rows)
-- verify 2pc
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
RESET citus.multi_shard_commit_protocol;
-- create a check function
SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
BEGIN
IF (NEW.id > 30) THEN
RAISE ''illegal value'';
END IF;
RETURN NEW;
END;
$rli$ LANGUAGE plpgsql;')
ORDER BY nodeport;
nodename | nodeport | success | result
-----------+----------+---------+-----------------
localhost | 57637 | t | CREATE FUNCTION
localhost | 57638 | t | CREATE FUNCTION
(2 rows)
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
ORDER BY nodeport, shardid;
nodename | nodeport | shardid | success | result
-----------+----------+---------+---------+----------------
localhost | 57637 | 1200000 | t | CREATE TRIGGER
localhost | 57637 | 1200001 | t | CREATE TRIGGER
localhost | 57638 | 1200000 | t | CREATE TRIGGER
localhost | 57638 | 1200001 | t | CREATE TRIGGER
(4 rows)
-- hide postgresql version dependend messages for next test only
\set VERBOSITY terse
-- deferred check should abort the transaction
BEGIN;
DELETE FROM researchers WHERE lab_id = 6;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
COMMIT;
WARNING: illegal value
WARNING: failed to commit transaction on localhost:57638
WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637
WARNING: could not commit transaction for shard 1200001 on any active node
ERROR: could not commit transaction on any active node
\unset VERBOSITY
-- verify everyhing including delete is rolled back
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+----------------------
17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie'
(2 rows)
-- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
ORDER BY nodeport, shardid;
nodename | nodeport | shardid | success | result
-----------+----------+---------+---------+--------------
localhost | 57637 | 1200000 | t | DROP TRIGGER
localhost | 57637 | 1200001 | t | DROP TRIGGER
localhost | 57638 | 1200000 | t | DROP TRIGGER
localhost | 57638 | 1200001 | t | DROP TRIGGER
(4 rows)
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
ORDER BY nodeport;
nodename | nodeport | success | result
-----------+----------+---------+---------------
localhost | 57637 | t | DROP FUNCTION
localhost | 57638 | t | DROP FUNCTION
(2 rows)
-- finally, ALTER and copy aren't compatible -- finally, ALTER and copy aren't compatible
BEGIN; BEGIN;
ALTER TABLE labs ADD COLUMN motto text; ALTER TABLE labs ADD COLUMN motto text;
@ -238,11 +379,6 @@ COMMIT;
id | bigint | not null id | bigint | not null
name | text | not null name | text | not null
SELECT * FROM labs WHERE id = 12;
id | name
----+------
(0 rows)
-- and if the copy is before the ALTER... -- and if the copy is before the ALTER...
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
@ -269,7 +405,7 @@ ALTER TABLE labs ADD COLUMN motto text;
SELECT master_modify_multiple_shards('DELETE FROM labs'); SELECT master_modify_multiple_shards('DELETE FROM labs');
master_modify_multiple_shards master_modify_multiple_shards
------------------------------- -------------------------------
5 7
(1 row) (1 row)
ALTER TABLE labs ADD COLUMN score float; ALTER TABLE labs ADD COLUMN score float;

View File

@ -707,3 +707,4 @@ HINT: No function matches the given name and argument types. You might need to
CONTEXT: while executing command on localhost:57638 CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164 WARNING: could not get statistics for shard public.composite_partition_column_table_560164
DETAIL: Setting shard statistics to NULL DETAIL: Setting shard statistics to NULL
ERROR: failure on connection marked as essential: localhost:57637

View File

@ -706,8 +706,8 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100
ROLLBACK; ROLLBACK;
-- Insert after copy is currently disallowed because of the way the -- Insert after copy is currently disallowed because of the way the
-- transaction modification state is currently handled. Copy still -- transaction modification state is currently handled. Copy is also
-- goes through despite rollback. -- rolled back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
100,100 100,100
@ -716,9 +716,7 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ROLLBACK; ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation. -- Insert after copy is currently allowed for single-shard operation.
-- Since the COPY commits immediately, the result is visible in the -- Both insert and copy are rolled back successfully.
-- next operation. Copy goes through despite rollback, while insert
-- rolls back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
101,101 101,101
@ -727,9 +725,7 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101
SELECT user_id FROM raw_events_first WHERE user_id = 101; SELECT user_id FROM raw_events_first WHERE user_id = 101;
ROLLBACK; ROLLBACK;
-- Copy after insert is disallowed since the insert is not immediately -- Copy after insert is currently disallowed.
-- committed and the copy uses different connections that will not yet
-- see the result of the insert.
BEGIN; BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second; INSERT INTO raw_events_first SELECT * FROM raw_events_second;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';

View File

@ -148,7 +148,7 @@ COMMIT;
\d labs \d labs
SELECT * FROM labs WHERE id = 6; SELECT * FROM labs WHERE id = 6;
-- COPY can't happen second, -- COPY can happen after single row INSERT
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
@ -156,7 +156,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
\. \.
COMMIT; COMMIT;
-- though it will work if before any modifications -- COPY can happen before single row INSERT
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
10,Weyland-Yutani 10,Weyland-Yutani
@ -165,7 +165,7 @@ SELECT name FROM labs WHERE id = 10;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT; COMMIT;
-- but a double-copy isn't allowed (the first will persist) -- two consecutive COPYs in a transaction are allowed
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
11,Planet Express 11,Planet Express
@ -175,7 +175,93 @@ BEGIN;
\. \.
COMMIT; COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id;
-- 1pc failure test
SELECT recover_prepared_transactions();
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
17, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
18, 6, 'Bjarne Stroustrup'
\.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
SELECT count(*) FROM pg_dist_transaction;
-- 2pc failure and success tests
SET citus.multi_shard_commit_protocol TO '2pc';
SELECT recover_prepared_transactions();
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
17, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
18, 6, 'Bjarne Stroustrup'
\.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
SELECT count(*) FROM pg_dist_transaction;
BEGIN;
\copy researchers FROM STDIN delimiter ','
17, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
18, 6, 'Dennis Ritchie'
\.
COMMIT;
-- verify success
SELECT * FROM researchers WHERE lab_id = 6;
-- verify 2pc
SELECT count(*) FROM pg_dist_transaction;
RESET citus.multi_shard_commit_protocol;
-- create a check function
SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
BEGIN
IF (NEW.id > 30) THEN
RAISE ''illegal value'';
END IF;
RETURN NEW;
END;
$rli$ LANGUAGE plpgsql;')
ORDER BY nodeport;
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
ORDER BY nodeport, shardid;
-- hide postgresql version dependend messages for next test only
\set VERBOSITY terse
-- deferred check should abort the transaction
BEGIN;
DELETE FROM researchers WHERE lab_id = 6;
\copy researchers FROM STDIN delimiter ','
31, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
30, 6, 'Dennis Ritchie'
\.
COMMIT;
\unset VERBOSITY
-- verify everyhing including delete is rolled back
SELECT * FROM researchers WHERE lab_id = 6;
-- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
ORDER BY nodeport, shardid;
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
ORDER BY nodeport;
-- finally, ALTER and copy aren't compatible -- finally, ALTER and copy aren't compatible
BEGIN; BEGIN;
@ -187,7 +273,6 @@ COMMIT;
-- but the DDL should correctly roll back -- but the DDL should correctly roll back
\d labs \d labs
SELECT * FROM labs WHERE id = 12;
-- and if the copy is before the ALTER... -- and if the copy is before the ALTER...
BEGIN; BEGIN;