mirror of https://github.com/citusdata/citus.git
Permit multiple DDL commands in a transaction
Three changes here to get to true multi-statement, multi-relation DDL transactions (same functionality as pre-5.2, now with the benefits of atomicity):

1. Changed the multi-shard utility hook to always run (for consistency with the router executor hook; removes the ad-hoc "installed" boolean)
2. Changed the global connection list in multi_shard_transaction to a hash; updated related functions to operate on the global hash instead of a local hash/global list
3. Removed the check within the DDL code that prevented subsequent DDL commands; placed an unset/reset guard around the call to ConnectToNode to permit connecting to additional nodes after a DDL transaction has begun

In addition, code has been added to raise an error if a ROLLBACK TO SAVEPOINT is attempted (similar to the router executor), and comprehensive tests exercise all multi-DDL scenarios: full success, user ROLLBACK, actual errors (e.g. a duplicate index), partial failure (a duplicate index on one node but not others), partial COMMIT (one node fails), and 2PC partial PREPARE (one node fails). Interleavings with other commands (DML, \copy) are likewise all covered.
parent 8b3286b1f5
commit 74f4e0003b
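For orientation before the diff: a minimal sketch of the newly permitted behavior, using the lineitem_alter table exercised by the regression tests below. Multiple distributed DDL commands may now share one transaction block and commit or roll back atomically; plain SAVEPOINT is accepted, but ROLLBACK TO SAVEPOINT raises an error in transactions that modify distributed tables.

BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ALTER TABLE lineitem_alter ADD COLUMN first integer;
COMMIT;  -- both statements apply atomically (or neither does)

-- optional: two-phase commit for extra safety at COMMIT time
SET citus.multi_shard_commit_protocol TO '2pc';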
@@ -270,6 +270,14 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
 	int32 nodePort = masterNodeAddress->nodePort;
 	char *nodeUser = CurrentUserName();
 
+	if (XactModificationLevel > XACT_MODIFICATION_NONE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("distributed copy operations must not appear in "
+							   "transaction blocks containing other distributed "
+							   "modifications")));
+	}
+
 	masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
 
 	PG_TRY();
@@ -363,7 +371,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 	ShardInterval **shardIntervalCache = NULL;
 	bool useBinarySearch = false;
 
-	HTAB *shardConnectionHash = NULL;
+	HTAB *copyConnectionHash = NULL;
 	ShardConnections *shardConnections = NULL;
 	List *connectionList = NIL;
 
@@ -465,7 +473,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 	 * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
 	 * documentation).
 	 */
-	shardConnectionHash = CreateShardConnectionHash();
+	copyConnectionHash = CreateShardConnectionHash(TopTransactionContext);
 
 	/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
 	PG_TRY();
@@ -534,9 +542,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 			MemoryContextSwitchTo(oldContext);
 
 			/* get existing connections to the shard placements, if any */
-			shardConnections = GetShardConnections(shardConnectionHash,
-												   shardId,
-												   &shardConnectionsFound);
+			shardConnections = GetShardHashConnections(copyConnectionHash, shardId,
+													   &shardConnectionsFound);
 			if (!shardConnectionsFound)
 			{
 				/* open connections and initiate COPY on shard placements */
@@ -560,7 +567,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 			processedRowCount += 1;
 		}
 
-		connectionList = ConnectionList(shardConnectionHash);
+		connectionList = ConnectionList(copyConnectionHash);
 
 		/* send copy binary footers to all shard placements */
 		if (copyOutState->binary)
@@ -590,7 +597,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 		List *abortConnectionList = NIL;
 
 		/* roll back all transactions */
-		abortConnectionList = ConnectionList(shardConnectionHash);
+		abortConnectionList = ConnectionList(copyConnectionHash);
 		EndRemoteCopy(abortConnectionList, false);
 		AbortRemoteTransactions(abortConnectionList);
 		CloseConnections(abortConnectionList);
@@ -936,6 +943,14 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 
 	MemoryContextSwitchTo(oldContext);
 
+	if (XactModificationLevel > XACT_MODIFICATION_NONE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("distributed copy operations must not appear in "
+							   "transaction blocks containing other distributed "
+							   "modifications")));
+	}
+
 	foreach(placementCell, finalizedPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
@@ -1255,11 +1255,11 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 {
 	bool executionOK = false;
 
-	if (XactModificationLevel > XACT_MODIFICATION_NONE)
+	if (XactModificationLevel == XACT_MODIFICATION_DATA)
 	{
 		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
 						errmsg("distributed DDL commands must not appear within "
-							   "transaction blocks containing other modifications")));
+							   "transaction blocks containing data modifications")));
 	}
 
 	ShowNoticeIfNotUsing2PC();
@@ -1322,18 +1322,12 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
 {
 	List *shardIntervalList = LoadShardIntervalList(relationId);
 	char *tableOwner = TableOwner(relationId);
-	HTAB *shardConnectionHash = NULL;
 	ListCell *shardIntervalCell = NULL;
 	Oid schemaId = get_rel_namespace(relationId);
 	char *schemaName = get_namespace_name(schemaId);
 
-	MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
-
 	LockShards(shardIntervalList, ShareLock);
 
-	shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
-															   tableOwner);
-	MemoryContextSwitchTo(oldContext);
+	OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);
 
 	foreach(shardIntervalCell, shardIntervalList)
 	{
@@ -1345,9 +1339,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
 		char *escapedCommandString = quote_literal_cstr(commandString);
 		StringInfo applyCommand = makeStringInfo();
 
-		shardConnections = GetShardConnections(shardConnectionHash,
-											   shardId,
-											   &shardConnectionsFound);
+		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
 		Assert(shardConnectionsFound);
 
 		/* build the shard ddl command */
@@ -214,15 +214,9 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
 {
 	int affectedTupleCount = 0;
 	char *relationOwner = TableOwner(relationId);
-	HTAB *shardConnectionHash = NULL;
 	ListCell *shardIntervalCell = NULL;
 
-	MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
-
-	shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
-															   relationOwner);
-
-	MemoryContextSwitchTo(oldContext);
+	OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
 
 	foreach(shardIntervalCell, shardIntervalList)
 	{
@@ -236,9 +230,7 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
 		char *shardQueryStringData = NULL;
 		int shardAffectedTupleCount = -1;
 
-		shardConnections = GetShardConnections(shardConnectionHash,
-											   shardId,
-											   &shardConnectionsFound);
+		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
 		Assert(shardConnectionsFound);
 
 		deparse_shard_query(query, relationId, shardId, shardQueryString);
@@ -29,6 +29,7 @@
 #include "distributed/multi_router_executor.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_server_executor.h"
+#include "distributed/multi_shard_transaction.h"
 #include "distributed/multi_utility.h"
 #include "distributed/task_tracker.h"
 #include "distributed/worker_manager.h"
@@ -152,8 +153,9 @@ _PG_init(void)
 	/* initialize worker node manager */
 	WorkerNodeRegister();
 
-	/* initialize router executor callbacks */
+	/* initialize transaction callbacks */
 	InstallRouterExecutorShmemHook();
+	InstallMultiShardXactShmemHook();
 }
 
 
@@ -18,72 +18,57 @@
 #include "distributed/master_metadata_utility.h"
 #include "distributed/multi_shard_transaction.h"
 #include "nodes/pg_list.h"
+#include "storage/ipc.h"
 #include "utils/memutils.h"
 
 
 #define INITIAL_CONNECTION_CACHE_SIZE 1001
 
 
-/* Local functions forward declarations */
-static void RegisterShardPlacementXactCallback(void);
-
-
 /* Global variables used in commit handler */
-static List *shardPlacementConnectionList = NIL;
-static bool isXactCallbackRegistered = false;
+static HTAB *shardConnectionHash = NULL;
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+static bool subXactAbortAttempted = false;
+
+/* functions needed by callbacks and hooks */
+static void RegisterShardPlacementXactCallbacks(void);
+static void CompleteShardPlacementTransactions(XactEvent event, void *arg);
+static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
+									  SubTransactionId parentSubid, void *arg);
 
 
 /*
- * OpenTransactionsToAllShardPlacements opens connections to all placements of
- * the given shard Id Pointer List and returns the hash table containing the connections.
- * The resulting hash table maps shardIds to ShardConnection structs.
+ * OpenTransactionsToAllShardPlacements opens connections to all placements
+ * using the provided shard identifier list. Connections accumulate in a global
+ * shardConnectionHash variable for use (and re-use) within this transaction.
  */
-HTAB *
+void
 OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
 {
-	HTAB *shardConnectionHash = CreateShardConnectionHash();
 	ListCell *shardIntervalCell = NULL;
-	ListCell *connectionCell = NULL;
-	List *connectionList = NIL;
+
+	if (shardConnectionHash == NULL)
+	{
+		shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
+	}
 
 	foreach(shardIntervalCell, shardIntervalList)
 	{
 		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 		uint64 shardId = shardInterval->shardId;
 
-		OpenConnectionsToShardPlacements(shardId, shardConnectionHash, userName);
+		BeginTransactionOnShardPlacements(shardId, userName);
 	}
-
-	connectionList = ConnectionList(shardConnectionHash);
-
-	foreach(connectionCell, connectionList)
-	{
-		TransactionConnection *transactionConnection =
-			(TransactionConnection *) lfirst(connectionCell);
-		PGconn *connection = transactionConnection->connection;
-		PGresult *result = NULL;
-
-		result = PQexec(connection, "BEGIN");
-		if (PQresultStatus(result) != PGRES_COMMAND_OK)
-		{
-			ReraiseRemoteError(connection, result);
-		}
-	}
-
-	shardPlacementConnectionList = ConnectionList(shardConnectionHash);
-
-	RegisterShardPlacementXactCallback();
-
-	return shardConnectionHash;
 }
 
 
 /*
- * CreateShardConnectionHash constructs a hash table used for shardId->Connection
- * mapping.
+ * CreateShardConnectionHash constructs a hash table which maps from shard
+ * identifier to connection lists, passing the provided MemoryContext to
+ * hash_create for hash allocations.
  */
 HTAB *
-CreateShardConnectionHash(void)
+CreateShardConnectionHash(MemoryContext memoryContext)
 {
 	HTAB *shardConnectionsHash = NULL;
 	int hashFlags = 0;
@@ -92,10 +77,9 @@ CreateShardConnectionHash(void)
 	memset(&info, 0, sizeof(info));
 	info.keysize = sizeof(int64);
 	info.entrysize = sizeof(ShardConnections);
-	info.hash = tag_hash;
-	info.hcxt = TopTransactionContext;
+	info.hcxt = memoryContext;
+	hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
 
-	hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
 	shardConnectionsHash = hash_create("Shard Connections Hash",
 									   INITIAL_CONNECTION_CACHE_SIZE, &info,
 									   hashFlags);
@@ -105,80 +89,106 @@ CreateShardConnectionHash(void)
 
 
 /*
- * OpenConnectionsToShardPlacements opens connections to all placements of the
- * shard with the given shardId and populates the shardConnectionHash table
- * accordingly.
+ * BeginTransactionOnShardPlacements opens new connections (if necessary) to
+ * all placements of a shard (specified by shard identifier). After sending a
+ * BEGIN command on all connections, they are added to shardConnectionHash for
+ * use within this transaction. Exits early if connections already exist for
+ * the specified shard, and errors if no placements can be found, a connection
+ * cannot be made, or if the BEGIN command fails.
 */
 void
-OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash,
-								 char *userName)
+BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
 {
+	List *shardPlacementList = NIL;
+	ListCell *placementCell = NULL;
+
+	ShardConnections *shardConnections = NULL;
 	bool shardConnectionsFound = false;
 
-	/* get existing connections to the shard placements, if any */
-	ShardConnections *shardConnections = GetShardConnections(shardConnectionHash,
-															 shardId,
-															 &shardConnectionsFound);
-
-	List *shardPlacementList = FinalizedShardPlacementList(shardId);
-	ListCell *shardPlacementCell = NULL;
-	List *connectionList = NIL;
-
-	Assert(!shardConnectionsFound);
+	MemoryContext oldContext = NULL;
+
+	shardPlacementList = FinalizedShardPlacementList(shardId);
 	if (shardPlacementList == NIL)
 	{
+		/* going to have to have some placements to do any work */
 		ereport(ERROR, (errmsg("could not find any shard placements for the shard "
 							   UINT64_FORMAT, shardId)));
 	}
 
-	foreach(shardPlacementCell, shardPlacementList)
+	/* get existing connections to the shard placements, if any */
+	shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+	if (shardConnectionsFound)
 	{
-		ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(
-			shardPlacementCell);
-		char *workerName = shardPlacement->nodeName;
-		uint32 workerPort = shardPlacement->nodePort;
-		PGconn *connection = ConnectToNode(workerName, workerPort, userName);
+		/* exit early if we've already established shard transactions */
+		return;
+	}
+
+	foreach(placementCell, shardPlacementList)
+	{
+		ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
+		PGconn *connection = NULL;
 		TransactionConnection *transactionConnection = NULL;
+		PGresult *result = NULL;
+
+		connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort,
+								   userName);
 
 		if (connection == NULL)
 		{
-			List *abortConnectionList = ConnectionList(shardConnectionHash);
-			CloseConnections(abortConnectionList);
-
 			ereport(ERROR, (errmsg("could not establish a connection to all "
 								   "placements of shard %lu", shardId)));
 		}
 
+		/* entries must last through the whole top-level transaction */
+		oldContext = MemoryContextSwitchTo(TopTransactionContext);
+
 		transactionConnection = palloc0(sizeof(TransactionConnection));
 
 		transactionConnection->connectionId = shardConnections->shardId;
 		transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
 		transactionConnection->connection = connection;
 
-		connectionList = lappend(connectionList, transactionConnection);
-	}
+		shardConnections->connectionList = lappend(shardConnections->connectionList,
												   transactionConnection);
 
-	shardConnections->connectionList = connectionList;
+		MemoryContextSwitchTo(oldContext);
+
+		/* now that connection is tracked, issue BEGIN */
+		result = PQexec(connection, "BEGIN");
+		if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		{
+			ReraiseRemoteError(connection, result);
+		}
+	}
 }
 
 
 /*
- * GetShardConnections finds existing connections for a shard in the hash.
- * If not found, then a ShardConnections structure with empty connectionList
- * is returned.
+ * GetShardConnections finds existing connections for a shard in the global
+ * connection hash. If not found, then a ShardConnections structure with empty
+ * connectionList is returned and the shardConnectionsFound output parameter
+ * will be set to false.
 */
 ShardConnections *
-GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
-					bool *shardConnectionsFound)
+GetShardConnections(int64 shardId, bool *shardConnectionsFound)
+{
+	return GetShardHashConnections(shardConnectionHash, shardId, shardConnectionsFound);
+}
+
+
+/*
+ * GetShardHashConnections finds existing connections for a shard in the
+ * provided hash. If not found, then a ShardConnections structure with empty
+ * connectionList is returned.
+ */
+ShardConnections *
+GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFound)
 {
 	ShardConnections *shardConnections = NULL;
 
-	shardConnections = (ShardConnections *) hash_search(shardConnectionHash,
-														&shardId,
-														HASH_ENTER,
-														shardConnectionsFound);
-	if (!*shardConnectionsFound)
+	shardConnections = (ShardConnections *) hash_search(connectionHash, &shardId,
														HASH_ENTER, connectionsFound);
+	if (!*connectionsFound)
 	{
 		shardConnections->shardId = shardId;
 		shardConnections->connectionList = NIL;
@@ -198,6 +208,11 @@ ConnectionList(HTAB *connectionHash)
 	HASH_SEQ_STATUS status;
 	ShardConnections *shardConnections = NULL;
 
+	if (connectionHash == NULL)
+	{
+		return NIL;
+	}
+
 	hash_seq_init(&status, connectionHash);
 
 	shardConnections = (ShardConnections *) hash_seq_search(&status);
@@ -214,16 +229,31 @@ ConnectionList(HTAB *connectionHash)
 
 
 /*
- * EnableXactCallback ensures the XactCallback for committing/aborting
- * remote worker transactions is registered.
+ * InstallMultiShardXactShmemHook simply installs a hook (intended to be called
+ * once during backend startup), which will itself register all the transaction
+ * callbacks needed by multi-shard transaction logic.
 */
 void
-RegisterShardPlacementXactCallback(void)
+InstallMultiShardXactShmemHook(void)
 {
-	if (!isXactCallbackRegistered)
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = RegisterShardPlacementXactCallbacks;
+}
+
+
+/*
+ * RegisterShardPlacementXactCallbacks registers transaction callbacks needed
+ * for multi-shard transactions before calling previous shmem startup hooks.
+ */
+static void
+RegisterShardPlacementXactCallbacks(void)
+{
+	RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
+	RegisterSubXactCallback(MultiShardSubXactCallback, NULL);
+
+	if (prev_shmem_startup_hook != NULL)
 	{
-		RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
-		isXactCallbackRegistered = true;
+		prev_shmem_startup_hook();
 	}
 }
 
@@ -232,16 +262,28 @@ RegisterShardPlacementXactCallback(void)
  * CompleteShardPlacementTransactions commits or aborts pending shard placement
  * transactions when the local transaction commits or aborts.
 */
-void
+static void
CompleteShardPlacementTransactions(XactEvent event, void *arg)
 {
-	if (shardPlacementConnectionList == NIL)
+	List *connectionList = ConnectionList(shardConnectionHash);
+
+	if (shardConnectionHash == NULL)
 	{
 		/* nothing to do */
 		return;
 	}
-	else if (event == XACT_EVENT_PRE_COMMIT)
+
+	if (event == XACT_EVENT_PRE_COMMIT)
 	{
+		if (subXactAbortAttempted)
+		{
+			subXactAbortAttempted = false;
+
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
+								   "which modify distributed tables")));
+		}
+
 		/*
 		 * Any failure here will cause local changes to be rolled back,
 		 * and remote changes to either roll back (1PC) or, in case of
@@ -251,7 +293,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
 
 		if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
 		{
-			PrepareRemoteTransactions(shardPlacementConnectionList);
+			PrepareRemoteTransactions(connectionList);
 		}
 
 		return;
@@ -265,7 +307,7 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
 		 * changes have already been committed.
 		 */
 
-		CommitRemoteTransactions(shardPlacementConnectionList, false);
+		CommitRemoteTransactions(connectionList, false);
 	}
 	else if (event == XACT_EVENT_ABORT)
 	{
@@ -276,16 +318,28 @@ CompleteShardPlacementTransactions(XactEvent event, void *arg)
 		 * already been rolled back.
 		 */
 
-		AbortRemoteTransactions(shardPlacementConnectionList);
+		AbortRemoteTransactions(connectionList);
 	}
 	else
 	{
 		return;
 	}
 
-	CloseConnections(shardPlacementConnectionList);
-	shardPlacementConnectionList = NIL;
+	CloseConnections(connectionList);
+	shardConnectionHash = NULL;
 	XactModificationLevel = XACT_MODIFICATION_NONE;
+	subXactAbortAttempted = false;
 }
+
+
+static void
+MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
+						  SubTransactionId parentSubid, void *arg)
+{
+	if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
+	{
+		subXactAbortAttempted = true;
+	}
+}
@@ -377,13 +377,6 @@ ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
 
 	sprintf(nodePortString, "%d", nodePort);
 
-	if (XactModificationLevel > XACT_MODIFICATION_NONE)
-	{
-		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
-						errmsg("cannot open new connections after the first modification "
-							   "command within a transaction")));
-	}
-
 	Assert(sizeof(keywordArray) == sizeof(valueArray));
 
 	for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
@@ -54,7 +54,6 @@ extern int MultiShardCommitProtocol;
 
 /* Functions declarations for transaction and connection management */
 extern void InitializeDistributedTransaction(void);
-extern void CompleteShardPlacementTransactions(XactEvent event, void *arg);
 extern void PrepareRemoteTransactions(List *connectionList);
 extern void AbortRemoteTransactions(List *connectionList);
 extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
@@ -25,16 +25,15 @@ typedef struct ShardConnections
 } ShardConnections;
 
 
-extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList,
-												   char *relationOwner);
-extern HTAB * CreateShardConnectionHash(void);
-extern void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash,
-											 char *nodeUser);
-extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
-											  int64 shardId,
-											  bool *shardConnectionsFound);
+extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner);
+extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
+extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser);
+extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
+extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
+												  bool *connectionsFound);
 extern List * ConnectionList(HTAB *connectionHash);
 extern void CloseConnections(List *connectionList);
+extern void InstallMultiShardXactShmemHook(void);
 
 
 #endif /* MULTI_SHARD_TRANSACTION_H */
@@ -155,7 +155,7 @@ ABORT;
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 ALTER TABLE labs ADD COLUMN motto text;
-ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
+ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
 COMMIT;
 -- whether it occurs first or second
 BEGIN;
@@ -182,7 +182,7 @@ SELECT * FROM labs WHERE id = 6;
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 \copy labs from stdin delimiter ','
-ERROR: cannot open new connections after the first modification command within a transaction
+ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
 CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
 COMMIT;
 -- though it will work if before any modifications
@@ -200,7 +200,7 @@ COMMIT;
 BEGIN;
 \copy labs from stdin delimiter ','
 \copy labs from stdin delimiter ','
-ERROR: cannot open new connections after the first modification command within a transaction
+ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
 CONTEXT: COPY labs, line 1: "12,fsociety"
 COMMIT;
 SELECT name FROM labs WHERE id = 11;
@@ -213,7 +213,7 @@ SELECT name FROM labs WHERE id = 11;
 BEGIN;
 ALTER TABLE labs ADD COLUMN motto text;
 \copy labs from stdin delimiter ','
-ERROR: cannot open new connections after the first modification command within a transaction
+ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
 CONTEXT: COPY labs, line 1: "12,fsociety,lol"
 COMMIT;
 -- but the DDL should correctly roll back
@@ -233,7 +233,7 @@ SELECT * FROM labs WHERE id = 12;
 BEGIN;
 \copy labs from stdin delimiter ','
 ALTER TABLE labs ADD COLUMN motto text;
-ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
+ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
 COMMIT;
 -- the DDL fails, but copy persists
 \d labs
@@ -173,14 +173,114 @@ COMMIT;
 SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
 DROP INDEX temp_index_2;
 
--- but that multiple ddl statements in a block results in ROLLBACK
+-- and so are multiple ddl statements
 BEGIN;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 ALTER TABLE lineitem_alter ADD COLUMN first integer;
 COMMIT;
 
+\d lineitem_alter
+
+ALTER TABLE lineitem_alter DROP COLUMN first;
+DROP INDEX temp_index_2;
+
+-- ensure that user-specified rollback causes full rollback
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
+ROLLBACK;
+
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+
--- and distributed SELECTs cannot appear after ALTER
+-- ensure that errors cause full rollback
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+ROLLBACK;
+
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+
+-- verify that SAVEPOINT is allowed...
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+SAVEPOINT my_savepoint;
+CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
+ROLLBACK;
+
+-- but that actually rolling back to it is not
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+SAVEPOINT my_savepoint;
+CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
+ROLLBACK TO my_savepoint;
+COMMIT;
+
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+
+-- Add column on only one worker...
+\c - - - :worker_2_port
+ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer;
+\c - - - :master_port
+
+-- and try to add it in a multi-statement block, which fails
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+ALTER TABLE lineitem_alter ADD COLUMN first integer;
+COMMIT;
+
+-- Nothing from the block should have committed
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+
+-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
+CREATE TABLE single_shard_items (id integer, name text);
+SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
+SELECT master_create_worker_shards('single_shard_items', 1, 2);
+
+-- Drop the column from the worker...
+\c - - - :worker_2_port
+ALTER TABLE lineitem_alter_220000 DROP COLUMN first;
+
+-- Create table to trigger at-xact-end (deferred) failure
+CREATE TABLE ddl_commands (command text UNIQUE DEFERRABLE INITIALLY DEFERRED);
+
+-- Use an event trigger to log all DDL event tags in it
+CREATE FUNCTION log_ddl_tag() RETURNS event_trigger AS $ldt$
+BEGIN
+INSERT INTO ddl_commands VALUES (tg_tag);
+END;
+$ldt$ LANGUAGE plpgsql;
+
+CREATE EVENT TRIGGER log_ddl_tag ON ddl_command_end EXECUTE PROCEDURE log_ddl_tag();
+
+\c - - - :master_port
+-- The above trigger will cause failure at transaction end on one placement.
+-- We'll test 2PC first, as it should handle this "best" (no divergence)
+SET citus.multi_shard_commit_protocol TO '2pc';
+BEGIN;
+CREATE INDEX single_index_2 ON single_shard_items(id);
+CREATE INDEX single_index_3 ON single_shard_items(name);
+COMMIT;
+
+-- Nothing from the block should have committed
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
+
+-- Now try with 2pc off
+RESET citus.multi_shard_commit_protocol;
+BEGIN;
+CREATE INDEX single_index_2 ON single_shard_items(id);
+CREATE INDEX single_index_3 ON single_shard_items(name);
+COMMIT;
+
+-- The block should have committed with a warning
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
+
+\c - - - :worker_2_port
+DROP EVENT TRIGGER log_ddl_tag;
+DROP FUNCTION log_ddl_tag();
+DROP TABLE ddl_commands;
+
+\c - - - :master_port
+-- Distributed SELECTs cannot appear after ALTER
 BEGIN;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 SELECT l_orderkey FROM lineitem_alter LIMIT 0;
@@ -436,20 +436,169 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
 (1 row)
 
 DROP INDEX temp_index_2;
--- but that multiple ddl statements in a block results in ROLLBACK
+-- and so are multiple ddl statements
 BEGIN;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 ALTER TABLE lineitem_alter ADD COLUMN first integer;
-ERROR: distributed DDL commands must not appear within transaction blocks containing other modifications
 COMMIT;
+\d lineitem_alter
+          Table "public.lineitem_alter"
+     Column      |         Type          | Modifiers
+-----------------+-----------------------+-----------
+ l_orderkey      | bigint                | not null
+ l_partkey       | integer               | not null
+ l_suppkey       | integer               | not null
+ l_linenumber    | integer               | not null
+ l_quantity      | numeric(15,2)         | not null
+ l_extendedprice | numeric(15,2)         | not null
+ l_discount      | numeric(15,2)         | not null
+ l_tax           | numeric(15,2)         | not null
+ l_returnflag    | character(1)          | not null
+ l_linestatus    | character(1)          | not null
+ l_shipdate      | date                  | not null
+ l_commitdate    | date                  | not null
+ l_receiptdate   | date                  | not null
+ l_shipinstruct  | character(25)         | not null
+ l_shipmode      | character(10)         | not null
+ l_comment       | character varying(44) | not null
+ null_column     | integer               |
+ first           | integer               |
+Indexes:
+    "temp_index_2" btree (l_orderkey)
+
+ALTER TABLE lineitem_alter DROP COLUMN first;
+DROP INDEX temp_index_2;
+-- ensure that user-specified rollback causes full rollback
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
+ROLLBACK;
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+ indexname | tablename
+-----------+-----------
+(0 rows)
+
--- and distributed SELECTs cannot appear after ALTER
+-- ensure that errors cause full rollback
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+ERROR: relation "temp_index_2" already exists
+ROLLBACK;
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+ indexname | tablename
+-----------+-----------
+(0 rows)
+
+-- verify that SAVEPOINT is allowed...
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+SAVEPOINT my_savepoint;
+CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
+ROLLBACK;
+-- but that actually rolling back to it is not
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+SAVEPOINT my_savepoint;
+CREATE INDEX temp_index_3 ON lineitem_alter(l_partkey);
+ROLLBACK TO my_savepoint;
+COMMIT;
+ERROR: cannot ROLLBACK TO SAVEPOINT in transactions which modify distributed tables
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+ indexname | tablename
+-----------+-----------
+(0 rows)
+
+-- Add column on only one worker...
+\c - - - :worker_2_port
+ALTER TABLE lineitem_alter_220000 ADD COLUMN first integer;
+\c - - - :master_port
+-- and try to add it in a multi-statement block, which fails
+BEGIN;
+CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
+NOTICE: using one-phase commit for distributed DDL commands
+HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
+ALTER TABLE lineitem_alter ADD COLUMN first integer;
+WARNING: column "first" of relation "lineitem_alter_220000" already exists
+CONTEXT: while executing command on localhost:57638
+ERROR: could not execute DDL command on worker node shards
+COMMIT;
+-- Nothing from the block should have committed
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
+ indexname | tablename
+-----------+-----------
+(0 rows)
+
+-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
+CREATE TABLE single_shard_items (id integer, name text);
+SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('single_shard_items', 1, 2);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+-- Drop the column from the worker...
+\c - - - :worker_2_port
+ALTER TABLE lineitem_alter_220000 DROP COLUMN first;
+-- Create table to trigger at-xact-end (deferred) failure
+CREATE TABLE ddl_commands (command text UNIQUE DEFERRABLE INITIALLY DEFERRED);
+-- Use an event trigger to log all DDL event tags in it
+CREATE FUNCTION log_ddl_tag() RETURNS event_trigger AS $ldt$
+BEGIN
+INSERT INTO ddl_commands VALUES (tg_tag);
+END;
+$ldt$ LANGUAGE plpgsql;
+CREATE EVENT TRIGGER log_ddl_tag ON ddl_command_end EXECUTE PROCEDURE log_ddl_tag();
+\c - - - :master_port
+-- The above trigger will cause failure at transaction end on one placement.
+-- We'll test 2PC first, as it should handle this "best" (no divergence)
+SET citus.multi_shard_commit_protocol TO '2pc';
+BEGIN;
+CREATE INDEX single_index_2 ON single_shard_items(id);
+CREATE INDEX single_index_3 ON single_shard_items(name);
+COMMIT;
+WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
+DETAIL: Key (command)=(CREATE INDEX) already exists.
+CONTEXT: while executing command on localhost:57638
+ERROR: failed to prepare transaction
+-- Nothing from the block should have committed
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
+ indexname | tablename
+-----------+-----------
+(0 rows)
+
+-- Now try with 2pc off
+RESET citus.multi_shard_commit_protocol;
+BEGIN;
+CREATE INDEX single_index_2 ON single_shard_items(id);
+NOTICE: using one-phase commit for distributed DDL commands
+HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
+CREATE INDEX single_index_3 ON single_shard_items(name);
+COMMIT;
+WARNING: failed to commit transaction on localhost:57638
+-- The block should have committed with a warning
+SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
+   indexname    |     tablename
+----------------+--------------------
+ single_index_2 | single_shard_items
+ single_index_3 | single_shard_items
+(2 rows)
+
+\c - - - :worker_2_port
+DROP EVENT TRIGGER log_ddl_tag;
+DROP FUNCTION log_ddl_tag();
+DROP TABLE ddl_commands;
+\c - - - :master_port
+-- Distributed SELECTs cannot appear after ALTER
 BEGIN;
 CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
 NOTICE: using one-phase commit for distributed DDL commands
 HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
 SELECT l_orderkey FROM lineitem_alter LIMIT 0;
 ERROR: cannot open new connections after the first modification command within a transaction
 COMMIT;
@@ -502,7 +651,7 @@ SELECT master_create_worker_shards('test_ab', 8, 2);
 INSERT INTO test_ab VALUES (2, 10);
 INSERT INTO test_ab VALUES (2, 11);
 CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
-WARNING: could not create unique index "temp_unique_index_1_220021"
+WARNING: could not create unique index "temp_unique_index_1_220022"
 DETAIL: Key (a)=(2) is duplicated.
 CONTEXT: while executing command on localhost:57638
 ERROR: could not execute DDL command on worker node shards
@@ -550,7 +699,8 @@ ORDER BY attnum;
  null_column                   | integer
  ........pg.dropped.22........ | -
  ........pg.dropped.23........ | -
-(29 rows)
+ ........pg.dropped.24........ | -
+(30 rows)
 
 \c - - - :master_port
 -- verify that we don't intercept DDL commands if propagation is turned off