mirror of https://github.com/citusdata/citus.git
Remove XactModificationLevel distinction between DML and multi-shard
parent
710fe8666b
commit
d3785b97c0
|
@ -604,13 +604,6 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
||||||
char *queryString = task->queryString;
|
char *queryString = task->queryString;
|
||||||
List *relationShardList = task->relationShardList;
|
List *relationShardList = task->relationShardList;
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("single-shard query may not appear in transaction blocks "
|
|
||||||
"which contain multi-shard data modifications")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to run the query to completion on one placement. If the query fails
|
* Try to run the query to completion on one placement. If the query fails
|
||||||
* attempt the query on the next placement.
|
* attempt the query on the next placement.
|
||||||
|
@ -745,14 +738,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
||||||
bool startedInTransaction =
|
bool startedInTransaction =
|
||||||
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
|
InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("single-shard DML commands must not appear in "
|
|
||||||
"transaction blocks which contain multi-shard data "
|
|
||||||
"modifications")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Modifications for reference tables are always done using 2PC. First
|
* Modifications for reference tables are always done using 2PC. First
|
||||||
* ensure that distributed transaction is started. Then force the
|
* ensure that distributed transaction is started. Then force the
|
||||||
|
@ -1039,14 +1024,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("multi-shard data modifications must not appear in "
|
|
||||||
"transaction blocks which contain single-shard DML "
|
|
||||||
"commands")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* ensure that there are no concurrent modifications on the same shards */
|
/* ensure that there are no concurrent modifications on the same shards */
|
||||||
AcquireExecutorMultiShardLocks(taskList);
|
AcquireExecutorMultiShardLocks(taskList);
|
||||||
|
|
||||||
|
@ -1072,7 +1049,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
/* open connection to all relevant placements, if not already open */
|
/* open connection to all relevant placements, if not already open */
|
||||||
shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags);
|
shardConnectionHash = OpenTransactionsForAllTasks(taskList, connectionFlags);
|
||||||
|
|
||||||
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
|
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||||
|
|
||||||
/* iterate over placements in rounds, to ensure in-order execution */
|
/* iterate over placements in rounds, to ensure in-order execution */
|
||||||
while (tasksPending)
|
while (tasksPending)
|
||||||
|
|
|
@ -2494,14 +2494,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
{
|
{
|
||||||
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
|
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
|
||||||
errmsg("distributed DDL commands must not appear within "
|
|
||||||
"transaction blocks containing single-shard data "
|
|
||||||
"modifications")));
|
|
||||||
}
|
|
||||||
|
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
if (!ddlJob->concurrentIndexCmd)
|
if (!ddlJob->concurrentIndexCmd)
|
||||||
|
|
|
@ -146,7 +146,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
uint64 totalRelationSize = 0;
|
uint64 totalRelationSize = 0;
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
|
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
errmsg("citus size functions cannot be called in transaction"
|
errmsg("citus size functions cannot be called in transaction"
|
||||||
|
|
|
@ -441,9 +441,9 @@ SELECT create_distributed_table('products', 'product_no');
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO products VALUES(1,'product_1', 5);
|
INSERT INTO products VALUES(1,'product_1', 5);
|
||||||
-- Should error out since conflicts with the above single-shard data modification command.
|
-- DDL may error out after an INSERT because it might pick the wrong connection
|
||||||
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
ERROR: cannot establish new placement connection when DML has been executed on existing placement connection
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- Add constraints
|
-- Add constraints
|
||||||
|
@ -451,9 +451,7 @@ ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
||||||
ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
|
ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
|
||||||
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
||||||
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
||||||
-- Single shard DML command can't be located in the same transaction with above commands.
|
|
||||||
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- There should be no constraint on master and worker(s)
|
-- There should be no constraint on master and worker(s)
|
||||||
SELECT "Constraint", "Definition" FROM table_checks WHERE relid='products'::regclass;
|
SELECT "Constraint", "Definition" FROM table_checks WHERE relid='products'::regclass;
|
||||||
|
|
|
@ -1636,15 +1636,24 @@ ROLLBACK;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE raw_events_second DROP COLUMN value_4;
|
ALTER TABLE raw_events_second DROP COLUMN value_4;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Insert after copy is currently disallowed because of the way the
|
-- Altering a reference table and then performing an INSERT ... SELECT which
|
||||||
-- transaction modification state is currently handled. Copy is also
|
-- joins with the reference table is not allowed, since the INSERT ... SELECT
|
||||||
-- rolled back.
|
-- would read from the reference table over others connections than the ones
|
||||||
|
-- that performed the DDL.
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE reference_table ADD COLUMN z int;
|
||||||
|
INSERT INTO raw_events_first (user_id)
|
||||||
|
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
|
||||||
|
ERROR: cannot establish new placement connection when DDL has been executed on existing placement connection
|
||||||
|
ROLLBACK;
|
||||||
|
-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
|
||||||
|
-- to use a connection for one shard, while the connection already modified
|
||||||
|
-- another shard.
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
||||||
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
|
ERROR: cannot establish new placement connection when DML has been executed on existing placement connection
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Insert after copy is currently allowed for single-shard operation.
|
-- Insert after copy is currently allowed for single-shard operation.
|
||||||
-- Both insert and copy are rolled back successfully.
|
-- Both insert and copy are rolled back successfully.
|
||||||
|
@ -1658,7 +1667,6 @@ SELECT user_id FROM raw_events_first WHERE user_id = 101;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Copy after insert is currently disallowed.
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
||||||
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
|
|
|
@ -172,18 +172,16 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ABORT;
|
ABORT;
|
||||||
-- applies to DDL, too
|
-- we can mix DDL and INSERT
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
ABORT;
|
||||||
COMMIT;
|
|
||||||
-- whether it occurs first or second
|
-- whether it occurs first or second
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
ABORT;
|
||||||
COMMIT;
|
|
||||||
-- but the DDL should correctly roll back
|
-- but the DDL should correctly roll back
|
||||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
||||||
Column | Type | Modifiers
|
Column | Type | Modifiers
|
||||||
|
@ -383,16 +381,29 @@ ORDER BY nodeport;
|
||||||
localhost | 57638 | t | DROP FUNCTION
|
localhost | 57638 | t | DROP FUNCTION
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- ALTER TABLE and COPY are compatible if ALTER TABLE precedes COPY
|
-- ALTER and copy are compatible
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- but not if COPY precedes ALTER TABLE
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
ABORT;
|
||||||
|
-- cannot perform DDL once a connection is used for multiple shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
|
||||||
|
lab_id
|
||||||
|
--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
|
||||||
|
lab_id
|
||||||
|
--------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
ALTER TABLE researchers ADD COLUMN motto text;
|
||||||
|
ERROR: cannot perform parallel DDL command because multiple placements have been accessed over the same connection
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- multi-shard operations can co-exist with DDL in a transactional way
|
-- multi-shard operations can co-exist with DDL in a transactional way
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
@ -1283,3 +1294,212 @@ SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
||||||
|
-- set up foreign keys to test transactions with co-located and reference tables
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.shard_replication_factor TO 1;
|
||||||
|
SET LOCAL citus.shard_count TO 4;
|
||||||
|
CREATE TABLE usergroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('usergroups');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE itemgroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('itemgroups');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
id int PRIMARY KEY,
|
||||||
|
name text,
|
||||||
|
user_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('users', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE items (
|
||||||
|
user_id int REFERENCES users (id) ON DELETE CASCADE,
|
||||||
|
item_name text,
|
||||||
|
item_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('items', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Table to find values that live in different shards on the same node
|
||||||
|
SELECT id, shard_name('users', shardid), nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
JOIN
|
||||||
|
( SELECT id, get_shard_id_for_distribution_column('users', id) shardid FROM generate_series(1,10) id ) ids
|
||||||
|
USING (shardid)
|
||||||
|
ORDER BY
|
||||||
|
id;
|
||||||
|
id | shard_name | nodename | nodeport
|
||||||
|
----+---------------+-----------+----------
|
||||||
|
1 | users_1200022 | localhost | 57637
|
||||||
|
2 | users_1200025 | localhost | 57638
|
||||||
|
3 | users_1200023 | localhost | 57638
|
||||||
|
4 | users_1200023 | localhost | 57638
|
||||||
|
5 | users_1200022 | localhost | 57637
|
||||||
|
6 | users_1200024 | localhost | 57637
|
||||||
|
7 | users_1200023 | localhost | 57638
|
||||||
|
8 | users_1200022 | localhost | 57637
|
||||||
|
9 | users_1200025 | localhost | 57638
|
||||||
|
10 | users_1200022 | localhost | 57637
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- the INSERTs into items should see the users
|
||||||
|
BEGIN;
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
INSERT INTO items VALUES (1, 'item-1');
|
||||||
|
INSERT INTO items VALUES (6, 'item-6');
|
||||||
|
END;
|
||||||
|
SELECT user_id FROM items ORDER BY user_id;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
1
|
||||||
|
6
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- should not be able to open multiple connections per node after INSERTing over one connection
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO users VALUES (2, 'burak');
|
||||||
|
INSERT INTO users VALUES (3, 'burak');
|
||||||
|
\COPY items FROM STDIN WITH CSV
|
||||||
|
ERROR: cannot establish new placement connection when DML has been executed on existing placement connection
|
||||||
|
END;
|
||||||
|
-- cannot perform DDL after a co-located table has been read over 1 connection
|
||||||
|
BEGIN;
|
||||||
|
SELECT id FROM users WHERE id = 1;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT id FROM users WHERE id = 6;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
ERROR: cannot perform parallel DDL command because multiple placements have been accessed over the same connection
|
||||||
|
END;
|
||||||
|
-- but the other way around is fine
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6;
|
||||||
|
id
|
||||||
|
----
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- now read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- perform a DDL command on the reference table
|
||||||
|
ALTER TABLE itemgroups ADD COLUMN last_update timestamptz;
|
||||||
|
ERROR: cannot perform DDL on a placement which has been read over multiple connections
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- perform a DDL command on a co-located reference table
|
||||||
|
ALTER TABLE usergroups ADD COLUMN last_update timestamptz;
|
||||||
|
ERROR: cannot perform DDL on a placement if a co-located placement has been read over multiple connections
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- make a modification over connection 1
|
||||||
|
INSERT INTO usergroups VALUES (0,'istanbul');
|
||||||
|
-- copy over connections 1 and 2
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- cannot read modifications made over different connections
|
||||||
|
SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3;
|
||||||
|
ERROR: cannot perform query with placements that were modified over multiple connections
|
||||||
|
END;
|
||||||
|
-- make sure we can see cascading deletes
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('DELETE FROM users');
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6;
|
||||||
|
user_id
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- test visibility after COPY
|
||||||
|
INSERT INTO usergroups VALUES (2,'group');
|
||||||
|
BEGIN;
|
||||||
|
-- opens two separate connections to node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
-- Uses first connection, which wrote the row with id = 2
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
|
||||||
|
id | name | user_group | gid | name
|
||||||
|
----+-------+------------+-----+-------
|
||||||
|
2 | onder | 2 | 2 | group
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Should use second connection, which wrote the row with id = 4
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
||||||
|
id | name | user_group | gid | name
|
||||||
|
----+-------+------------+-----+-------
|
||||||
|
4 | murat | 2 | 2 | group
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
DROP TABLE items, users, itemgroups, usergroups;
|
||||||
|
|
|
@ -179,12 +179,6 @@ SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ABORT;
|
ABORT;
|
||||||
-- applies to DDL
|
|
||||||
BEGIN;
|
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
|
||||||
ALTER TABLE labs_mx ADD COLUMN motto text;
|
|
||||||
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
|
|
||||||
COMMIT;
|
|
||||||
-- doesn't apply to COPY after modifications
|
-- doesn't apply to COPY after modifications
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
||||||
|
|
|
@ -1609,19 +1609,22 @@ SELECT * FROM reference_table_test;
|
||||||
10 | 2 | 2 | Fri Dec 02 00:00:00 2016
|
10 | 2 | 2 | Fri Dec 02 00:00:00 2016
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- do not allow mixing transactions
|
-- DML+master_modify_multiple_shards is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
||||||
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
10
|
||||||
|
(1 row)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Do not allow DDL and modification in the same transaction
|
-- DDL+DML is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
||||||
NOTICE: using one-phase commit for distributed DDL commands
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- clean up tables
|
-- clean up tables
|
||||||
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
|
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
|
||||||
|
|
|
@ -382,7 +382,7 @@ SELECT create_distributed_table('products', 'product_no');
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO products VALUES(1,'product_1', 5);
|
INSERT INTO products VALUES(1,'product_1', 5);
|
||||||
|
|
||||||
-- Should error out since conflicts with the above single-shard data modification command.
|
-- DDL may error out after an INSERT because it might pick the wrong connection
|
||||||
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
ALTER TABLE products ADD CONSTRAINT unn_pno UNIQUE(product_no);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
@ -393,7 +393,6 @@ ALTER TABLE products ADD CONSTRAINT check_price CHECK(price > discounted_price);
|
||||||
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
ALTER TABLE products ALTER COLUMN product_no SET NOT NULL;
|
||||||
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
ALTER TABLE products ADD CONSTRAINT p_key_product PRIMARY KEY(product_no);
|
||||||
|
|
||||||
-- Single shard DML command can't be located in the same transaction with above commands.
|
|
||||||
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
INSERT INTO products VALUES(1,'product_1', 10, 8);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
|
@ -1332,9 +1332,19 @@ ALTER TABLE raw_events_second DROP COLUMN value_4;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- Insert after copy is currently disallowed because of the way the
|
-- Altering a reference table and then performing an INSERT ... SELECT which
|
||||||
-- transaction modification state is currently handled. Copy is also
|
-- joins with the reference table is not allowed, since the INSERT ... SELECT
|
||||||
-- rolled back.
|
-- would read from the reference table over others connections than the ones
|
||||||
|
-- that performed the DDL.
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE reference_table ADD COLUMN z int;
|
||||||
|
INSERT INTO raw_events_first (user_id)
|
||||||
|
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- Insert after copy is disallowed when the INSERT INTO ... SELECT chooses
|
||||||
|
-- to use a connection for one shard, while the connection already modified
|
||||||
|
-- another shard.
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
100,100
|
100,100
|
||||||
|
@ -1352,7 +1362,6 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101
|
||||||
SELECT user_id FROM raw_events_first WHERE user_id = 101;
|
SELECT user_id FROM raw_events_first WHERE user_id = 101;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- Copy after insert is currently disallowed.
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
|
||||||
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
|
||||||
|
|
|
@ -138,17 +138,17 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
-- applies to DDL, too
|
-- we can mix DDL and INSERT
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
COMMIT;
|
ABORT;
|
||||||
|
|
||||||
-- whether it occurs first or second
|
-- whether it occurs first or second
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||||
COMMIT;
|
ABORT;
|
||||||
|
|
||||||
-- but the DDL should correctly roll back
|
-- but the DDL should correctly roll back
|
||||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
|
||||||
|
@ -289,7 +289,7 @@ ORDER BY nodeport, shardid;
|
||||||
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
|
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
|
||||||
ORDER BY nodeport;
|
ORDER BY nodeport;
|
||||||
|
|
||||||
-- ALTER TABLE and COPY are compatible if ALTER TABLE precedes COPY
|
-- ALTER and copy are compatible
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
|
@ -297,12 +297,18 @@ ALTER TABLE labs ADD COLUMN motto text;
|
||||||
\.
|
\.
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- but not if COPY precedes ALTER TABLE
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
\copy labs from stdin delimiter ','
|
\copy labs from stdin delimiter ','
|
||||||
12,fsociety
|
12,fsociety
|
||||||
\.
|
\.
|
||||||
ALTER TABLE labs ADD COLUMN motto text;
|
ALTER TABLE labs ADD COLUMN motto text;
|
||||||
|
ABORT;
|
||||||
|
|
||||||
|
-- cannot perform DDL once a connection is used for multiple shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
|
||||||
|
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
|
||||||
|
ALTER TABLE researchers ADD COLUMN motto text;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- multi-shard operations can co-exist with DDL in a transactional way
|
-- multi-shard operations can co-exist with DDL in a transactional way
|
||||||
|
@ -944,3 +950,147 @@ DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts
|
||||||
|
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
||||||
|
|
||||||
|
-- set up foreign keys to test transactions with co-located and reference tables
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.shard_replication_factor TO 1;
|
||||||
|
SET LOCAL citus.shard_count TO 4;
|
||||||
|
|
||||||
|
CREATE TABLE usergroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('usergroups');
|
||||||
|
|
||||||
|
CREATE TABLE itemgroups (
|
||||||
|
gid int PRIMARY KEY,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
SELECT create_reference_table('itemgroups');
|
||||||
|
|
||||||
|
CREATE TABLE users (
|
||||||
|
id int PRIMARY KEY,
|
||||||
|
name text,
|
||||||
|
user_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('users', 'id');
|
||||||
|
|
||||||
|
CREATE TABLE items (
|
||||||
|
user_id int REFERENCES users (id) ON DELETE CASCADE,
|
||||||
|
item_name text,
|
||||||
|
item_group int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('items', 'user_id');
|
||||||
|
|
||||||
|
-- Table to find values that live in different shards on the same node
|
||||||
|
SELECT id, shard_name('users', shardid), nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
JOIN
|
||||||
|
( SELECT id, get_shard_id_for_distribution_column('users', id) shardid FROM generate_series(1,10) id ) ids
|
||||||
|
USING (shardid)
|
||||||
|
ORDER BY
|
||||||
|
id;
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- the INSERTs into items should see the users
|
||||||
|
BEGIN;
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
1,brian,0
|
||||||
|
6,metin,0
|
||||||
|
\.
|
||||||
|
INSERT INTO items VALUES (1, 'item-1');
|
||||||
|
INSERT INTO items VALUES (6, 'item-6');
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT user_id FROM items ORDER BY user_id;
|
||||||
|
|
||||||
|
-- should not be able to open multiple connections per node after INSERTing over one connection
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO users VALUES (2, 'burak');
|
||||||
|
INSERT INTO users VALUES (3, 'burak');
|
||||||
|
\COPY items FROM STDIN WITH CSV
|
||||||
|
2,item-2,0
|
||||||
|
3,item-3,0
|
||||||
|
\.
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- cannot perform DDL after a co-located table has been read over 1 connection
|
||||||
|
BEGIN;
|
||||||
|
SELECT id FROM users WHERE id = 1;
|
||||||
|
SELECT id FROM users WHERE id = 6;
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- but the other way around is fine
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE items ADD COLUMN last_update timestamptz;
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 1;
|
||||||
|
SELECT id FROM users JOIN items ON (id = user_id) WHERE id = 6;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,burak,0
|
||||||
|
3,burak,0
|
||||||
|
\.
|
||||||
|
-- now read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
-- perform a DDL command on the reference table
|
||||||
|
ALTER TABLE itemgroups ADD COLUMN last_update timestamptz;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- establish multiple connections to a node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,burak,0
|
||||||
|
3,burak,0
|
||||||
|
\.
|
||||||
|
-- read from the reference table over each connection
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 2;
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 3;
|
||||||
|
-- perform a DDL command on a co-located reference table
|
||||||
|
ALTER TABLE usergroups ADD COLUMN last_update timestamptz;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- make a modification over connection 1
|
||||||
|
INSERT INTO usergroups VALUES (0,'istanbul');
|
||||||
|
-- copy over connections 1 and 2
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,burak,0
|
||||||
|
3,burak,0
|
||||||
|
\.
|
||||||
|
-- cannot read modifications made over different connections
|
||||||
|
SELECT id FROM users JOIN usergroups ON (gid = user_group) WHERE id = 3;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- make sure we can see cascading deletes
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('DELETE FROM users');
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 1;
|
||||||
|
SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id = 6;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- test visibility after COPY
|
||||||
|
|
||||||
|
INSERT INTO usergroups VALUES (2,'group');
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- opens two separate connections to node
|
||||||
|
\COPY users FROM STDIN WITH CSV
|
||||||
|
2,onder,2
|
||||||
|
4,murat,2
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Uses first connection, which wrote the row with id = 2
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 2;
|
||||||
|
|
||||||
|
-- Should use second connection, which wrote the row with id = 4
|
||||||
|
SELECT * FROM users JOIN usergroups ON (user_group = gid) WHERE id = 4;
|
||||||
|
END;
|
||||||
|
|
||||||
|
DROP TABLE items, users, itemgroups, usergroups;
|
||||||
|
|
|
@ -150,12 +150,6 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
||||||
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
|
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
-- applies to DDL
|
|
||||||
BEGIN;
|
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
|
||||||
ALTER TABLE labs_mx ADD COLUMN motto text;
|
|
||||||
COMMIT;
|
|
||||||
|
|
||||||
-- doesn't apply to COPY after modifications
|
-- doesn't apply to COPY after modifications
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
|
||||||
|
|
|
@ -994,13 +994,13 @@ UPDATE reference_table_test SET value_1 = 10 WHERE value_1 = 2;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SELECT * FROM reference_table_test;
|
SELECT * FROM reference_table_test;
|
||||||
|
|
||||||
-- do not allow mixing transactions
|
-- DML+master_modify_multiple_shards is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- Do not allow DDL and modification in the same transaction
|
-- DDL+DML is allowed
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
|
||||||
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
|
||||||
|
|
Loading…
Reference in New Issue