diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a7858d4c2..bf762cc7e 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -270,6 +270,14 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) int32 nodePort = masterNodeAddress->nodePort; char *nodeUser = CurrentUserName(); + if (XactModificationLevel > XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("distributed copy operations must not appear in " + "transaction blocks containing other distributed " + "modifications"))); + } + masterConnection = ConnectToNode(nodeName, nodePort, nodeUser); PG_TRY(); @@ -363,7 +371,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) ShardInterval **shardIntervalCache = NULL; bool useBinarySearch = false; - HTAB *shardConnectionHash = NULL; + HTAB *copyConnectionHash = NULL; ShardConnections *shardConnections = NULL; List *connectionList = NIL; @@ -465,7 +473,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp * documentation). */ - shardConnectionHash = CreateShardConnectionHash(); + copyConnectionHash = CreateShardConnectionHash(TopTransactionContext); /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ PG_TRY(); @@ -534,9 +542,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) MemoryContextSwitchTo(oldContext); /* get existing connections to the shard placements, if any */ - shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); + shardConnections = GetShardHashConnections(copyConnectionHash, shardId, + &shardConnectionsFound); if (!shardConnectionsFound) { /* open connections and initiate COPY on shard placements */ @@ -560,7 +567,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) processedRowCount += 1; } - connectionList = ConnectionList(shardConnectionHash); + connectionList = ConnectionList(copyConnectionHash); /* send copy binary footers to all shard placements */ if (copyOutState->binary) @@ -590,7 +597,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) List *abortConnectionList = NIL; /* roll back all transactions */ - abortConnectionList = ConnectionList(shardConnectionHash); + abortConnectionList = ConnectionList(copyConnectionHash); EndRemoteCopy(abortConnectionList, false); AbortRemoteTransactions(abortConnectionList); CloseConnections(abortConnectionList); @@ -936,6 +943,14 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections MemoryContextSwitchTo(oldContext); + if (XactModificationLevel > XACT_MODIFICATION_NONE) + { + ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("distributed copy operations must not appear in " + "transaction blocks containing other distributed " + "modifications"))); + } + foreach(placementCell, finalizedPlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 1f7b0f206..d51914acf 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -1255,11 +1255,11 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, { bool executionOK = false; - if (XactModificationLevel > XACT_MODIFICATION_NONE) + if (XactModificationLevel == XACT_MODIFICATION_DATA) { ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("distributed DDL commands must not appear within " - "transaction blocks containing other modifications"))); + "transaction blocks containing data modifications"))); } ShowNoticeIfNotUsing2PC(); @@ -1322,18 +1322,12 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString) { List *shardIntervalList = LoadShardIntervalList(relationId); char *tableOwner = TableOwner(relationId); - HTAB *shardConnectionHash = NULL; ListCell *shardIntervalCell = NULL; Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); - MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); - LockShards(shardIntervalList, ShareLock); - - shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, - tableOwner); - MemoryContextSwitchTo(oldContext); + OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner); foreach(shardIntervalCell, shardIntervalList) { @@ -1345,9 +1339,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString) char *escapedCommandString = quote_literal_cstr(commandString); StringInfo applyCommand = makeStringInfo(); - shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); Assert(shardConnectionsFound); /* build the shard ddl command */ diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 008909e09..fb9b2dd6e 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -214,15 +214,9 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId) { int affectedTupleCount = 0; char *relationOwner = TableOwner(relationId); - HTAB *shardConnectionHash = NULL; ListCell *shardIntervalCell = NULL; - MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); - - shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, - relationOwner); - - MemoryContextSwitchTo(oldContext); + OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner); foreach(shardIntervalCell, shardIntervalList) { @@ -236,9 +230,7 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId) char *shardQueryStringData = NULL; int shardAffectedTupleCount = -1; - shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); Assert(shardConnectionsFound); deparse_shard_query(query, relationId, shardId, shardQueryString); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index bc8abd0b7..b486b590e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -29,6 +29,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" #include "distributed/task_tracker.h" #include "distributed/worker_manager.h" @@ -152,8 +153,9 @@ _PG_init(void) /* initialize worker node manager */ WorkerNodeRegister(); - /* initialize router executor callbacks */ + /* initialize transaction callbacks */ InstallRouterExecutorShmemHook(); + InstallMultiShardXactShmemHook(); } diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 272e4bf31..92b66f786 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -18,72 +18,57 @@ #include "distributed/master_metadata_utility.h" #include "distributed/multi_shard_transaction.h" #include "nodes/pg_list.h" +#include "storage/ipc.h" #include "utils/memutils.h" #define INITIAL_CONNECTION_CACHE_SIZE 1001 -/* Local functions forward declarations */ -static void RegisterShardPlacementXactCallback(void); - - /* Global variables used in commit handler */ -static List *shardPlacementConnectionList = NIL; -static bool isXactCallbackRegistered = false; +static HTAB *shardConnectionHash = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static bool subXactAbortAttempted = false; + +/* functions needed by callbacks and hooks */ +static void RegisterShardPlacementXactCallbacks(void); +static void CompleteShardPlacementTransactions(XactEvent event, void *arg); +static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg); /* - * OpenTransactionsToAllShardPlacements opens connections to all placements of - * the given shard Id Pointer List and returns the hash table containing the connections. - * The resulting hash table maps shardIds to ShardConnection structs. + * OpenTransactionsToAllShardPlacements opens connections to all placements + * using the provided shard identifier list. Connections accumulate in a global + * shardConnectionHash variable for use (and re-use) within this transaction. */ -HTAB * +void OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName) { - HTAB *shardConnectionHash = CreateShardConnectionHash(); ListCell *shardIntervalCell = NULL; - ListCell *connectionCell = NULL; - List *connectionList = NIL; + + if (shardConnectionHash == NULL) + { + shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); + } foreach(shardIntervalCell, shardIntervalList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; - OpenConnectionsToShardPlacements(shardId, shardConnectionHash, userName); + BeginTransactionOnShardPlacements(shardId, userName); } - - connectionList = ConnectionList(shardConnectionHash); - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - PGresult *result = NULL; - - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(connection, result); - } - } - - shardPlacementConnectionList = ConnectionList(shardConnectionHash); - - RegisterShardPlacementXactCallback(); - - return shardConnectionHash; } /* - * CreateShardConnectionHash constructs a hash table used for shardId->Connection - * mapping. + * CreateShardConnectionHash constructs a hash table which maps from shard + * identifier to connection lists, passing the provided MemoryContext to + * hash_create for hash allocations. */ HTAB * -CreateShardConnectionHash(void) +CreateShardConnectionHash(MemoryContext memoryContext) { HTAB *shardConnectionsHash = NULL; int hashFlags = 0; @@ -92,10 +77,9 @@ CreateShardConnectionHash(void) memset(&info, 0, sizeof(info)); info.keysize = sizeof(int64); info.entrysize = sizeof(ShardConnections); - info.hash = tag_hash; - info.hcxt = TopTransactionContext; + info.hcxt = memoryContext; + hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); - hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; shardConnectionsHash = hash_create("Shard Connections Hash", INITIAL_CONNECTION_CACHE_SIZE, &info, hashFlags); @@ -105,80 +89,106 @@ CreateShardConnectionHash(void) /* - * OpenConnectionsToShardPlacements opens connections to all placements of the - * shard with the given shardId and populates the shardConnectionHash table - * accordingly. + * BeginTransactionOnShardPlacements opens new connections (if necessary) to + * all placements of a shard (specified by shard identifier). After sending a + * BEGIN command on all connections, they are added to shardConnectionHash for + * use within this transaction. Exits early if connections already exist for + * the specified shard, and errors if no placements can be found, a connection + * cannot be made, or if the BEGIN command fails. */ void -OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, - char *userName) +BeginTransactionOnShardPlacements(uint64 shardId, char *userName) { + List *shardPlacementList = NIL; + ListCell *placementCell = NULL; + + ShardConnections *shardConnections = NULL; bool shardConnectionsFound = false; - /* get existing connections to the shard placements, if any */ - ShardConnections *shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); - - List *shardPlacementList = FinalizedShardPlacementList(shardId); - ListCell *shardPlacementCell = NULL; - List *connectionList = NIL; - - Assert(!shardConnectionsFound); + MemoryContext oldContext = NULL; + shardPlacementList = FinalizedShardPlacementList(shardId); if (shardPlacementList == NIL) { + /* going to have to have some placements to do any work */ ereport(ERROR, (errmsg("could not find any shard placements for the shard " UINT64_FORMAT, shardId))); } - foreach(shardPlacementCell, shardPlacementList) + /* get existing connections to the shard placements, if any */ + shardConnections = GetShardConnections(shardId, &shardConnectionsFound); + if (shardConnectionsFound) { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst( - shardPlacementCell); - char *workerName = shardPlacement->nodeName; - uint32 workerPort = shardPlacement->nodePort; - PGconn *connection = ConnectToNode(workerName, workerPort, userName); + /* exit early if we've already established shard transactions */ + return; + } + + foreach(placementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); + PGconn *connection = NULL; TransactionConnection *transactionConnection = NULL; + PGresult *result = NULL; + + connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort, + userName); if (connection == NULL) { - List *abortConnectionList = ConnectionList(shardConnectionHash); - CloseConnections(abortConnectionList); - ereport(ERROR, (errmsg("could not establish a connection to all " "placements of shard %lu", shardId))); } + /* entries must last through the whole top-level transaction */ + oldContext = MemoryContextSwitchTo(TopTransactionContext); + transactionConnection = palloc0(sizeof(TransactionConnection)); transactionConnection->connectionId = shardConnections->shardId; transactionConnection->transactionState = TRANSACTION_STATE_INVALID; transactionConnection->connection = connection; - connectionList = lappend(connectionList, transactionConnection); - } + shardConnections->connectionList = lappend(shardConnections->connectionList, + transactionConnection); - shardConnections->connectionList = connectionList; + MemoryContextSwitchTo(oldContext); + + /* now that connection is tracked, issue BEGIN */ + result = PQexec(connection, "BEGIN"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(connection, result); + } + } } /* - * GetShardConnections finds existing connections for a shard in the hash. - * If not found, then a ShardConnections structure with empty connectionList - * is returned. + * GetShardConnections finds existing connections for a shard in the global + * connection hash. If not found, then a ShardConnections structure with empty + * connectionList is returned and the shardConnectionsFound output parameter + * will be set to false. */ ShardConnections * -GetShardConnections(HTAB *shardConnectionHash, int64 shardId, - bool *shardConnectionsFound) +GetShardConnections(int64 shardId, bool *shardConnectionsFound) +{ + return GetShardHashConnections(shardConnectionHash, shardId, shardConnectionsFound); +} + + +/* + * GetShardHashConnections finds existing connections for a shard in the + * provided hash. If not found, then a ShardConnections structure with empty + * connectionList is returned. + */ +ShardConnections * +GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound) { ShardConnections *shardConnections = NULL; - shardConnections = (ShardConnections *) hash_search(shardConnectionHash, - &shardId, - HASH_ENTER, - shardConnectionsFound); - if (!*shardConnectionsFound) + shardConnections = (ShardConnections *) hash_search(connectionHash, &shardId, + HASH_ENTER, connectionsFound); + if (!*connectionsFound) { shardConnections->shardId = shardId; shardConnections->connectionList = NIL; @@ -198,6 +208,11 @@ ConnectionList(HTAB *connectionHash) HASH_SEQ_STATUS status; ShardConnections *shardConnections = NULL; + if (connectionHash == NULL) + { + return NIL; + } + hash_seq_init(&status, connectionHash); shardConnections = (ShardConnections *) hash_seq_search(&status); @@ -214,16 +229,31 @@ ConnectionList(HTAB *connectionHash) /* - * EnableXactCallback ensures the XactCallback for committing/aborting - * remote worker transactions is registered. + * InstallMultiShardXactShmemHook simply installs a hook (intended to be called + * once during backend startup), which will itself register all the transaction + * callbacks needed by multi-shard transaction logic. */ void -RegisterShardPlacementXactCallback(void) +InstallMultiShardXactShmemHook(void) { - if (!isXactCallbackRegistered) + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = RegisterShardPlacementXactCallbacks; +} + + +/* + * RegisterShardPlacementXactCallbacks registers transaction callbacks needed + * for multi-shard transactions before calling previous shmem startup hooks. + */ +static void +RegisterShardPlacementXactCallbacks(void) +{ + RegisterXactCallback(CompleteShardPlacementTransactions, NULL); + RegisterSubXactCallback(MultiShardSubXactCallback, NULL); + + if (prev_shmem_startup_hook != NULL) { - RegisterXactCallback(CompleteShardPlacementTransactions, NULL); - isXactCallbackRegistered = true; + prev_shmem_startup_hook(); } } @@ -232,16 +262,28 @@ RegisterShardPlacementXactCallback(void) * CompleteShardPlacementTransactions commits or aborts pending shard placement * transactions when the local transaction commits or aborts. */ -void +static void CompleteShardPlacementTransactions(XactEvent event, void *arg) { - if (shardPlacementConnectionList == NIL) + List *connectionList = ConnectionList(shardConnectionHash); + + if (shardConnectionHash == NULL) { /* nothing to do */ return; } - else if (event == XACT_EVENT_PRE_COMMIT) + + if (event == XACT_EVENT_PRE_COMMIT) { + if (subXactAbortAttempted) + { + subXactAbortAttempted = false; + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot ROLLBACK TO SAVEPOINT in transactions " + "which modify distributed tables"))); + } + /* * Any failure here will cause local changes to be rolled back, * and remote changes to either roll back (1PC) or, in case of @@ -251,7 +293,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { - PrepareRemoteTransactions(shardPlacementConnectionList); + PrepareRemoteTransactions(connectionList); } return; @@ -265,7 +307,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) * changes have already been committed. */ - CommitRemoteTransactions(shardPlacementConnectionList, false); + CommitRemoteTransactions(connectionList, false); } else if (event == XACT_EVENT_ABORT) { @@ -276,16 +318,28 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg) * already been rolled back. */ - AbortRemoteTransactions(shardPlacementConnectionList); + AbortRemoteTransactions(connectionList); } else { return; } - CloseConnections(shardPlacementConnectionList); - shardPlacementConnectionList = NIL; + CloseConnections(connectionList); + shardConnectionHash = NULL; XactModificationLevel = XACT_MODIFICATION_NONE; + subXactAbortAttempted = false; +} + + +static void +MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, + SubTransactionId parentSubid, void *arg) +{ + if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB)) + { + subXactAbortAttempted = true; + } } diff --git a/src/backend/distributed/utils/connection_cache.c b/src/backend/distributed/utils/connection_cache.c index 22644a890..191a25c4b 100644 --- a/src/backend/distributed/utils/connection_cache.c +++ b/src/backend/distributed/utils/connection_cache.c @@ -377,13 +377,6 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser) sprintf(nodePortString, "%d", nodePort); - if (XactModificationLevel > XACT_MODIFICATION_NONE) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot open new connections after the first modification " - "command within a transaction"))); - } - Assert(sizeof(keywordArray) == sizeof(valueArray)); for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++) diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h index 346de8ca9..df590bd48 100644 --- a/src/include/distributed/commit_protocol.h +++ b/src/include/distributed/commit_protocol.h @@ -54,7 +54,6 @@ extern int MultiShardCommitProtocol; /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); -extern void CompleteShardPlacementTransactions(XactEvent event, void *arg); extern void PrepareRemoteTransactions(List *connectionList); extern void AbortRemoteTransactions(List *connectionList); extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h index 95f257768..4b68378d7 100644 --- a/src/include/distributed/multi_shard_transaction.h +++ b/src/include/distributed/multi_shard_transaction.h @@ -25,16 +25,15 @@ typedef struct ShardConnections } ShardConnections; -extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList, - char *relationOwner); -extern HTAB * CreateShardConnectionHash(void); -extern void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, - char *nodeUser); -extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, - int64 shardId, - bool *shardConnectionsFound); +extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner); +extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext); +extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser); +extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound); +extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, + bool *connectionsFound); extern List * ConnectionList(HTAB *connectionHash); extern void CloseConnections(List *connectionList); +extern void InstallMultiShardXactShmemHook(void); #endif /* MULTI_SHARD_TRANSACTION_H */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 328bc1233..0bef4ff32 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -155,7 +155,7 @@ ABORT; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications +ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications COMMIT; -- whether it occurs first or second BEGIN; @@ -182,7 +182,7 @@ SELECT * FROM labs WHERE id = 6; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); \copy labs from stdin delimiter ',' -ERROR: cannot open new connections after the first modification command within a transaction +ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications CONTEXT: COPY labs, line 1: "10,Weyland-Yutani" COMMIT; -- though it will work if before any modifications @@ -200,7 +200,7 @@ COMMIT; BEGIN; \copy labs from stdin delimiter ',' \copy labs from stdin delimiter ',' -ERROR: cannot open new connections after the first modification command within a transaction +ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications CONTEXT: COPY labs, line 1: "12,fsociety" COMMIT; SELECT name FROM labs WHERE id = 11; @@ -213,7 +213,7 @@ SELECT name FROM labs WHERE id = 11; BEGIN; ALTER TABLE labs ADD COLUMN motto text; \copy labs from stdin delimiter ',' -ERROR: cannot open new connections after the first modification command within a transaction +ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications CONTEXT: COPY labs, line 1: "12,fsociety,lol" COMMIT; -- but the DDL should correctly roll back @@ -233,7 +233,7 @@ SELECT * FROM labs WHERE id = 12; BEGIN; \copy labs from stdin delimiter ',' ALTER TABLE labs ADD COLUMN motto text; -ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications +ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications COMMIT; -- the DDL fails, but copy persists \d labs diff --git a/src/test/regress/input/multi_alter_table_statements.source b/src/test/regress/input/multi_alter_table_statements.source index a50b384fb..614327b0c 100644 --- a/src/test/regress/input/multi_alter_table_statements.source +++ b/src/test/regress/input/multi_alter_table_statements.source @@ -173,14 +173,114 @@ COMMIT; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; DROP INDEX temp_index_2; --- but that multiple ddl statements in a block results in ROLLBACK +-- and so are multiple ddl statements BEGIN; CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); ALTER TABLE lineitem_alter ADD COLUMN first integer; COMMIT; + +\d lineitem_alter + +ALTER TABLE lineitem_alter DROP COLUMN first; +DROP INDEX temp_index_2; + +-- ensure that user-specified rollback causes full rollback +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); +ROLLBACK; + SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; --- and distributed SELECTs cannot appear after ALTER +-- ensure that errors cause full rollback +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +ROLLBACK; + +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + +-- verify that SAVEPOINT is allowed... +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +SAVEPOINT my_savepoint; +CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); +ROLLBACK; + +-- but that actually rolling back to it is not +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +SAVEPOINT my_savepoint; +CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); +ROLLBACK TO my_savepoint; +COMMIT; + +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + +-- Add column on only one worker... +\c - - - :worker_2_port +ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer; +\c - - - :master_port + +-- and try to add it in a multi-statement block, which fails +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +ALTER TABLE lineitem_alter ADD COLUMN first integer; +COMMIT; + +-- Nothing from the block should have committed +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + +-- Create single-shard table (to avoid deadlocks in the upcoming test hackery) +CREATE TABLE single_shard_items (id integer, name text); +SELECT master_create_distributed_table('single_shard_items', 'id', 'hash'); +SELECT master_create_worker_shards('single_shard_items', 1, 2); + +-- Drop the column from the worker... +\c - - - :worker_2_port +ALTER TABLE lineitem_alter_220000 DROP COLUMN first; + +-- Create table to trigger at-xact-end (deferred) failure +CREATE TABLE ddl_commands (command text UNIQUE DEFERRABLE INITIALLY DEFERRED); + +-- Use an event trigger to log all DDL event tags in it +CREATE FUNCTION log_ddl_tag() RETURNS event_trigger AS $ldt$ + BEGIN + INSERT INTO ddl_commands VALUES (tg_tag); + END; +$ldt$ LANGUAGE plpgsql; + +CREATE EVENT TRIGGER log_ddl_tag ON ddl_command_end EXECUTE PROCEDURE log_ddl_tag(); + +\c - - - :master_port +-- The above trigger will cause failure at transaction end on one placement. +-- We'll test 2PC first, as it should handle this "best" (no divergence) +SET citus.multi_shard_commit_protocol TO '2pc'; +BEGIN; +CREATE INDEX single_index_2 ON single_shard_items(id); +CREATE INDEX single_index_3 ON single_shard_items(name); +COMMIT; + +-- Nothing from the block should have committed +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; + +-- Now try with 2pc off +RESET citus.multi_shard_commit_protocol; +BEGIN; +CREATE INDEX single_index_2 ON single_shard_items(id); +CREATE INDEX single_index_3 ON single_shard_items(name); +COMMIT; + +-- The block should have committed with a warning +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; + +\c - - - :worker_2_port +DROP EVENT TRIGGER log_ddl_tag; +DROP FUNCTION log_ddl_tag(); +DROP TABLE ddl_commands; + +\c - - - :master_port +-- Distributed SELECTs cannot appear after ALTER BEGIN; CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); SELECT l_orderkey FROM lineitem_alter LIMIT 0; diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index d00052881..2e74da86d 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -436,20 +436,169 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; (1 row) DROP INDEX temp_index_2; --- but that multiple ddl statements in a block results in ROLLBACK +-- and so are multiple ddl statements BEGIN; CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); ALTER TABLE lineitem_alter ADD COLUMN first integer; -ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications COMMIT; +\d lineitem_alter + Table "public.lineitem_alter" + Column | Type | Modifiers +-----------------+-----------------------+----------- + l_orderkey | bigint | not null + l_partkey | integer | not null + l_suppkey | integer | not null + l_linenumber | integer | not null + l_quantity | numeric(15,2) | not null + l_extendedprice | numeric(15,2) | not null + l_discount | numeric(15,2) | not null + l_tax | numeric(15,2) | not null + l_returnflag | character(1) | not null + l_linestatus | character(1) | not null + l_shipdate | date | not null + l_commitdate | date | not null + l_receiptdate | date | not null + l_shipinstruct | character(25) | not null + l_shipmode | character(10) | not null + l_comment | character varying(44) | not null + null_column | integer | + first | integer | +Indexes: + "temp_index_2" btree (l_orderkey) + +ALTER TABLE lineitem_alter DROP COLUMN first; +DROP INDEX temp_index_2; +-- ensure that user-specified rollback causes full rollback +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); +ROLLBACK; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; indexname | tablename -----------+----------- (0 rows) --- and distributed SELECTs cannot appear after ALTER +-- ensure that errors cause full rollback BEGIN; CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +ERROR: relation "temp_index_2" already exists +ROLLBACK; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +-----------+----------- +(0 rows) + +-- verify that SAVEPOINT is allowed... +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +SAVEPOINT my_savepoint; +CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); +ROLLBACK; +-- but that actually rolling back to it is not +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +SAVEPOINT my_savepoint; +CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey); +ROLLBACK TO my_savepoint; +COMMIT; +ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +-----------+----------- +(0 rows) + +-- Add column on only one worker... +\c - - - :worker_2_port +ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer; +\c - - - :master_port +-- and try to add it in a multi-statement block, which fails +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +ALTER TABLE lineitem_alter ADD COLUMN first integer; +WARNING: column "first" of relation "lineitem_alter_220000" already exists +CONTEXT: while executing command on localhost:57638 +ERROR: could not execute DDL command on worker node shards +COMMIT; +-- Nothing from the block should have committed +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +-----------+----------- +(0 rows) + +-- Create single-shard table (to avoid deadlocks in the upcoming test hackery) +CREATE TABLE single_shard_items (id integer, name text); +SELECT master_create_distributed_table('single_shard_items', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('single_shard_items', 1, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +-- Drop the column from the worker... +\c - - - :worker_2_port +ALTER TABLE lineitem_alter_220000 DROP COLUMN first; +-- Create table to trigger at-xact-end (deferred) failure +CREATE TABLE ddl_commands (command text UNIQUE DEFERRABLE INITIALLY DEFERRED); +-- Use an event trigger to log all DDL event tags in it +CREATE FUNCTION log_ddl_tag() RETURNS event_trigger AS $ldt$ + BEGIN + INSERT INTO ddl_commands VALUES (tg_tag); + END; +$ldt$ LANGUAGE plpgsql; +CREATE EVENT TRIGGER log_ddl_tag ON ddl_command_end EXECUTE PROCEDURE log_ddl_tag(); +\c - - - :master_port +-- The above trigger will cause failure at transaction end on one placement. +-- We'll test 2PC first, as it should handle this "best" (no divergence) +SET citus.multi_shard_commit_protocol TO '2pc'; +BEGIN; +CREATE INDEX single_index_2 ON single_shard_items(id); +CREATE INDEX single_index_3 ON single_shard_items(name); +COMMIT; +WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" +DETAIL: Key (command)=(CREATE INDEX) already exists. +CONTEXT: while executing command on localhost:57638 +ERROR: failed to prepare transaction +-- Nothing from the block should have committed +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; + indexname | tablename +-----------+----------- +(0 rows) + +-- Now try with 2pc off +RESET citus.multi_shard_commit_protocol; +BEGIN; +CREATE INDEX single_index_2 ON single_shard_items(id); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +CREATE INDEX single_index_3 ON single_shard_items(name); +COMMIT; +WARNING: failed to commit transaction on localhost:57638 +-- The block should have committed with a warning +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; + indexname | tablename +----------------+-------------------- + single_index_2 | single_shard_items + single_index_3 | single_shard_items +(2 rows) + +\c - - - :worker_2_port +DROP EVENT TRIGGER log_ddl_tag; +DROP FUNCTION log_ddl_tag(); +DROP TABLE ddl_commands; +\c - - - :master_port +-- Distributed SELECTs cannot appear after ALTER +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT l_orderkey FROM lineitem_alter LIMIT 0; ERROR: cannot open new connections after the first modification command within a transaction COMMIT; @@ -502,7 +651,7 @@ SELECT master_create_worker_shards('test_ab', 8, 2); INSERT INTO test_ab VALUES (2, 10); INSERT INTO test_ab VALUES (2, 11); CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); -WARNING: could not create unique index "temp_unique_index_1_220021" +WARNING: could not create unique index "temp_unique_index_1_220022" DETAIL: Key (a)=(2) is duplicated. CONTEXT: while executing command on localhost:57638 ERROR: could not execute DDL command on worker node shards @@ -550,7 +699,8 @@ ORDER BY attnum; null_column | integer ........pg.dropped.22........ | - ........pg.dropped.23........ | - -(29 rows) + ........pg.dropped.24........ | - +(30 rows) \c - - - :master_port -- verify that we don't intercept DDL commands if propagation is turned off