Merge pull request #764 from citusdata/feature/allow_multi_ddl_xact_block

Permit multiple DDL commands in a transaction

cr: @marcocitus
pull/777/head
Jason Petersen 2016-09-08 22:50:07 -05:00 committed by GitHub
commit 9fd6dafe33
11 changed files with 447 additions and 151 deletions

View File

@ -270,6 +270,14 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
int32 nodePort = masterNodeAddress->nodePort; int32 nodePort = masterNodeAddress->nodePort;
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in "
"transaction blocks containing other distributed "
"modifications")));
}
masterConnection = ConnectToNode(nodeName, nodePort, nodeUser); masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
PG_TRY(); PG_TRY();
@ -363,7 +371,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
ShardInterval **shardIntervalCache = NULL; ShardInterval **shardIntervalCache = NULL;
bool useBinarySearch = false; bool useBinarySearch = false;
HTAB *shardConnectionHash = NULL; HTAB *copyConnectionHash = NULL;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
List *connectionList = NIL; List *connectionList = NIL;
@ -465,7 +473,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
* PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
* documentation). * documentation).
*/ */
shardConnectionHash = CreateShardConnectionHash(); copyConnectionHash = CreateShardConnectionHash(TopTransactionContext);
/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
PG_TRY(); PG_TRY();
@ -534,8 +542,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
/* get existing connections to the shard placements, if any */ /* get existing connections to the shard placements, if any */
shardConnections = GetShardConnections(shardConnectionHash, shardConnections = GetShardHashConnections(copyConnectionHash, shardId,
shardId,
&shardConnectionsFound); &shardConnectionsFound);
if (!shardConnectionsFound) if (!shardConnectionsFound)
{ {
@ -560,7 +567,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
processedRowCount += 1; processedRowCount += 1;
} }
connectionList = ConnectionList(shardConnectionHash); connectionList = ConnectionList(copyConnectionHash);
/* send copy binary footers to all shard placements */ /* send copy binary footers to all shard placements */
if (copyOutState->binary) if (copyOutState->binary)
@ -590,7 +597,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
List *abortConnectionList = NIL; List *abortConnectionList = NIL;
/* roll back all transactions */ /* roll back all transactions */
abortConnectionList = ConnectionList(shardConnectionHash); abortConnectionList = ConnectionList(copyConnectionHash);
EndRemoteCopy(abortConnectionList, false); EndRemoteCopy(abortConnectionList, false);
AbortRemoteTransactions(abortConnectionList); AbortRemoteTransactions(abortConnectionList);
CloseConnections(abortConnectionList); CloseConnections(abortConnectionList);
@ -936,6 +943,14 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in "
"transaction blocks containing other distributed "
"modifications")));
}
foreach(placementCell, finalizedPlacementList) foreach(placementCell, finalizedPlacementList)
{ {
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

View File

@ -1255,11 +1255,11 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
{ {
bool executionOK = false; bool executionOK = false;
if (XactModificationLevel > XACT_MODIFICATION_NONE) if (XactModificationLevel == XACT_MODIFICATION_DATA)
{ {
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed DDL commands must not appear within " errmsg("distributed DDL commands must not appear within "
"transaction blocks containing other modifications"))); "transaction blocks containing data modifications")));
} }
ShowNoticeIfNotUsing2PC(); ShowNoticeIfNotUsing2PC();
@ -1322,18 +1322,12 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
{ {
List *shardIntervalList = LoadShardIntervalList(relationId); List *shardIntervalList = LoadShardIntervalList(relationId);
char *tableOwner = TableOwner(relationId); char *tableOwner = TableOwner(relationId);
HTAB *shardConnectionHash = NULL;
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
LockShards(shardIntervalList, ShareLock); LockShards(shardIntervalList, ShareLock);
OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);
shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
tableOwner);
MemoryContextSwitchTo(oldContext);
foreach(shardIntervalCell, shardIntervalList) foreach(shardIntervalCell, shardIntervalList)
{ {
@ -1345,9 +1339,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
char *escapedCommandString = quote_literal_cstr(commandString); char *escapedCommandString = quote_literal_cstr(commandString);
StringInfo applyCommand = makeStringInfo(); StringInfo applyCommand = makeStringInfo();
shardConnections = GetShardConnections(shardConnectionHash, shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
shardId,
&shardConnectionsFound);
Assert(shardConnectionsFound); Assert(shardConnectionsFound);
/* build the shard ddl command */ /* build the shard ddl command */

View File

@ -214,15 +214,9 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
{ {
int affectedTupleCount = 0; int affectedTupleCount = 0;
char *relationOwner = TableOwner(relationId); char *relationOwner = TableOwner(relationId);
HTAB *shardConnectionHash = NULL;
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
relationOwner);
MemoryContextSwitchTo(oldContext);
foreach(shardIntervalCell, shardIntervalList) foreach(shardIntervalCell, shardIntervalList)
{ {
@ -236,9 +230,7 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
char *shardQueryStringData = NULL; char *shardQueryStringData = NULL;
int shardAffectedTupleCount = -1; int shardAffectedTupleCount = -1;
shardConnections = GetShardConnections(shardConnectionHash, shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
shardId,
&shardConnectionsFound);
Assert(shardConnectionsFound); Assert(shardConnectionsFound);
deparse_shard_query(query, relationId, shardId, shardQueryString); deparse_shard_query(query, relationId, shardId, shardQueryString);

View File

@ -29,6 +29,7 @@
#include "distributed/multi_router_executor.h" #include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" #include "distributed/multi_utility.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -152,8 +153,9 @@ _PG_init(void)
/* initialize worker node manager */ /* initialize worker node manager */
WorkerNodeRegister(); WorkerNodeRegister();
/* initialize router executor callbacks */ /* initialize transaction callbacks */
InstallRouterExecutorShmemHook(); InstallRouterExecutorShmemHook();
InstallMultiShardXactShmemHook();
} }

View File

@ -18,72 +18,57 @@
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "storage/ipc.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#define INITIAL_CONNECTION_CACHE_SIZE 1001 #define INITIAL_CONNECTION_CACHE_SIZE 1001
/* Local functions forward declarations */
static void RegisterShardPlacementXactCallback(void);
/* Global variables used in commit handler */ /* Global variables used in commit handler */
static List *shardPlacementConnectionList = NIL; static HTAB *shardConnectionHash = NULL;
static bool isXactCallbackRegistered = false; static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool subXactAbortAttempted = false;
/* functions needed by callbacks and hooks */
static void RegisterShardPlacementXactCallbacks(void);
static void CompleteShardPlacementTransactions(XactEvent event, void *arg);
static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg);
/* /*
* OpenTransactionsToAllShardPlacements opens connections to all placements of * OpenTransactionsToAllShardPlacements opens connections to all placements
* the given shard Id Pointer List and returns the hash table containing the connections. * using the provided shard identifier list. Connections accumulate in a global
* The resulting hash table maps shardIds to ShardConnection structs. * shardConnectionHash variable for use (and re-use) within this transaction.
*/ */
HTAB * void
OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName) OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
{ {
HTAB *shardConnectionHash = CreateShardConnectionHash();
ListCell *shardIntervalCell = NULL; ListCell *shardIntervalCell = NULL;
ListCell *connectionCell = NULL;
List *connectionList = NIL; if (shardConnectionHash == NULL)
{
shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
}
foreach(shardIntervalCell, shardIntervalList) foreach(shardIntervalCell, shardIntervalList)
{ {
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
OpenConnectionsToShardPlacements(shardId, shardConnectionHash, userName); BeginTransactionOnShardPlacements(shardId, userName);
} }
connectionList = ConnectionList(shardConnectionHash);
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PGresult *result = NULL;
result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(connection, result);
}
}
shardPlacementConnectionList = ConnectionList(shardConnectionHash);
RegisterShardPlacementXactCallback();
return shardConnectionHash;
} }
/* /*
* CreateShardConnectionHash constructs a hash table used for shardId->Connection * CreateShardConnectionHash constructs a hash table which maps from shard
* mapping. * identifier to connection lists, passing the provided MemoryContext to
* hash_create for hash allocations.
*/ */
HTAB * HTAB *
CreateShardConnectionHash(void) CreateShardConnectionHash(MemoryContext memoryContext)
{ {
HTAB *shardConnectionsHash = NULL; HTAB *shardConnectionsHash = NULL;
int hashFlags = 0; int hashFlags = 0;
@ -92,10 +77,9 @@ CreateShardConnectionHash(void)
memset(&info, 0, sizeof(info)); memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64); info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardConnections); info.entrysize = sizeof(ShardConnections);
info.hash = tag_hash; info.hcxt = memoryContext;
info.hcxt = TopTransactionContext; hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
shardConnectionsHash = hash_create("Shard Connections Hash", shardConnectionsHash = hash_create("Shard Connections Hash",
INITIAL_CONNECTION_CACHE_SIZE, &info, INITIAL_CONNECTION_CACHE_SIZE, &info,
hashFlags); hashFlags);
@ -105,80 +89,106 @@ CreateShardConnectionHash(void)
/* /*
* OpenConnectionsToShardPlacements opens connections to all placements of the * BeginTransactionOnShardPlacements opens new connections (if necessary) to
* shard with the given shardId and populates the shardConnectionHash table * all placements of a shard (specified by shard identifier). After sending a
* accordingly. * BEGIN command on all connections, they are added to shardConnectionHash for
* use within this transaction. Exits early if connections already exist for
* the specified shard, and errors if no placements can be found, a connection
* cannot be made, or if the BEGIN command fails.
*/ */
void void
OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
char *userName)
{ {
List *shardPlacementList = NIL;
ListCell *placementCell = NULL;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false; bool shardConnectionsFound = false;
/* get existing connections to the shard placements, if any */ MemoryContext oldContext = NULL;
ShardConnections *shardConnections = GetShardConnections(shardConnectionHash, shardPlacementList = FinalizedShardPlacementList(shardId);
shardId,
&shardConnectionsFound);
List *shardPlacementList = FinalizedShardPlacementList(shardId);
ListCell *shardPlacementCell = NULL;
List *connectionList = NIL;
Assert(!shardConnectionsFound);
if (shardPlacementList == NIL) if (shardPlacementList == NIL)
{ {
/* going to have to have some placements to do any work */
ereport(ERROR, (errmsg("could not find any shard placements for the shard " ereport(ERROR, (errmsg("could not find any shard placements for the shard "
UINT64_FORMAT, shardId))); UINT64_FORMAT, shardId)));
} }
foreach(shardPlacementCell, shardPlacementList) /* get existing connections to the shard placements, if any */
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
if (shardConnectionsFound)
{ {
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst( /* exit early if we've already established shard transactions */
shardPlacementCell); return;
char *workerName = shardPlacement->nodeName; }
uint32 workerPort = shardPlacement->nodePort;
PGconn *connection = ConnectToNode(workerName, workerPort, userName); foreach(placementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
PGconn *connection = NULL;
TransactionConnection *transactionConnection = NULL; TransactionConnection *transactionConnection = NULL;
PGresult *result = NULL;
connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort,
userName);
if (connection == NULL) if (connection == NULL)
{ {
List *abortConnectionList = ConnectionList(shardConnectionHash);
CloseConnections(abortConnectionList);
ereport(ERROR, (errmsg("could not establish a connection to all " ereport(ERROR, (errmsg("could not establish a connection to all "
"placements of shard %lu", shardId))); "placements of shard %lu", shardId)));
} }
/* entries must last through the whole top-level transaction */
oldContext = MemoryContextSwitchTo(TopTransactionContext);
transactionConnection = palloc0(sizeof(TransactionConnection)); transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->connectionId = shardConnections->shardId; transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_INVALID; transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
transactionConnection->connection = connection; transactionConnection->connection = connection;
connectionList = lappend(connectionList, transactionConnection); shardConnections->connectionList = lappend(shardConnections->connectionList,
} transactionConnection);
shardConnections->connectionList = connectionList; MemoryContextSwitchTo(oldContext);
/* now that connection is tracked, issue BEGIN */
result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(connection, result);
}
}
} }
/* /*
* GetShardConnections finds existing connections for a shard in the hash. * GetShardConnections finds existing connections for a shard in the global
* If not found, then a ShardConnections structure with empty connectionList * connection hash. If not found, then a ShardConnections structure with empty
* is returned. * connectionList is returned and the shardConnectionsFound output parameter
* will be set to false.
*/ */
ShardConnections * ShardConnections *
GetShardConnections(HTAB *shardConnectionHash, int64 shardId, GetShardConnections(int64 shardId, bool *shardConnectionsFound)
bool *shardConnectionsFound) {
return GetShardHashConnections(shardConnectionHash, shardId, shardConnectionsFound);
}
/*
* GetShardHashConnections finds existing connections for a shard in the
* provided hash. If not found, then a ShardConnections structure with empty
* connectionList is returned.
*/
ShardConnections *
GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound)
{ {
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
shardConnections = (ShardConnections *) hash_search(shardConnectionHash, shardConnections = (ShardConnections *) hash_search(connectionHash, &shardId,
&shardId, HASH_ENTER, connectionsFound);
HASH_ENTER, if (!*connectionsFound)
shardConnectionsFound);
if (!*shardConnectionsFound)
{ {
shardConnections->shardId = shardId; shardConnections->shardId = shardId;
shardConnections->connectionList = NIL; shardConnections->connectionList = NIL;
@ -198,6 +208,11 @@ ConnectionList(HTAB *connectionHash)
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
if (connectionHash == NULL)
{
return NIL;
}
hash_seq_init(&status, connectionHash); hash_seq_init(&status, connectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&status); shardConnections = (ShardConnections *) hash_seq_search(&status);
@ -214,16 +229,31 @@ ConnectionList(HTAB *connectionHash)
/* /*
* EnableXactCallback ensures the XactCallback for committing/aborting * InstallMultiShardXactShmemHook simply installs a hook (intended to be called
* remote worker transactions is registered. * once during backend startup), which will itself register all the transaction
* callbacks needed by multi-shard transaction logic.
*/ */
void void
RegisterShardPlacementXactCallback(void) InstallMultiShardXactShmemHook(void)
{
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = RegisterShardPlacementXactCallbacks;
}
/*
* RegisterShardPlacementXactCallbacks registers transaction callbacks needed
* for multi-shard transactions before calling previous shmem startup hooks.
*/
static void
RegisterShardPlacementXactCallbacks(void)
{ {
if (!isXactCallbackRegistered)
{
RegisterXactCallback(CompleteShardPlacementTransactions, NULL); RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
isXactCallbackRegistered = true; RegisterSubXactCallback(MultiShardSubXactCallback, NULL);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
} }
} }
@ -232,16 +262,28 @@ RegisterShardPlacementXactCallback(void)
* CompleteShardPlacementTransactions commits or aborts pending shard placement * CompleteShardPlacementTransactions commits or aborts pending shard placement
* transactions when the local transaction commits or aborts. * transactions when the local transaction commits or aborts.
*/ */
void static void
CompleteShardPlacementTransactions(XactEvent event, void *arg) CompleteShardPlacementTransactions(XactEvent event, void *arg)
{ {
if (shardPlacementConnectionList == NIL) List *connectionList = ConnectionList(shardConnectionHash);
if (shardConnectionHash == NULL)
{ {
/* nothing to do */ /* nothing to do */
return; return;
} }
else if (event == XACT_EVENT_PRE_COMMIT)
if (event == XACT_EVENT_PRE_COMMIT)
{ {
if (subXactAbortAttempted)
{
subXactAbortAttempted = false;
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
"which modify distributed tables")));
}
/* /*
* Any failure here will cause local changes to be rolled back, * Any failure here will cause local changes to be rolled back,
* and remote changes to either roll back (1PC) or, in case of * and remote changes to either roll back (1PC) or, in case of
@ -251,7 +293,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{ {
PrepareRemoteTransactions(shardPlacementConnectionList); PrepareRemoteTransactions(connectionList);
} }
return; return;
@ -265,7 +307,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
* changes have already been committed. * changes have already been committed.
*/ */
CommitRemoteTransactions(shardPlacementConnectionList, false); CommitRemoteTransactions(connectionList, false);
} }
else if (event == XACT_EVENT_ABORT) else if (event == XACT_EVENT_ABORT)
{ {
@ -276,16 +318,28 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
* already been rolled back. * already been rolled back.
*/ */
AbortRemoteTransactions(shardPlacementConnectionList); AbortRemoteTransactions(connectionList);
} }
else else
{ {
return; return;
} }
CloseConnections(shardPlacementConnectionList); CloseConnections(connectionList);
shardPlacementConnectionList = NIL; shardConnectionHash = NULL;
XactModificationLevel = XACT_MODIFICATION_NONE; XactModificationLevel = XACT_MODIFICATION_NONE;
subXactAbortAttempted = false;
}
static void
MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg)
{
if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
{
subXactAbortAttempted = true;
}
} }

View File

@ -377,13 +377,6 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
sprintf(nodePortString, "%d", nodePort); sprintf(nodePortString, "%d", nodePort);
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
Assert(sizeof(keywordArray) == sizeof(valueArray)); Assert(sizeof(keywordArray) == sizeof(valueArray));
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++) for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)

View File

@ -54,7 +54,6 @@ extern int MultiShardCommitProtocol;
/* Functions declarations for transaction and connection management */ /* Functions declarations for transaction and connection management */
extern void InitializeDistributedTransaction(void); extern void InitializeDistributedTransaction(void);
extern void CompleteShardPlacementTransactions(XactEvent event, void *arg);
extern void PrepareRemoteTransactions(List *connectionList); extern void PrepareRemoteTransactions(List *connectionList);
extern void AbortRemoteTransactions(List *connectionList); extern void AbortRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);

View File

@ -25,16 +25,15 @@ typedef struct ShardConnections
} ShardConnections; } ShardConnections;
extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList, extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner);
char *relationOwner); extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern HTAB * CreateShardConnectionHash(void); extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser);
extern void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
char *nodeUser); extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, bool *connectionsFound);
int64 shardId,
bool *shardConnectionsFound);
extern List * ConnectionList(HTAB *connectionHash); extern List * ConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList); extern void CloseConnections(List *connectionList);
extern void InstallMultiShardXactShmemHook(void);
#endif /* MULTI_SHARD_TRANSACTION_H */ #endif /* MULTI_SHARD_TRANSACTION_H */

View File

@ -155,7 +155,7 @@ ABORT;
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
ALTER TABLE labs ADD COLUMN motto text; ALTER TABLE labs ADD COLUMN motto text;
ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
COMMIT; COMMIT;
-- whether it occurs first or second -- whether it occurs first or second
BEGIN; BEGIN;
@ -182,7 +182,7 @@ SELECT * FROM labs WHERE id = 6;
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: cannot open new connections after the first modification command within a transaction ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani" CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
COMMIT; COMMIT;
-- though it will work if before any modifications -- though it will work if before any modifications
@ -200,7 +200,7 @@ COMMIT;
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: cannot open new connections after the first modification command within a transaction ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "12,fsociety" CONTEXT: COPY labs, line 1: "12,fsociety"
COMMIT; COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11;
@ -213,7 +213,7 @@ SELECT name FROM labs WHERE id = 11;
BEGIN; BEGIN;
ALTER TABLE labs ADD COLUMN motto text; ALTER TABLE labs ADD COLUMN motto text;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: cannot open new connections after the first modification command within a transaction ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "12,fsociety,lol" CONTEXT: COPY labs, line 1: "12,fsociety,lol"
COMMIT; COMMIT;
-- but the DDL should correctly roll back -- but the DDL should correctly roll back
@ -233,7 +233,7 @@ SELECT * FROM labs WHERE id = 12;
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ALTER TABLE labs ADD COLUMN motto text; ALTER TABLE labs ADD COLUMN motto text;
ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
COMMIT; COMMIT;
-- the DDL fails, but copy persists -- the DDL fails, but copy persists
\d labs \d labs

View File

@ -173,14 +173,114 @@ COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_2; DROP INDEX temp_index_2;
-- but that multiple ddl statements in a block results in ROLLBACK -- and so are multiple ddl statements
BEGIN; BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ALTER TABLE lineitem_alter ADD COLUMN first integer; ALTER TABLE lineitem_alter ADD COLUMN first integer;
COMMIT; COMMIT;
\d lineitem_alter
ALTER TABLE lineitem_alter DROP COLUMN first;
DROP INDEX temp_index_2;
-- ensure that user-specified rollback causes full rollback
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
-- and distributed SELECTs cannot appear after ALTER -- ensure that errors cause full rollback
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ROLLBACK;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
-- verify that SAVEPOINT is allowed...
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK;
-- but that actually rolling back to it is not
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK TO my_savepoint;
COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
-- Add column on only one worker...
\c - - - :worker_2_port
ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer;
\c - - - :master_port
-- and try to add it in a multi-statement block, which fails
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ALTER TABLE lineitem_alter ADD COLUMN first integer;
COMMIT;
-- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
CREATE TABLE single_shard_items (id integer, name text);
SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
SELECT master_create_worker_shards('single_shard_items', 1, 2);
-- Drop the column from the worker...
\c - - - :worker_2_port
ALTER TABLE lineitem_alter_220000 DROP COLUMN first;
-- Create table to trigger at-xact-end (deferred) failure
CREATE TABLE ddl_commands (command text UNIQUE DEFERRABLE INITIALLY DEFERRED);
-- Use an event trigger to log all DDL event tags in it
CREATE FUNCTION log_ddl_tag() RETURNS event_trigger AS $ldt$
BEGIN
INSERT INTO ddl_commands VALUES (tg_tag);
END;
$ldt$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER log_ddl_tag ON ddl_command_end EXECUTE PROCEDURE log_ddl_tag();
\c - - - :master_port
-- The above trigger will cause failure at transaction end on one placement.
-- We'll test 2PC first, as it should handle this "best" (no divergence)
SET citus.multi_shard_commit_protocol TO '2pc';
BEGIN;
CREATE INDEX single_index_2 ON single_shard_items(id);
CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT;
-- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
-- Now try with 2pc off
RESET citus.multi_shard_commit_protocol;
BEGIN;
CREATE INDEX single_index_2 ON single_shard_items(id);
CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT;
-- The block should have committed with a warning
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
\c - - - :worker_2_port
DROP EVENT TRIGGER log_ddl_tag;
DROP FUNCTION log_ddl_tag();
DROP TABLE ddl_commands;
\c - - - :master_port
-- Distributed SELECTs cannot appear after ALTER
BEGIN; BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SELECT l_orderkey FROM lineitem_alter LIMIT 0; SELECT l_orderkey FROM lineitem_alter LIMIT 0;

View File

@ -436,20 +436,169 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
(1 row) (1 row)
DROP INDEX temp_index_2; DROP INDEX temp_index_2;
-- but that multiple ddl statements in a block results in ROLLBACK -- and so are multiple ddl statements
BEGIN; BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ALTER TABLE lineitem_alter ADD COLUMN first integer; ALTER TABLE lineitem_alter ADD COLUMN first integer;
ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
COMMIT; COMMIT;
\d lineitem_alter
Table "public.lineitem_alter"
Column | Type | Modifiers
-----------------+-----------------------+-----------
l_orderkey | bigint | not null
l_partkey | integer | not null
l_suppkey | integer | not null
l_linenumber | integer | not null
l_quantity | numeric(15,2) | not null
l_extendedprice | numeric(15,2) | not null
l_discount | numeric(15,2) | not null
l_tax | numeric(15,2) | not null
l_returnflag | character(1) | not null
l_linestatus | character(1) | not null
l_shipdate | date | not null
l_commitdate | date | not null
l_receiptdate | date | not null
l_shipinstruct | character(25) | not null
l_shipmode | character(10) | not null
l_comment | character varying(44) | not null
null_column | integer |
first | integer |
Indexes:
"temp_index_2" btree (l_orderkey)
ALTER TABLE lineitem_alter DROP COLUMN first;
DROP INDEX temp_index_2;
-- ensure that user-specified rollback causes full rollback
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename indexname | tablename
-----------+----------- -----------+-----------
(0 rows) (0 rows)
-- and distributed SELECTs cannot appear after ALTER -- ensure that errors cause full rollback
BEGIN; BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ERROR: relation "temp_index_2" already exists
ROLLBACK;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
-----------+-----------
(0 rows)
-- verify that SAVEPOINT is allowed...
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK;
-- but that actually rolling back to it is not
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
SAVEPOINT my_savepoint;
CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
ROLLBACK TO my_savepoint;
COMMIT;
ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
-----------+-----------
(0 rows)
-- Add column on only one worker...
\c - - - :worker_2_port
ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer;
\c - - - :master_port
-- and try to add it in a multi-statement block, which fails
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
ALTER TABLE lineitem_alter ADD COLUMN first integer;
WARNING: column "first" of relation "lineitem_alter_220000" already exists
CONTEXT: while executing command on localhost:57638
ERROR: could not execute DDL command on worker node shards
COMMIT;
-- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
-----------+-----------
(0 rows)
-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
CREATE TABLE single_shard_items (id integer, name text);
SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('single_shard_items', 1, 2);
master_create_worker_shards
-----------------------------
(1 row)
-- Drop the column from the worker...
\c - - - :worker_2_port
ALTER TABLE lineitem_alter_220000 DROP COLUMN first;
-- Create table to trigger at-xact-end (deferred) failure
CREATE TABLE ddl_commands (command text UNIQUE DEFERRABLE INITIALLY DEFERRED);
-- Use an event trigger to log all DDL event tags in it
CREATE FUNCTION log_ddl_tag() RETURNS event_trigger AS $ldt$
BEGIN
INSERT INTO ddl_commands VALUES (tg_tag);
END;
$ldt$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER log_ddl_tag ON ddl_command_end EXECUTE PROCEDURE log_ddl_tag();
\c - - - :master_port
-- The above trigger will cause failure at transaction end on one placement.
-- We'll test 2PC first, as it should handle this "best" (no divergence)
SET citus.multi_shard_commit_protocol TO '2pc';
BEGIN;
CREATE INDEX single_index_2 ON single_shard_items(id);
CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT;
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
DETAIL: Key (command)=(CREATE INDEX) already exists.
CONTEXT: while executing command on localhost:57638
ERROR: failed to prepare transaction
-- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
indexname | tablename
-----------+-----------
(0 rows)
-- Now try with 2pc off
RESET citus.multi_shard_commit_protocol;
BEGIN;
CREATE INDEX single_index_2 ON single_shard_items(id);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT;
WARNING: failed to commit transaction on localhost:57638
-- The block should have committed with a warning
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
indexname | tablename
----------------+--------------------
single_index_2 | single_shard_items
single_index_3 | single_shard_items
(2 rows)
\c - - - :worker_2_port
DROP EVENT TRIGGER log_ddl_tag;
DROP FUNCTION log_ddl_tag();
DROP TABLE ddl_commands;
\c - - - :master_port
-- Distributed SELECTs cannot appear after ALTER
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
SELECT l_orderkey FROM lineitem_alter LIMIT 0; SELECT l_orderkey FROM lineitem_alter LIMIT 0;
ERROR: cannot open new connections after the first modification command within a transaction ERROR: cannot open new connections after the first modification command within a transaction
COMMIT; COMMIT;
@ -502,7 +651,7 @@ SELECT master_create_worker_shards('test_ab', 8, 2);
INSERT INTO test_ab VALUES (2, 10); INSERT INTO test_ab VALUES (2, 10);
INSERT INTO test_ab VALUES (2, 11); INSERT INTO test_ab VALUES (2, 11);
CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
WARNING: could not create unique index "temp_unique_index_1_220021" WARNING: could not create unique index "temp_unique_index_1_220022"
DETAIL: Key (a)=(2) is duplicated. DETAIL: Key (a)=(2) is duplicated.
CONTEXT: while executing command on localhost:57638 CONTEXT: while executing command on localhost:57638
ERROR: could not execute DDL command on worker node shards ERROR: could not execute DDL command on worker node shards
@ -550,7 +699,8 @@ ORDER BY attnum;
null_column | integer null_column | integer
........pg.dropped.22........ | - ........pg.dropped.22........ | -
........pg.dropped.23........ | - ........pg.dropped.23........ | -
(29 rows) ........pg.dropped.24........ | -
(30 rows)
\c - - - :master_port \c - - - :master_port
-- verify that we don't intercept DDL commands if propagation is turned off -- verify that we don't intercept DDL commands if propagation is turned off