mirror of https://github.com/citusdata/citus.git
Merge pull request #707 from citusdata/feature/allow_single_ddl_xact_block
Permit single DDL commands in transaction blocks cr: @marcocituspull/762/head
commit
7168fdf62e
|
@ -233,6 +233,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
errmsg("unsupported partition method")));
|
||||
}
|
||||
}
|
||||
|
||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -94,7 +94,7 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
|
|||
char *effectiveDatabaseName = NULL;
|
||||
char *effectiveUserName = NULL;
|
||||
|
||||
if (IsModifyingTransaction)
|
||||
if (XactModificationLevel > XACT_MODIFICATION_NONE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot open new connections after the first modification "
|
||||
|
@ -181,7 +181,7 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
|
|||
return connectionId;
|
||||
}
|
||||
|
||||
if (IsModifyingTransaction)
|
||||
if (XactModificationLevel > XACT_MODIFICATION_NONE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot open new connections after the first modification "
|
||||
|
|
|
@ -138,6 +138,14 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
|
|||
{
|
||||
eflags |= EXEC_FLAG_SKIP_TRIGGERS;
|
||||
|
||||
if (XactModificationLevel == XACT_MODIFICATION_SCHEMA)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("distributed data modifications must not appear in "
|
||||
"transaction blocks which contain distributed DDL "
|
||||
"commands")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We could naturally handle function-based transactions (i.e. those
|
||||
* using PL/pgSQL or similar) by checking the type of queryDesc->dest,
|
||||
|
@ -228,7 +236,7 @@ InitTransactionStateForTask(Task *task)
|
|||
participantEntry->connection = connection;
|
||||
}
|
||||
|
||||
IsModifyingTransaction = true;
|
||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1178,7 +1186,7 @@ RegisterRouterExecutorXactCallbacks(void)
|
|||
static void
|
||||
RouterTransactionCallback(XactEvent event, void *arg)
|
||||
{
|
||||
if (xactParticipantHash == NULL)
|
||||
if (XactModificationLevel != XACT_MODIFICATION_DATA)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -1235,7 +1243,7 @@ RouterTransactionCallback(XactEvent event, void *arg)
|
|||
}
|
||||
|
||||
/* reset transaction state */
|
||||
IsModifyingTransaction = false;
|
||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
xactParticipantHash = NULL;
|
||||
xactShardConnSetList = NIL;
|
||||
subXactAbortAttempted = false;
|
||||
|
@ -1275,6 +1283,11 @@ ExecuteTransactionEnd(bool commit)
|
|||
NodeConnectionEntry *participant;
|
||||
bool completed = !commit; /* aborts are assumed completed */
|
||||
|
||||
if (xactParticipantHash == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
hash_seq_init(&scan, xactParticipantHash);
|
||||
while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan)))
|
||||
{
|
||||
|
|
|
@ -1255,7 +1255,13 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
|
|||
{
|
||||
bool executionOK = false;
|
||||
|
||||
PreventTransactionChain(isTopLevel, "distributed DDL commands");
|
||||
if (XactModificationLevel > XACT_MODIFICATION_NONE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("distributed DDL commands must not appear within "
|
||||
"transaction blocks containing other modifications")));
|
||||
}
|
||||
|
||||
ShowNoticeIfNotUsing2PC();
|
||||
|
||||
executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString);
|
||||
|
@ -1265,6 +1271,8 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
|
|||
{
|
||||
ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
|
||||
}
|
||||
|
||||
XactModificationLevel = XACT_MODIFICATION_SCHEMA;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -283,6 +283,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
|
|||
|
||||
CloseConnections(shardPlacementConnectionList);
|
||||
shardPlacementConnectionList = NIL;
|
||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -30,8 +30,8 @@
|
|||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
/* state needed to prevent new connections during modifying transactions */
|
||||
bool IsModifyingTransaction = false;
|
||||
/* state needed to keep track of operations used during a transaction */
|
||||
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||
|
||||
/*
|
||||
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
|
||||
|
@ -377,7 +377,7 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
|
|||
|
||||
sprintf(nodePortString, "%d", nodePort);
|
||||
|
||||
if (IsModifyingTransaction)
|
||||
if (XactModificationLevel > XACT_MODIFICATION_NONE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||
errmsg("cannot open new connections after the first modification "
|
||||
|
|
|
@ -32,7 +32,6 @@
|
|||
/* SQL statement for testing */
|
||||
#define TEST_SQL "DO $$ BEGIN RAISE EXCEPTION 'Raised remotely!'; END $$"
|
||||
|
||||
|
||||
/*
|
||||
* NodeConnectionKey acts as the key to index into the (process-local) hash
|
||||
* keeping track of open connections. Node name and port are sufficient.
|
||||
|
@ -53,8 +52,18 @@ typedef struct NodeConnectionEntry
|
|||
} NodeConnectionEntry;
|
||||
|
||||
|
||||
/* describes what kind of modifications have occurred in the current transaction */
|
||||
typedef enum
|
||||
{
|
||||
XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */
|
||||
XACT_MODIFICATION_NONE, /* no modifications have taken place */
|
||||
XACT_MODIFICATION_DATA, /* data modifications (DML) have occurred */
|
||||
XACT_MODIFICATION_SCHEMA /* schema modifications (DDL) have occurred */
|
||||
} XactModificationType;
|
||||
|
||||
|
||||
/* state needed to prevent new connections during modifying transactions */
|
||||
extern bool IsModifyingTransaction;
|
||||
extern XactModificationType XactModificationLevel;
|
||||
|
||||
|
||||
/* function declarations for obtaining and using a connection */
|
||||
|
|
|
@ -100,6 +100,7 @@ SELECT name FROM researchers WHERE lab_id = 4;
|
|||
BEGIN;
|
||||
DO $$
|
||||
BEGIN
|
||||
INSERT INTO researchers VALUES (11, 11, 'Whitfield Diffie');
|
||||
INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra');
|
||||
EXCEPTION
|
||||
WHEN not_null_violation THEN
|
||||
|
@ -150,19 +151,41 @@ SELECT count(*) FROM researchers WHERE lab_id = 6;
|
|||
ERROR: no transaction participant matches localhost:57638
|
||||
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
|
||||
ABORT;
|
||||
-- applies to DDL or COPY, too
|
||||
-- applies to DDL, too
|
||||
BEGIN;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
ALTER TABLE labs ADD COLUMN text motto;
|
||||
ERROR: distributed DDL commands cannot run inside a transaction block
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
|
||||
COMMIT;
|
||||
-- whether it occurs first or second
|
||||
BEGIN;
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
NOTICE: using one-phase commit for distributed DDL commands
|
||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands
|
||||
COMMIT;
|
||||
-- but the DDL should correctly roll back
|
||||
\d labs
|
||||
Table "public.labs"
|
||||
Column | Type | Modifiers
|
||||
--------+--------+-----------
|
||||
id | bigint | not null
|
||||
name | text | not null
|
||||
|
||||
SELECT * FROM labs WHERE id = 6;
|
||||
id | name
|
||||
----+------
|
||||
(0 rows)
|
||||
|
||||
-- COPY can't happen second,
|
||||
BEGIN;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
\copy labs from stdin delimiter ','
|
||||
ERROR: cannot open new connections after the first modification command within a transaction
|
||||
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
|
||||
COMMIT;
|
||||
-- though the copy will work if before any modifications
|
||||
-- though it will work if before any modifications
|
||||
BEGIN;
|
||||
\copy labs from stdin delimiter ','
|
||||
SELECT name FROM labs WHERE id = 10;
|
||||
|
@ -173,6 +196,59 @@ SELECT name FROM labs WHERE id = 10;
|
|||
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
COMMIT;
|
||||
-- but a double-copy isn't allowed (the first will persist)
|
||||
BEGIN;
|
||||
\copy labs from stdin delimiter ','
|
||||
\copy labs from stdin delimiter ','
|
||||
ERROR: cannot open new connections after the first modification command within a transaction
|
||||
CONTEXT: COPY labs, line 1: "12,fsociety"
|
||||
COMMIT;
|
||||
SELECT name FROM labs WHERE id = 11;
|
||||
name
|
||||
----------------
|
||||
Planet Express
|
||||
(1 row)
|
||||
|
||||
-- finally, ALTER and copy aren't compatible
|
||||
BEGIN;
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
\copy labs from stdin delimiter ','
|
||||
ERROR: cannot open new connections after the first modification command within a transaction
|
||||
CONTEXT: COPY labs, line 1: "12,fsociety,lol"
|
||||
COMMIT;
|
||||
-- but the DDL should correctly roll back
|
||||
\d labs
|
||||
Table "public.labs"
|
||||
Column | Type | Modifiers
|
||||
--------+--------+-----------
|
||||
id | bigint | not null
|
||||
name | text | not null
|
||||
|
||||
SELECT * FROM labs WHERE id = 12;
|
||||
id | name
|
||||
----+------
|
||||
(0 rows)
|
||||
|
||||
-- and if the copy is before the ALTER...
|
||||
BEGIN;
|
||||
\copy labs from stdin delimiter ','
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
|
||||
COMMIT;
|
||||
-- the DDL fails, but copy persists
|
||||
\d labs
|
||||
Table "public.labs"
|
||||
Column | Type | Modifiers
|
||||
--------+--------+-----------
|
||||
id | bigint | not null
|
||||
name | text | not null
|
||||
|
||||
SELECT * FROM labs WHERE id = 12;
|
||||
id | name
|
||||
----+----------
|
||||
12 | fsociety
|
||||
(1 row)
|
||||
|
||||
-- now, for some special failures...
|
||||
CREATE TABLE objects (
|
||||
id bigint PRIMARY KEY,
|
||||
|
|
|
@ -165,7 +165,7 @@ COMMIT;
|
|||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
DROP INDEX temp_index_1;
|
||||
|
||||
-- verify that distributed ddl commands are not allowed inside a transaction block
|
||||
-- verify that single distributed ddl commands are allowed inside a transaction block
|
||||
SET citus.enable_ddl_propagation to true;
|
||||
BEGIN;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
|
@ -173,6 +173,27 @@ COMMIT;
|
|||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
DROP INDEX temp_index_2;
|
||||
|
||||
-- but that multiple ddl statements in a block results in ROLLBACK
|
||||
BEGIN;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
ALTER TABLE lineitem_alter ADD COLUMN first integer;
|
||||
COMMIT;
|
||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
|
||||
-- and distributed SELECTs cannot appear after ALTER
|
||||
BEGIN;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
SELECT l_orderkey FROM lineitem_alter LIMIT 0;
|
||||
COMMIT;
|
||||
|
||||
-- but are allowed before
|
||||
BEGIN;
|
||||
SELECT l_orderkey FROM lineitem_alter LIMIT 0;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
COMMIT;
|
||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
DROP INDEX temp_index_2;
|
||||
|
||||
--- verify that distributed ddl commands can be used with 2pc
|
||||
SET citus.multi_shard_commit_protocol TO '2pc';
|
||||
CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey);
|
||||
|
|
|
@ -424,19 +424,51 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
|||
(1 row)
|
||||
|
||||
DROP INDEX temp_index_1;
|
||||
-- verify that distributed ddl commands are not allowed inside a transaction block
|
||||
-- verify that single distributed ddl commands are allowed inside a transaction block
|
||||
SET citus.enable_ddl_propagation to true;
|
||||
BEGIN;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
ERROR: distributed DDL commands cannot run inside a transaction block
|
||||
COMMIT;
|
||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
indexname | tablename
|
||||
--------------+----------------
|
||||
temp_index_2 | lineitem_alter
|
||||
(1 row)
|
||||
|
||||
DROP INDEX temp_index_2;
|
||||
-- but that multiple ddl statements in a block results in ROLLBACK
|
||||
BEGIN;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
ALTER TABLE lineitem_alter ADD COLUMN first integer;
|
||||
ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
|
||||
COMMIT;
|
||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
indexname | tablename
|
||||
-----------+-----------
|
||||
(0 rows)
|
||||
|
||||
-- and distributed SELECTs cannot appear after ALTER
|
||||
BEGIN;
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
SELECT l_orderkey FROM lineitem_alter LIMIT 0;
|
||||
ERROR: cannot open new connections after the first modification command within a transaction
|
||||
COMMIT;
|
||||
-- but are allowed before
|
||||
BEGIN;
|
||||
SELECT l_orderkey FROM lineitem_alter LIMIT 0;
|
||||
l_orderkey
|
||||
------------
|
||||
(0 rows)
|
||||
|
||||
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
|
||||
COMMIT;
|
||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||
indexname | tablename
|
||||
--------------+----------------
|
||||
temp_index_2 | lineitem_alter
|
||||
(1 row)
|
||||
|
||||
DROP INDEX temp_index_2;
|
||||
ERROR: index "temp_index_2" does not exist
|
||||
--- verify that distributed ddl commands can be used with 2pc
|
||||
SET citus.multi_shard_commit_protocol TO '2pc';
|
||||
CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey);
|
||||
|
|
|
@ -78,6 +78,7 @@ SELECT name FROM researchers WHERE lab_id = 4;
|
|||
BEGIN;
|
||||
DO $$
|
||||
BEGIN
|
||||
INSERT INTO researchers VALUES (11, 11, 'Whitfield Diffie');
|
||||
INSERT INTO researchers VALUES (NULL, 10, 'Edsger Dijkstra');
|
||||
EXCEPTION
|
||||
WHEN not_null_violation THEN
|
||||
|
@ -122,12 +123,23 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
|
|||
SELECT count(*) FROM researchers WHERE lab_id = 6;
|
||||
ABORT;
|
||||
|
||||
-- applies to DDL or COPY, too
|
||||
-- applies to DDL, too
|
||||
BEGIN;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
ALTER TABLE labs ADD COLUMN text motto;
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
COMMIT;
|
||||
|
||||
-- whether it occurs first or second
|
||||
BEGIN;
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
COMMIT;
|
||||
|
||||
-- but the DDL should correctly roll back
|
||||
\d labs
|
||||
SELECT * FROM labs WHERE id = 6;
|
||||
|
||||
-- COPY can't happen second,
|
||||
BEGIN;
|
||||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
\copy labs from stdin delimiter ','
|
||||
|
@ -135,7 +147,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
|
|||
\.
|
||||
COMMIT;
|
||||
|
||||
-- though the copy will work if before any modifications
|
||||
-- though it will work if before any modifications
|
||||
BEGIN;
|
||||
\copy labs from stdin delimiter ','
|
||||
10,Weyland-Yutani
|
||||
|
@ -144,6 +156,42 @@ SELECT name FROM labs WHERE id = 10;
|
|||
INSERT INTO labs VALUES (6, 'Bell Labs');
|
||||
COMMIT;
|
||||
|
||||
-- but a double-copy isn't allowed (the first will persist)
|
||||
BEGIN;
|
||||
\copy labs from stdin delimiter ','
|
||||
11,Planet Express
|
||||
\.
|
||||
\copy labs from stdin delimiter ','
|
||||
12,fsociety
|
||||
\.
|
||||
COMMIT;
|
||||
|
||||
SELECT name FROM labs WHERE id = 11;
|
||||
|
||||
-- finally, ALTER and copy aren't compatible
|
||||
BEGIN;
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
\copy labs from stdin delimiter ','
|
||||
12,fsociety,lol
|
||||
\.
|
||||
COMMIT;
|
||||
|
||||
-- but the DDL should correctly roll back
|
||||
\d labs
|
||||
SELECT * FROM labs WHERE id = 12;
|
||||
|
||||
-- and if the copy is before the ALTER...
|
||||
BEGIN;
|
||||
\copy labs from stdin delimiter ','
|
||||
12,fsociety
|
||||
\.
|
||||
ALTER TABLE labs ADD COLUMN motto text;
|
||||
COMMIT;
|
||||
|
||||
-- the DDL fails, but copy persists
|
||||
\d labs
|
||||
SELECT * FROM labs WHERE id = 12;
|
||||
|
||||
-- now, for some special failures...
|
||||
CREATE TABLE objects (
|
||||
id bigint PRIMARY KEY,
|
||||
|
|
Loading…
Reference in New Issue