mirror of https://github.com/citusdata/citus.git
Merge pull request #1020 from citusdata/feature/transaction-management
Centralized Transaction Management Infrastructurepull/835/head
commit
dd149c3e24
|
@ -329,9 +329,12 @@ CloseConnection(MultiConnection *connection)
|
||||||
|
|
||||||
if (found)
|
if (found)
|
||||||
{
|
{
|
||||||
/* unlink from list */
|
/* unlink from list of open connections */
|
||||||
dlist_delete(&connection->connectionNode);
|
dlist_delete(&connection->connectionNode);
|
||||||
|
|
||||||
|
/* same for transaction state */
|
||||||
|
CloseRemoteTransaction(connection);
|
||||||
|
|
||||||
/* we leave the per-host entry alive */
|
/* we leave the per-host entry alive */
|
||||||
pfree(connection);
|
pfree(connection);
|
||||||
}
|
}
|
||||||
|
@ -632,6 +635,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
/* reset per-transaction state */
|
||||||
|
ResetRemoteTransaction(connection);
|
||||||
|
|
||||||
UnclaimConnection(connection);
|
UnclaimConnection(connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
|
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
#include "storage/latch.h"
|
||||||
|
|
||||||
|
|
||||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||||
|
@ -52,7 +54,9 @@ ForgetResults(MultiConnection *connection)
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
result = PQgetResult(connection->pgConn);
|
const bool dontRaiseErrors = false;
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, dontRaiseErrors);
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
@ -184,12 +188,193 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
/* wrappers around libpq functions, with command logging support */
|
/* wrappers around libpq functions, with command logging support */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendRemoteCommand is a tiny PQsendQuery wrapper that logs remote commands,
|
* SendRemoteCommand is a PQsendQuery wrapper that logs remote commands, and
|
||||||
* and accepts a MultiConnection instead of a plain PGconn.
|
* accepts a MultiConnection instead of a plain PGconn. It makes sure it can
|
||||||
|
* send commands asynchronously without blocking (at the potential expense of
|
||||||
|
* an additional memory allocation).
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
SendRemoteCommand(MultiConnection *connection, const char *command)
|
SendRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
{
|
{
|
||||||
|
PGconn *pgConn = connection->pgConn;
|
||||||
|
bool wasNonblocking = PQisnonblocking(pgConn);
|
||||||
|
int rc = 0;
|
||||||
|
|
||||||
LogRemoteCommand(connection, command);
|
LogRemoteCommand(connection, command);
|
||||||
return PQsendQuery(connection->pgConn, command);
|
|
||||||
|
/* make sure not to block anywhere */
|
||||||
|
if (!wasNonblocking)
|
||||||
|
{
|
||||||
|
PQsetnonblocking(pgConn, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = PQsendQuery(pgConn, command);
|
||||||
|
|
||||||
|
/* reset nonblocking connection to its original state */
|
||||||
|
if (!wasNonblocking)
|
||||||
|
{
|
||||||
|
PQsetnonblocking(pgConn, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetCommandResult is a wrapper around PQgetResult() that handles interrupts.
|
||||||
|
*
|
||||||
|
* If raiseInterrupts is true and an interrupt arrives, e.g. the query is
|
||||||
|
* being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws
|
||||||
|
* an error.
|
||||||
|
*
|
||||||
|
* If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
|
||||||
|
* an error, GetRemotecommandResult returns NULL, and the transaction is
|
||||||
|
* marked as having failed. While that's not a perfect way to signal failure,
|
||||||
|
* callers will usually treat that as an error, and it's easy to use.
|
||||||
|
*
|
||||||
|
* Handling of interrupts is important to allow queries being cancelled while
|
||||||
|
* waiting on remote nodes. In a distributed deadlock scenario cancelling
|
||||||
|
* might be the only way to resolve the deadlock.
|
||||||
|
*/
|
||||||
|
PGresult *
|
||||||
|
GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
||||||
|
{
|
||||||
|
PGconn *pgConn = connection->pgConn;
|
||||||
|
int socket = 0;
|
||||||
|
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
|
||||||
|
bool wasNonblocking = false;
|
||||||
|
PGresult *result = NULL;
|
||||||
|
bool failed = false;
|
||||||
|
|
||||||
|
/* short circuit tests around the more expensive parts of this routine */
|
||||||
|
if (!PQisBusy(pgConn))
|
||||||
|
{
|
||||||
|
return PQgetResult(connection->pgConn);
|
||||||
|
}
|
||||||
|
|
||||||
|
socket = PQsocket(pgConn);
|
||||||
|
wasNonblocking = PQisnonblocking(pgConn);
|
||||||
|
|
||||||
|
/* make sure not to block anywhere */
|
||||||
|
if (!wasNonblocking)
|
||||||
|
{
|
||||||
|
PQsetnonblocking(pgConn, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (raiseInterrupts)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* make sure command has been sent out */
|
||||||
|
while (!failed)
|
||||||
|
{
|
||||||
|
int rc = 0;
|
||||||
|
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
/* try to send all the data */
|
||||||
|
rc = PQflush(pgConn);
|
||||||
|
|
||||||
|
/* stop writing if all data has been sent, or there was none to send */
|
||||||
|
if (rc == 0)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* if sending failed, there's nothing more we can do */
|
||||||
|
if (rc == -1)
|
||||||
|
{
|
||||||
|
failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* this means we have to wait for data to go out */
|
||||||
|
Assert(rc == 1);
|
||||||
|
|
||||||
|
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0);
|
||||||
|
|
||||||
|
if (rc & WL_POSTMASTER_DEATH)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rc & WL_LATCH_SET)
|
||||||
|
{
|
||||||
|
/* if allowed raise errors */
|
||||||
|
if (raiseInterrupts)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If raising errors allowed, or called within in a section with
|
||||||
|
* interrupts held, return NULL instead, and mark the transaction
|
||||||
|
* as failed.
|
||||||
|
*/
|
||||||
|
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
|
||||||
|
{
|
||||||
|
connection->remoteTransaction.transactionFailed = true;
|
||||||
|
failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* wait for the result of the command to come in */
|
||||||
|
while (!failed)
|
||||||
|
{
|
||||||
|
int rc = 0;
|
||||||
|
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
/* if reading fails, there's not much we can do */
|
||||||
|
if (PQconsumeInput(pgConn) == 0)
|
||||||
|
{
|
||||||
|
failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* check if all the necessary data is now available */
|
||||||
|
if (!PQisBusy(pgConn))
|
||||||
|
{
|
||||||
|
result = PQgetResult(connection->pgConn);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0);
|
||||||
|
|
||||||
|
if (rc & WL_POSTMASTER_DEATH)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rc & WL_LATCH_SET)
|
||||||
|
{
|
||||||
|
/* if allowed raise errors */
|
||||||
|
if (raiseInterrupts)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If raising errors allowed, or called within in a section with
|
||||||
|
* interrupts held, return NULL instead, and mark the transaction
|
||||||
|
* as failed.
|
||||||
|
*/
|
||||||
|
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
|
||||||
|
{
|
||||||
|
connection->remoteTransaction.transactionFailed = true;
|
||||||
|
failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!wasNonblocking)
|
||||||
|
{
|
||||||
|
PQsetnonblocking(pgConn, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@
|
||||||
#include "distributed/multi_shard_transaction.h"
|
#include "distributed/multi_shard_transaction.h"
|
||||||
#include "distributed/relay_utility.h"
|
#include "distributed/relay_utility.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/remote_transaction.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "executor/execdesc.h"
|
#include "executor/execdesc.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
|
@ -80,13 +81,9 @@ bool AllModificationsCommutative = false;
|
||||||
* of XactShardConnSets, which map a shard identifier to a set of connection
|
* of XactShardConnSets, which map a shard identifier to a set of connection
|
||||||
* hash entries. This list is walked by MarkRemainingInactivePlacements to
|
* hash entries. This list is walked by MarkRemainingInactivePlacements to
|
||||||
* ensure we mark placements as failed if they reject a COMMIT.
|
* ensure we mark placements as failed if they reject a COMMIT.
|
||||||
*
|
|
||||||
* Beyond that, there's a backend hook to register xact callbacks and a flag to
|
|
||||||
* track when a user tries to roll back to a savepoint (not allowed).
|
|
||||||
*/
|
*/
|
||||||
static HTAB *xactParticipantHash = NULL;
|
static HTAB *xactParticipantHash = NULL;
|
||||||
static List *xactShardConnSetList = NIL;
|
static List *xactShardConnSetList = NIL;
|
||||||
static bool subXactAbortAttempted = false;
|
|
||||||
|
|
||||||
/* functions needed during start phase */
|
/* functions needed during start phase */
|
||||||
static void InitTransactionStateForTask(Task *task);
|
static void InitTransactionStateForTask(Task *task);
|
||||||
|
@ -124,11 +121,7 @@ static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows
|
||||||
static void RecordShardIdParticipant(uint64 affectedShardId,
|
static void RecordShardIdParticipant(uint64 affectedShardId,
|
||||||
NodeConnectionEntry *participantEntry);
|
NodeConnectionEntry *participantEntry);
|
||||||
|
|
||||||
/* functions needed by callbacks and hooks */
|
/* to verify the health of shards after a transactional modification command */
|
||||||
static void RouterTransactionCallback(XactEvent event, void *arg);
|
|
||||||
static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
|
|
||||||
SubTransactionId parentSubid, void *arg);
|
|
||||||
static void ExecuteTransactionEnd(bool commit);
|
|
||||||
static void MarkRemainingInactivePlacements(void);
|
static void MarkRemainingInactivePlacements(void);
|
||||||
|
|
||||||
|
|
||||||
|
@ -249,6 +242,8 @@ InitTransactionStateForTask(Task *task)
|
||||||
{
|
{
|
||||||
ListCell *placementCell = NULL;
|
ListCell *placementCell = NULL;
|
||||||
|
|
||||||
|
BeginCoordinatedTransaction();
|
||||||
|
|
||||||
xactParticipantHash = CreateXactParticipantHash();
|
xactParticipantHash = CreateXactParticipantHash();
|
||||||
|
|
||||||
foreach(placementCell, task->taskPlacementList)
|
foreach(placementCell, task->taskPlacementList)
|
||||||
|
@ -257,8 +252,9 @@ InitTransactionStateForTask(Task *task)
|
||||||
NodeConnectionKey participantKey;
|
NodeConnectionKey participantKey;
|
||||||
NodeConnectionEntry *participantEntry = NULL;
|
NodeConnectionEntry *participantEntry = NULL;
|
||||||
bool entryFound = false;
|
bool entryFound = false;
|
||||||
|
int connectionFlags = SESSION_LIFESPAN;
|
||||||
PGconn *connection = NULL;
|
MultiConnection *connection =
|
||||||
|
GetNodeConnection(connectionFlags, placement->nodeName, placement->nodePort);
|
||||||
|
|
||||||
MemSet(&participantKey, 0, sizeof(participantKey));
|
MemSet(&participantKey, 0, sizeof(participantKey));
|
||||||
strlcpy(participantKey.nodeName, placement->nodeName,
|
strlcpy(participantKey.nodeName, placement->nodeName,
|
||||||
|
@ -269,21 +265,8 @@ InitTransactionStateForTask(Task *task)
|
||||||
HASH_ENTER, &entryFound);
|
HASH_ENTER, &entryFound);
|
||||||
Assert(!entryFound);
|
Assert(!entryFound);
|
||||||
|
|
||||||
connection = GetOrEstablishConnection(placement->nodeName,
|
/* issue BEGIN if necessary */
|
||||||
placement->nodePort);
|
RemoteTransactionBeginIfNecessary(connection);
|
||||||
if (connection != NULL)
|
|
||||||
{
|
|
||||||
PGresult *result = PQexec(connection, "BEGIN");
|
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
|
||||||
{
|
|
||||||
WarnRemoteError(connection, result);
|
|
||||||
CloseConnectionByPGconn(connection);
|
|
||||||
|
|
||||||
connection = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
participantEntry->connection = connection;
|
participantEntry->connection = connection;
|
||||||
}
|
}
|
||||||
|
@ -1212,7 +1195,7 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
|
||||||
RecordShardIdParticipant(placement->shardId, participantEntry);
|
RecordShardIdParticipant(placement->shardId, participantEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
return participantEntry->connection;
|
return participantEntry->connection->pgConn;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1721,173 +1704,52 @@ RouterExecutorEnd(QueryDesc *queryDesc)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RegisterRouterExecutorXactCallbacks registers this executor's callbacks.
|
* RouterExecutorPreCommitCheck() gets called after remote transactions have
|
||||||
|
* committed, so it can invalidate failed shards and perform related checks.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RegisterRouterExecutorXactCallbacks(void)
|
RouterExecutorPreCommitCheck(void)
|
||||||
{
|
{
|
||||||
RegisterXactCallback(RouterTransactionCallback, NULL);
|
/* no transactional router modification were issued, nothing to do */
|
||||||
RegisterSubXactCallback(RouterSubtransactionCallback, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RouterTransactionCallback handles committing or aborting remote transactions
|
|
||||||
* after the local one has committed or aborted. It only sends COMMIT or ABORT
|
|
||||||
* commands to still-healthy remotes; the failed ones are marked as inactive if
|
|
||||||
* after a successful COMMIT (no need to mark on ABORTs).
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RouterTransactionCallback(XactEvent event, void *arg)
|
|
||||||
{
|
|
||||||
if (XactModificationLevel != XACT_MODIFICATION_DATA)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (event)
|
|
||||||
{
|
|
||||||
case XACT_EVENT_PARALLEL_COMMIT:
|
|
||||||
case XACT_EVENT_COMMIT:
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case XACT_EVENT_PARALLEL_ABORT:
|
|
||||||
case XACT_EVENT_ABORT:
|
|
||||||
{
|
|
||||||
bool commit = false;
|
|
||||||
|
|
||||||
ExecuteTransactionEnd(commit);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* no support for prepare with multi-statement transactions */
|
|
||||||
case XACT_EVENT_PREPARE:
|
|
||||||
case XACT_EVENT_PRE_PREPARE:
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot prepare a transaction that modified "
|
|
||||||
"distributed tables")));
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case XACT_EVENT_PARALLEL_PRE_COMMIT:
|
|
||||||
case XACT_EVENT_PRE_COMMIT:
|
|
||||||
{
|
|
||||||
bool commit = true;
|
|
||||||
|
|
||||||
if (subXactAbortAttempted)
|
|
||||||
{
|
|
||||||
subXactAbortAttempted = false;
|
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
|
|
||||||
"which modify distributed tables")));
|
|
||||||
}
|
|
||||||
|
|
||||||
ExecuteTransactionEnd(commit);
|
|
||||||
MarkRemainingInactivePlacements();
|
|
||||||
|
|
||||||
/* leave early to avoid resetting transaction state */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* reset transaction state */
|
|
||||||
xactParticipantHash = NULL;
|
|
||||||
xactShardConnSetList = NIL;
|
|
||||||
subXactAbortAttempted = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* RouterSubtransactionCallback silently keeps track of any attempt to ROLLBACK
|
|
||||||
* TO SAVEPOINT, which is not permitted by this executor. At transaction end,
|
|
||||||
* the executor checks whether such a rollback was attempted and, if so, errors
|
|
||||||
* out entirely (with an appropriate message).
|
|
||||||
*
|
|
||||||
* This implementation permits savepoints so long as no rollbacks occur.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
|
|
||||||
SubTransactionId parentSubid, void *arg)
|
|
||||||
{
|
|
||||||
if ((xactParticipantHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
|
|
||||||
{
|
|
||||||
subXactAbortAttempted = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExecuteTransactionEnd ends any remote transactions still taking place on
|
|
||||||
* remote nodes. It uses xactParticipantHash to know which nodes need any
|
|
||||||
* final COMMIT or ABORT commands. Nodes which fail a final COMMIT will have
|
|
||||||
* their connection field set to NULL to permit placement invalidation.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ExecuteTransactionEnd(bool commit)
|
|
||||||
{
|
|
||||||
const char *sqlCommand = commit ? "COMMIT TRANSACTION" : "ABORT TRANSACTION";
|
|
||||||
HASH_SEQ_STATUS scan;
|
|
||||||
NodeConnectionEntry *participant;
|
|
||||||
bool completed = !commit; /* aborts are assumed completed */
|
|
||||||
|
|
||||||
if (xactParticipantHash == NULL)
|
if (xactParticipantHash == NULL)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
hash_seq_init(&scan, xactParticipantHash);
|
MarkRemainingInactivePlacements();
|
||||||
while ((participant = (NodeConnectionEntry *) hash_seq_search(&scan)))
|
}
|
||||||
{
|
|
||||||
PGconn *connection = participant->connection;
|
|
||||||
PGresult *result = NULL;
|
|
||||||
|
|
||||||
if (PQstatus(connection) != CONNECTION_OK)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
result = PQexec(connection, sqlCommand);
|
/*
|
||||||
if (PQresultStatus(result) == PGRES_COMMAND_OK)
|
* Cleanup callback called after a transaction commits or aborts.
|
||||||
{
|
*/
|
||||||
completed = true;
|
void
|
||||||
}
|
RouterExecutorPostCommit(void)
|
||||||
else
|
{
|
||||||
{
|
/* reset transaction state */
|
||||||
WarnRemoteError(connection, result);
|
xactParticipantHash = NULL;
|
||||||
CloseConnectionByPGconn(participant->connection);
|
xactShardConnSetList = NIL;
|
||||||
|
|
||||||
participant->connection = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!completed)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MarkRemainingInactivePlacements takes care of marking placements of a shard
|
* MarkRemainingInactivePlacements takes care of marking placements of a shard
|
||||||
* inactive after some of the placements rejected the final COMMIT phase of a
|
* inactive after some of the placements rejected the final COMMIT phase of a
|
||||||
* transaction. This step is skipped if all placements reject the COMMIT, since
|
* transaction.
|
||||||
* in that case no modifications to the placement have persisted.
|
|
||||||
*
|
*
|
||||||
* Failures are detected by checking the connection field of the entries in the
|
* Failures are detected by checking the connection & transaction state for
|
||||||
* connection set for each shard: it is always set to NULL after errors.
|
* each of the entries in the connection set for each shard.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
MarkRemainingInactivePlacements(void)
|
MarkRemainingInactivePlacements(void)
|
||||||
{
|
{
|
||||||
ListCell *shardConnSetCell = NULL;
|
ListCell *shardConnSetCell = NULL;
|
||||||
|
int totalSuccesses = 0;
|
||||||
|
|
||||||
|
if (xactParticipantHash == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
foreach(shardConnSetCell, xactShardConnSetList)
|
foreach(shardConnSetCell, xactShardConnSetList)
|
||||||
{
|
{
|
||||||
|
@ -1899,11 +1761,16 @@ MarkRemainingInactivePlacements(void)
|
||||||
/* determine how many actual successes there were: subtract failures */
|
/* determine how many actual successes there were: subtract failures */
|
||||||
foreach(participantCell, participantList)
|
foreach(participantCell, participantList)
|
||||||
{
|
{
|
||||||
NodeConnectionEntry *participant = NULL;
|
NodeConnectionEntry *participant =
|
||||||
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
(NodeConnectionEntry *) lfirst(participantCell);
|
||||||
|
MultiConnection *connection = participant->connection;
|
||||||
|
|
||||||
/* other codes sets connection to NULL after errors */
|
/*
|
||||||
if (participant->connection == NULL)
|
* Fail if the connection has been set to NULL after an error, or
|
||||||
|
* if the transaction failed for other reasons (e.g. COMMIT
|
||||||
|
* failed).
|
||||||
|
*/
|
||||||
|
if (connection == NULL || connection->remoteTransaction.transactionFailed)
|
||||||
{
|
{
|
||||||
successes--;
|
successes--;
|
||||||
}
|
}
|
||||||
|
@ -1921,7 +1788,8 @@ MarkRemainingInactivePlacements(void)
|
||||||
NodeConnectionEntry *participant = NULL;
|
NodeConnectionEntry *participant = NULL;
|
||||||
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
participant = (NodeConnectionEntry *) lfirst(participantCell);
|
||||||
|
|
||||||
if (participant->connection == NULL)
|
if (participant->connection == NULL ||
|
||||||
|
participant->connection->remoteTransaction.transactionFailed)
|
||||||
{
|
{
|
||||||
uint64 shardId = shardConnSet->shardId;
|
uint64 shardId = shardConnSet->shardId;
|
||||||
NodeConnectionKey *nodeKey = &participant->cacheKey;
|
NodeConnectionKey *nodeKey = &participant->cacheKey;
|
||||||
|
@ -1934,5 +1802,13 @@ MarkRemainingInactivePlacements(void)
|
||||||
nodeKey->nodeName, nodeKey->nodePort);
|
nodeKey->nodeName, nodeKey->nodePort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
totalSuccesses++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If no shards could be modified at all, error out. */
|
||||||
|
if (totalSuccesses == 0)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not commit transaction on any active nodes")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/multi_shard_transaction.h"
|
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/task_tracker.h"
|
#include "distributed/task_tracker.h"
|
||||||
|
@ -161,10 +160,6 @@ _PG_init(void)
|
||||||
InitializeTransactionManagement();
|
InitializeTransactionManagement();
|
||||||
InitializeConnectionManagement();
|
InitializeConnectionManagement();
|
||||||
|
|
||||||
/* initialize transaction callbacks */
|
|
||||||
RegisterRouterExecutorXactCallbacks();
|
|
||||||
RegisterShardPlacementXactCallbacks();
|
|
||||||
|
|
||||||
/* enable modification of pg_catalog tables during pg_upgrade */
|
/* enable modification of pg_catalog tables during pg_upgrade */
|
||||||
if (IsBinaryUpgrade)
|
if (IsBinaryUpgrade)
|
||||||
{
|
{
|
||||||
|
|
|
@ -30,14 +30,8 @@
|
||||||
#define INITIAL_CONNECTION_CACHE_SIZE 1001
|
#define INITIAL_CONNECTION_CACHE_SIZE 1001
|
||||||
|
|
||||||
|
|
||||||
/* Global variables used in commit handler */
|
/* per-transaction state */
|
||||||
static HTAB *shardConnectionHash = NULL;
|
static HTAB *shardConnectionHash = NULL;
|
||||||
static bool subXactAbortAttempted = false;
|
|
||||||
|
|
||||||
/* functions needed by callbacks and hooks */
|
|
||||||
static void CompleteShardPlacementTransactions(XactEvent event, void *arg);
|
|
||||||
static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
|
|
||||||
SubTransactionId parentSubid, void *arg);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -118,6 +112,12 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
||||||
UINT64_FORMAT, shardId)));
|
UINT64_FORMAT, shardId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
||||||
|
{
|
||||||
|
CoordinatedTransactionUse2PC();
|
||||||
|
}
|
||||||
|
|
||||||
/* get existing connections to the shard placements, if any */
|
/* get existing connections to the shard placements, if any */
|
||||||
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
|
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
|
||||||
if (shardConnectionsFound)
|
if (shardConnectionsFound)
|
||||||
|
@ -129,11 +129,11 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
||||||
foreach(placementCell, shardPlacementList)
|
foreach(placementCell, shardPlacementList)
|
||||||
{
|
{
|
||||||
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
|
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
|
||||||
PGconn *connection = NULL;
|
MultiConnection *connection = NULL;
|
||||||
TransactionConnection *transactionConnection = NULL;
|
TransactionConnection *transactionConnection = NULL;
|
||||||
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
|
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
|
||||||
shardPlacement->nodePort);
|
shardPlacement->nodePort);
|
||||||
PGresult *result = NULL;
|
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||||
|
|
||||||
if (workerNode == NULL)
|
if (workerNode == NULL)
|
||||||
{
|
{
|
||||||
|
@ -141,10 +141,13 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
||||||
shardPlacement->nodeName, shardPlacement->nodePort)));
|
shardPlacement->nodeName, shardPlacement->nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort,
|
/* XXX: It'd be nicer to establish connections asynchronously here */
|
||||||
userName);
|
connection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||||
|
shardPlacement->nodeName,
|
||||||
if (connection == NULL)
|
shardPlacement->nodePort,
|
||||||
|
userName,
|
||||||
|
NULL);
|
||||||
|
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not establish a connection to all "
|
ereport(ERROR, (errmsg("could not establish a connection to all "
|
||||||
"placements of shard %lu", shardId)));
|
"placements of shard %lu", shardId)));
|
||||||
|
@ -158,7 +161,7 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
||||||
transactionConnection->groupId = workerNode->groupId;
|
transactionConnection->groupId = workerNode->groupId;
|
||||||
transactionConnection->connectionId = shardConnections->shardId;
|
transactionConnection->connectionId = shardConnections->shardId;
|
||||||
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
|
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
|
||||||
transactionConnection->connection = connection;
|
transactionConnection->connection = connection->pgConn;
|
||||||
transactionConnection->nodeName = shardPlacement->nodeName;
|
transactionConnection->nodeName = shardPlacement->nodeName;
|
||||||
transactionConnection->nodePort = shardPlacement->nodePort;
|
transactionConnection->nodePort = shardPlacement->nodePort;
|
||||||
|
|
||||||
|
@ -167,12 +170,14 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
/* now that connection is tracked, issue BEGIN */
|
/*
|
||||||
result = PQexec(connection, "BEGIN");
|
* Every individual failure should cause entire distributed
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
* transaction to fail.
|
||||||
{
|
*/
|
||||||
ReraiseRemoteError(connection, result);
|
MarkRemoteTransactionCritical(connection);
|
||||||
}
|
|
||||||
|
/* issue BEGIN */
|
||||||
|
RemoteTransactionBegin(connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,98 +258,18 @@ ConnectionList(HTAB *connectionHash)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RegisterShardPlacementXactCallbacks registers transaction callbacks needed
|
* ResetShardPlacementTransactionState performs cleanup after the end of a
|
||||||
* for multi-shard transactions.
|
* transaction.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RegisterShardPlacementXactCallbacks(void)
|
ResetShardPlacementTransactionState(void)
|
||||||
{
|
{
|
||||||
RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
|
/*
|
||||||
RegisterSubXactCallback(MultiShardSubXactCallback, NULL);
|
* Now that transaction management does most of our work, nothing remains
|
||||||
}
|
* but to reset the connection hash, which wouldn't be valid next time
|
||||||
|
* round.
|
||||||
|
*/
|
||||||
/*
|
|
||||||
* CompleteShardPlacementTransactions commits or aborts pending shard placement
|
|
||||||
* transactions when the local transaction commits or aborts.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CompleteShardPlacementTransactions(XactEvent event, void *arg)
|
|
||||||
{
|
|
||||||
List *connectionList = ConnectionList(shardConnectionHash);
|
|
||||||
|
|
||||||
if (shardConnectionHash == NULL)
|
|
||||||
{
|
|
||||||
/* nothing to do */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event == XACT_EVENT_PRE_COMMIT)
|
|
||||||
{
|
|
||||||
if (subXactAbortAttempted)
|
|
||||||
{
|
|
||||||
subXactAbortAttempted = false;
|
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot ROLLBACK TO SAVEPOINT in transactions "
|
|
||||||
"which modify distributed tables")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Any failure here will cause local changes to be rolled back,
|
|
||||||
* and remote changes to either roll back (1PC) or, in case of
|
|
||||||
* connection or node failure, leave a prepared transaction
|
|
||||||
* (2PC).
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
|
||||||
{
|
|
||||||
PrepareRemoteTransactions(connectionList);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (event == XACT_EVENT_COMMIT)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* A failure here will cause some remote changes to either
|
|
||||||
* roll back (1PC) or, in case of connection or node failure,
|
|
||||||
* leave a prepared transaction (2PC). However, the local
|
|
||||||
* changes have already been committed.
|
|
||||||
*/
|
|
||||||
|
|
||||||
CommitRemoteTransactions(connectionList, false);
|
|
||||||
}
|
|
||||||
else if (event == XACT_EVENT_ABORT)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* A failure here will cause some remote changes to either
|
|
||||||
* roll back (1PC) or, in case of connection or node failure,
|
|
||||||
* leave a prepared transaction (2PC). The local changes have
|
|
||||||
* already been rolled back.
|
|
||||||
*/
|
|
||||||
|
|
||||||
AbortRemoteTransactions(connectionList);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
CloseConnections(connectionList);
|
|
||||||
shardConnectionHash = NULL;
|
shardConnectionHash = NULL;
|
||||||
subXactAbortAttempted = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
|
|
||||||
SubTransactionId parentSubid, void *arg)
|
|
||||||
{
|
|
||||||
if ((shardConnectionHash != NULL) && (event == SUBXACT_EVENT_ABORT_SUB))
|
|
||||||
{
|
|
||||||
subXactAbortAttempted = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,863 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* remote_transaction.c
|
||||||
|
* Management of transaction spanning more than one node.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "access/xact.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/remote_transaction.h"
|
||||||
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
|
|
||||||
|
static void CheckTransactionHealth(void);
|
||||||
|
static void Assign2PCIdentifier(MultiConnection *connection);
|
||||||
|
static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StartRemoteTransactionBeging initiates beginning the remote transaction in
|
||||||
|
* a non-blocking manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
StartRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
Assert(transaction->transactionState == REMOTE_TRANS_INVALID);
|
||||||
|
|
||||||
|
/* remember transaction as being in-progress */
|
||||||
|
dlist_push_tail(&InProgressTransactions, &connection->transactionNode);
|
||||||
|
|
||||||
|
transaction->transactionState = REMOTE_TRANS_STARTING;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Explicitly specify READ COMMITTED, the default on the remote
|
||||||
|
* side might have been changed, and that would cause problematic
|
||||||
|
* behaviour.
|
||||||
|
*/
|
||||||
|
if (!SendRemoteCommand(connection,
|
||||||
|
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"))
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin
|
||||||
|
* initiated. It blocks if necessary (i.e. if PQisBusy() would return true).
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
FinishRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
PGresult *result = NULL;
|
||||||
|
const bool raiseErrors = true;
|
||||||
|
|
||||||
|
Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, raiseErrors);
|
||||||
|
if (!IsResponseOK(result))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, raiseErrors);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_STARTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
ForgetResults(connection);
|
||||||
|
|
||||||
|
if (!transaction->transactionFailed)
|
||||||
|
{
|
||||||
|
Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteTransactionBegin begins a remote transaction in a blocking manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
StartRemoteTransactionBegin(connection);
|
||||||
|
FinishRemoteTransactionBegin(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StartRemoteTransactionCommit initiates transaction commit in a non-blocking
|
||||||
|
* manner. If the transaction is in a failed state, it'll instead get rolled
|
||||||
|
* back.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
StartRemoteTransactionCommit(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
const bool dontRaiseError = false;
|
||||||
|
const bool isCommit = true;
|
||||||
|
|
||||||
|
/* can only commit if transaction is in progress */
|
||||||
|
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
|
||||||
|
|
||||||
|
/* can't commit if we already started to commit or abort */
|
||||||
|
Assert(transaction->transactionState < REMOTE_TRANS_1PC_ABORTING);
|
||||||
|
|
||||||
|
if (transaction->transactionFailed)
|
||||||
|
{
|
||||||
|
/* abort the transaction if it failed */
|
||||||
|
transaction->transactionState = REMOTE_TRANS_1PC_ABORTING;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Try sending an ROLLBACK; Depending on the state that won't
|
||||||
|
* succeed, but let's try. Have to clear previous results
|
||||||
|
* first.
|
||||||
|
*/
|
||||||
|
ForgetResults(connection); /* try to clear pending stuff */
|
||||||
|
if (!SendRemoteCommand(connection, "ROLLBACK"))
|
||||||
|
{
|
||||||
|
/* no point in reporting a likely redundant message */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (transaction->transactionState == REMOTE_TRANS_PREPARED)
|
||||||
|
{
|
||||||
|
/* commit the prepared transaction */
|
||||||
|
StringInfoData command;
|
||||||
|
|
||||||
|
initStringInfo(&command);
|
||||||
|
appendStringInfo(&command, "COMMIT PREPARED '%s'",
|
||||||
|
transaction->preparedName);
|
||||||
|
|
||||||
|
transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING;
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(connection, command.data))
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, dontRaiseError);
|
||||||
|
|
||||||
|
WarnAboutLeakedPreparedTransaction(connection, isCommit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* initiate remote transaction commit */
|
||||||
|
transaction->transactionState = REMOTE_TRANS_1PC_COMMITTING;
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(connection, "COMMIT"))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* For a moment there I thought we were in trouble.
|
||||||
|
*
|
||||||
|
* Failing in this state means that we don't know whether the the
|
||||||
|
* commit has succeeded.
|
||||||
|
*/
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, dontRaiseError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FinishRemoteTransactionCommit finishes the work
|
||||||
|
* StartRemoteTransactionCommit initiated. It blocks if necessary (i.e. if
|
||||||
|
* PQisBusy() would return true).
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
FinishRemoteTransactionCommit(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
PGresult *result = NULL;
|
||||||
|
const bool dontRaiseErrors = false;
|
||||||
|
const bool isCommit = true;
|
||||||
|
|
||||||
|
Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING);
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, dontRaiseErrors);
|
||||||
|
|
||||||
|
if (!IsResponseOK(result))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Failing in this state means that we will often not know whether
|
||||||
|
* the the commit has succeeded (particularly in case of network
|
||||||
|
* troubles).
|
||||||
|
*
|
||||||
|
* XXX: It might be worthwhile to discern cases where we got a
|
||||||
|
* proper error back from postgres (i.e. COMMIT was received but
|
||||||
|
* produced an error) from cases where the connection failed
|
||||||
|
* before getting a reply.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING)
|
||||||
|
{
|
||||||
|
if (transaction->transactionCritical)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("failed to commit critical transaction "
|
||||||
|
"on %s:%d, metadata is likely out of sync",
|
||||||
|
connection->hostname, connection->port)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("failed to commit transaction on %s:%d",
|
||||||
|
connection->hostname, connection->port)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("failed to commit transaction on %s:%d",
|
||||||
|
connection->hostname, connection->port)));
|
||||||
|
WarnAboutLeakedPreparedTransaction(connection, isCommit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_ABORTED;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_COMMITTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
|
||||||
|
ForgetResults(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteTransactionCommit commits (or aborts, if the transaction failed) a
|
||||||
|
* remote transaction in a blocking manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoteTransactionCommit(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
StartRemoteTransactionCommit(connection);
|
||||||
|
FinishRemoteTransactionCommit(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StartRemoteTransactionAbort initiates abortin the transaction in a
|
||||||
|
* non-blocking manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
StartRemoteTransactionAbort(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
const bool dontRaiseErrors = false;
|
||||||
|
const bool isNotCommit = false;
|
||||||
|
|
||||||
|
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Clear previous results, so we have a better chance to send
|
||||||
|
* ROLLBACK [PREPARED];
|
||||||
|
*/
|
||||||
|
ForgetResults(connection);
|
||||||
|
|
||||||
|
if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_PREPARED)
|
||||||
|
{
|
||||||
|
StringInfoData command;
|
||||||
|
|
||||||
|
initStringInfo(&command);
|
||||||
|
appendStringInfo(&command, "ROLLBACK PREPARED '%s'",
|
||||||
|
transaction->preparedName);
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(connection, command.data))
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
|
||||||
|
|
||||||
|
WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_2PC_ABORTING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!SendRemoteCommand(connection, "ROLLBACK"))
|
||||||
|
{
|
||||||
|
/* no point in reporting a likely redundant message */
|
||||||
|
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_1PC_ABORTING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FinishRemoteTransactionAbort finishes the work StartRemoteTransactionAbort
|
||||||
|
* initiated. It blocks if necessary (i.e. if PQisBusy() would return true).
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
FinishRemoteTransactionAbort(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
PGresult *result = NULL;
|
||||||
|
const bool dontRaiseErrors = false;
|
||||||
|
const bool isNotCommit = false;
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, dontRaiseErrors);
|
||||||
|
|
||||||
|
if (!IsResponseOK(result))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, dontRaiseErrors);
|
||||||
|
|
||||||
|
if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING)
|
||||||
|
{
|
||||||
|
ereport(WARNING,
|
||||||
|
(errmsg("failed to abort 2PC transaction \"%s\" on %s:%d",
|
||||||
|
transaction->preparedName, connection->hostname,
|
||||||
|
connection->port)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
WarnAboutLeakedPreparedTransaction(connection, isNotCommit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, dontRaiseErrors);
|
||||||
|
Assert(!result);
|
||||||
|
|
||||||
|
transaction->transactionState = REMOTE_TRANS_ABORTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteTransactionAbort aborts a remote transaction in a blocking manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoteTransactionAbort(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
StartRemoteTransactionAbort(connection);
|
||||||
|
FinishRemoteTransactionAbort(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* StartRemoteTransactionPrepare initiates preparing the transaction in a
|
||||||
|
* non-blocking manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
StartRemoteTransactionPrepare(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
StringInfoData command;
|
||||||
|
const bool raiseErrors = true;
|
||||||
|
|
||||||
|
/* can't prepare a nonexistant transaction */
|
||||||
|
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
|
||||||
|
|
||||||
|
/* can't prepare in a failed transaction */
|
||||||
|
Assert(!transaction->transactionFailed);
|
||||||
|
|
||||||
|
/* can't prepare if already started to prepare/abort/commit */
|
||||||
|
Assert(transaction->transactionState < REMOTE_TRANS_PREPARING);
|
||||||
|
|
||||||
|
Assign2PCIdentifier(connection);
|
||||||
|
|
||||||
|
initStringInfo(&command);
|
||||||
|
appendStringInfo(&command, "PREPARE TRANSACTION '%s'",
|
||||||
|
transaction->preparedName);
|
||||||
|
|
||||||
|
if (!SendRemoteCommand(connection, command.data))
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, WARNING);
|
||||||
|
MarkRemoteTransactionFailed(connection, raiseErrors);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_PREPARING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FinishRemoteTransactionPrepare finishes the work
|
||||||
|
* StartRemoteTransactionPrepare initiated. It blocks if necessary (i.e. if
|
||||||
|
* PQisBusy() would return true).
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
FinishRemoteTransactionPrepare(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
PGresult *result = NULL;
|
||||||
|
const bool raiseErrors = true;
|
||||||
|
|
||||||
|
Assert(transaction->transactionState == REMOTE_TRANS_PREPARING);
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, raiseErrors);
|
||||||
|
|
||||||
|
if (!IsResponseOK(result))
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, WARNING);
|
||||||
|
transaction->transactionState = REMOTE_TRANS_ABORTED;
|
||||||
|
MarkRemoteTransactionFailed(connection, raiseErrors);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
transaction->transactionState = REMOTE_TRANS_PREPARED;
|
||||||
|
}
|
||||||
|
|
||||||
|
result = GetRemoteCommandResult(connection, raiseErrors);
|
||||||
|
Assert(!result);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteTransactionPrepare prepares a remote transaction in a blocking
|
||||||
|
* manner.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoteTransactionPrepare(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
StartRemoteTransactionPrepare(connection);
|
||||||
|
FinishRemoteTransactionPrepare(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteTransactionBeginIfNecessary is a convenience wrapper around
|
||||||
|
* RemoteTransactionsBeginIfNecessary(), for a single connection.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoteTransactionBeginIfNecessary(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
/* just delegate */
|
||||||
|
if (InCoordinatedTransaction())
|
||||||
|
{
|
||||||
|
List *connectionList = list_make1(connection);
|
||||||
|
|
||||||
|
RemoteTransactionsBeginIfNecessary(connectionList);
|
||||||
|
list_free(connectionList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteTransactionsBeginIfNecessary begins, if necessary according to this
|
||||||
|
* session's coordinated transaction state, and the remote transaction's
|
||||||
|
* state, an explicit transaction on all the connections. This is done in
|
||||||
|
* parallel, to lessen latency penalties.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoteTransactionsBeginIfNecessary(List *connectionList)
|
||||||
|
{
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Don't do anything if not in a coordinated transaction. That allows the
|
||||||
|
* same code to work both in situations that uses transactions, and when
|
||||||
|
* not.
|
||||||
|
*/
|
||||||
|
if (!InCoordinatedTransaction())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* issue BEGIN to all connections needing it */
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
/* can't send BEGIN if a command already is in progress */
|
||||||
|
Assert(PQtransactionStatus(connection->pgConn) != PQTRANS_ACTIVE);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If a transaction already is in progress (including having failed),
|
||||||
|
* don't start it again. Thats quite normal if a piece of code allows
|
||||||
|
* cached connections.
|
||||||
|
*/
|
||||||
|
if (transaction->transactionState != REMOTE_TRANS_INVALID)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
StartRemoteTransactionBegin(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* XXX: Should perform network IO for all connections in a non-blocking manner */
|
||||||
|
|
||||||
|
/* get result of all the BEGINs */
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Only handle BEGIN results on connections that are in process of
|
||||||
|
* starting a transaction, and haven't already failed (e.g. by not
|
||||||
|
* being able to send BEGIN due to a network failure).
|
||||||
|
*/
|
||||||
|
if (transaction->transactionFailed ||
|
||||||
|
transaction->transactionState != REMOTE_TRANS_STARTING)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
FinishRemoteTransactionBegin(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MarkRemoteTransactionFailed records a transaction as having failed.
|
||||||
|
*
|
||||||
|
* If the connection is marked as critical, and allowErrorPromotion is true,
|
||||||
|
* this routine will ERROR out. The allowErrorPromotion case is primarily
|
||||||
|
* required for the transaction management code itself. Usually it is helpful
|
||||||
|
* to fail as soon as possible. If !allowErrorPromotion transaction commit
|
||||||
|
* will instead issue an error before committing on any node.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
MarkRemoteTransactionFailed(MultiConnection *connection, bool allowErrorPromotion)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
transaction->transactionFailed = true;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the connection is marked as critical, fail the entire coordinated
|
||||||
|
* transaction. If allowed.
|
||||||
|
*/
|
||||||
|
if (transaction->transactionCritical && allowErrorPromotion)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
|
||||||
|
connection->hostname, connection->port)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MarkRemoteTransactionCritical signals that failures on this remote
|
||||||
|
* transaction should fail the entire coordinated transaction.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
MarkRemoteTransactionCritical(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
transaction->transactionCritical = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CloseRemoteTransaction handles closing a connection that, potentially, is
|
||||||
|
* part of a coordinated transaction. This should only ever be called from
|
||||||
|
* connection_management.c, while closing a connection during a transaction.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CloseRemoteTransaction(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
/* unlink from list of open transactions, if necessary */
|
||||||
|
if (transaction->transactionState != REMOTE_TRANS_INVALID)
|
||||||
|
{
|
||||||
|
/* XXX: Should we error out for a critical transaction? */
|
||||||
|
|
||||||
|
dlist_delete(&connection->transactionNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ResetRemoteTransaction resets the state of the transaction after the end of
|
||||||
|
* the main transaction, if the connection is being reused.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ResetRemoteTransaction(struct MultiConnection *connection)
|
||||||
|
{
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
/* just reset the entire state, relying on 0 being invalid/false */
|
||||||
|
memset(transaction, 0, sizeof(*transaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CoordinatedRemoteTransactionsPrepare PREPAREs a 2PC transaction on all
|
||||||
|
* non-failed transactions participating in the coordinated transaction.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CoordinatedRemoteTransactionsPrepare(void)
|
||||||
|
{
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
|
/* issue PREPARE TRANSACTION; to all relevant remote nodes */
|
||||||
|
|
||||||
|
/* asynchronously send PREPARE */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
|
||||||
|
|
||||||
|
/* can't PREPARE a transaction that failed */
|
||||||
|
if (transaction->transactionFailed)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
StartRemoteTransactionPrepare(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* XXX: Should perform network IO for all connections in a non-blocking manner */
|
||||||
|
|
||||||
|
/* Wait for result */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
if (transaction->transactionState != REMOTE_TRANS_PREPARING)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
FinishRemoteTransactionPrepare(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CoordinatedRemoteTransactionsCommit performs distributed transactions
|
||||||
|
* handling at commit time. This will be called at XACT_EVENT_PRE_COMMIT if
|
||||||
|
* 1PC commits are used - so shards can still be invalidated - and at
|
||||||
|
* XACT_EVENT_COMMIT if 2PC is being used.
|
||||||
|
*
|
||||||
|
* Note that this routine has to issue rollbacks for failed transactions.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CoordinatedRemoteTransactionsCommit(void)
|
||||||
|
{
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Before starting to commit on any of the nodes - after which we can't
|
||||||
|
* completely roll-back anymore - check that things are in a good state.
|
||||||
|
*/
|
||||||
|
CheckTransactionHealth();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Issue appropriate transaction commands to remote nodes. If everything
|
||||||
|
* went well that's going to be COMMIT or COMMIT PREPARED, if individual
|
||||||
|
* connections had errors, some or all of them might require a ROLLBACK.
|
||||||
|
*
|
||||||
|
* First send the command asynchronously over all connections.
|
||||||
|
*/
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
if (transaction->transactionState == REMOTE_TRANS_INVALID ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_COMMITTED)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
StartRemoteTransactionCommit(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* XXX: Should perform network IO for all connections in a non-blocking manner */
|
||||||
|
|
||||||
|
/* wait for the replies to the commands to come in */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
/* nothing to do if not committing / aborting */
|
||||||
|
if (transaction->transactionState != REMOTE_TRANS_1PC_COMMITTING &&
|
||||||
|
transaction->transactionState != REMOTE_TRANS_2PC_COMMITTING &&
|
||||||
|
transaction->transactionState != REMOTE_TRANS_1PC_ABORTING &&
|
||||||
|
transaction->transactionState != REMOTE_TRANS_2PC_ABORTING)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
FinishRemoteTransactionCommit(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CoordinatedRemoteTransactionsAbort performs distributed transactions
|
||||||
|
* handling at abort time.
|
||||||
|
*
|
||||||
|
* This issues ROLLBACKS and ROLLBACK PREPARED depending on whether the remote
|
||||||
|
* transaction has been prepared or not.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CoordinatedRemoteTransactionsAbort(void)
|
||||||
|
{
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
|
/* asynchronously send ROLLBACK [PREPARED] */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
if (transaction->transactionState == REMOTE_TRANS_INVALID ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_2PC_ABORTING ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_ABORTED)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
StartRemoteTransactionAbort(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* XXX: Should perform network IO for all connections in a non-blocking manner */
|
||||||
|
|
||||||
|
/* and wait for the results */
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
if (transaction->transactionState != REMOTE_TRANS_1PC_ABORTING &&
|
||||||
|
transaction->transactionState != REMOTE_TRANS_2PC_ABORTING)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
FinishRemoteTransactionAbort(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CheckTransactionHealth checks if any of the participating transactions in a
|
||||||
|
* coordinated transaction failed, and what consequence that should have.
|
||||||
|
* This needs to be called before the coordinated transaction commits (but
|
||||||
|
* after they've been PREPAREd if 2PC is in use).
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CheckTransactionHealth(void)
|
||||||
|
{
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
|
dlist_foreach(iter, &InProgressTransactions)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
|
||||||
|
iter.cur);
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
|
||||||
|
|
||||||
|
/* if the connection is in a bad state, so is the transaction's state */
|
||||||
|
if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN)
|
||||||
|
{
|
||||||
|
transaction->transactionFailed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If a critical connection is marked as failed (and no error has been
|
||||||
|
* raised yet) do so now.
|
||||||
|
*/
|
||||||
|
if (transaction->transactionFailed && transaction->transactionCritical)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
|
||||||
|
connection->hostname, connection->port)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Assign2PCIdentifier compute the 2PC transaction name to use for a
|
||||||
|
* transaction.
|
||||||
|
*
|
||||||
|
* Every 2PC transaction should get a new name, i.e. this function will need
|
||||||
|
* to be called again.
|
||||||
|
*
|
||||||
|
* NB: we rely on the fact that we don't need to do full escaping on the names
|
||||||
|
* generated here.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
Assign2PCIdentifier(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
static uint64 sequence = 0;
|
||||||
|
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
|
||||||
|
"citus_%d_"UINT64_FORMAT,
|
||||||
|
MyProcPid, sequence++);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WarnAboutLeakedPreparedTransaction issues a WARNING explaining that a
|
||||||
|
* prepared transaction could not be committed or rolled back, and explains
|
||||||
|
* how to perform cleanup.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit)
|
||||||
|
{
|
||||||
|
StringInfoData command;
|
||||||
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
|
initStringInfo(&command);
|
||||||
|
|
||||||
|
if (commit)
|
||||||
|
{
|
||||||
|
appendStringInfo(&command, "COMMIT PREPARED '%s'",
|
||||||
|
transaction->preparedName);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(&command, "ROLLBACK PREPARED '%s'",
|
||||||
|
transaction->preparedName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* log a warning so the user may abort the transaction later */
|
||||||
|
ereport(WARNING, (errmsg("failed to roll back prepared transaction '%s'",
|
||||||
|
transaction->preparedName),
|
||||||
|
errhint("Run \"%s\" on %s:%u",
|
||||||
|
command.data, connection->hostname, connection->port)));
|
||||||
|
}
|
|
@ -21,6 +21,8 @@
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/hash_helpers.h"
|
#include "distributed/hash_helpers.h"
|
||||||
|
#include "distributed/multi_router_executor.h"
|
||||||
|
#include "distributed/multi_shard_transaction.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
#include "utils/hsearch.h"
|
#include "utils/hsearch.h"
|
||||||
#include "utils/guc.h"
|
#include "utils/guc.h"
|
||||||
|
@ -34,9 +36,18 @@ int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
|
||||||
/* state needed to keep track of operations used during a transaction */
|
/* state needed to keep track of operations used during a transaction */
|
||||||
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
|
|
||||||
|
/* list of connections that are part of the current coordinated transaction */
|
||||||
|
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
|
||||||
|
|
||||||
|
|
||||||
static bool subXactAbortAttempted = false;
|
static bool subXactAbortAttempted = false;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Should this coordinated transaction use 2PC? Set by
|
||||||
|
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
|
||||||
|
* MultiShardCommitProtocol was set to 2PC.
|
||||||
|
*/
|
||||||
|
static bool CurrentTransactionUse2PC = false;
|
||||||
|
|
||||||
/* transaction management functions */
|
/* transaction management functions */
|
||||||
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
|
||||||
|
@ -47,6 +58,64 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
|
||||||
static void AdjustMaxPreparedTransactions(void);
|
static void AdjustMaxPreparedTransactions(void);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BeginCoordinatedTransaction begins a coordinated transaction. No
|
||||||
|
* pre-existing coordinated transaction may be in progress.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
BeginCoordinatedTransaction(void)
|
||||||
|
{
|
||||||
|
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
|
||||||
|
CurrentCoordinatedTransactionState != COORD_TRANS_IDLE)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("starting transaction in wrong state")));
|
||||||
|
}
|
||||||
|
|
||||||
|
CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BeginOrContinueCoordinatedTransaction starts a coordinated transaction,
|
||||||
|
* unless one already is in progress.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
BeginOrContinueCoordinatedTransaction(void)
|
||||||
|
{
|
||||||
|
if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BeginCoordinatedTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* InCoordinatedTransaction returns whether a coordinated transaction has been
|
||||||
|
* started.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
InCoordinatedTransaction(void)
|
||||||
|
{
|
||||||
|
return CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
|
||||||
|
CurrentCoordinatedTransactionState != COORD_TRANS_IDLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CoordinatedTransactionUse2PC() signals that the current coordinated
|
||||||
|
* transaction should use 2PC to commit.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CoordinatedTransactionUse2PC(void)
|
||||||
|
{
|
||||||
|
Assert(InCoordinatedTransaction());
|
||||||
|
|
||||||
|
CurrentTransactionUse2PC = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
InitializeTransactionManagement(void)
|
InitializeTransactionManagement(void)
|
||||||
{
|
{
|
||||||
|
@ -73,6 +142,20 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
{
|
{
|
||||||
case XACT_EVENT_COMMIT:
|
case XACT_EVENT_COMMIT:
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* Call other parts of citus that need to integrate into
|
||||||
|
* transaction management. Do so before doing other work, so the
|
||||||
|
* callbacks still can perform work if needed.
|
||||||
|
*/
|
||||||
|
ResetShardPlacementTransactionState();
|
||||||
|
RouterExecutorPostCommit();
|
||||||
|
|
||||||
|
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
|
||||||
|
{
|
||||||
|
/* handles both already prepared and open transactions */
|
||||||
|
CoordinatedRemoteTransactionsCommit();
|
||||||
|
}
|
||||||
|
|
||||||
/* close connections etc. */
|
/* close connections etc. */
|
||||||
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
||||||
{
|
{
|
||||||
|
@ -82,11 +165,33 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
Assert(!subXactAbortAttempted);
|
Assert(!subXactAbortAttempted);
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
|
dlist_init(&InProgressTransactions);
|
||||||
|
CurrentTransactionUse2PC = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case XACT_EVENT_ABORT:
|
case XACT_EVENT_ABORT:
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* FIXME: Add warning for the COORD_TRANS_COMMITTED case. That
|
||||||
|
* can be reached if this backend fails after the
|
||||||
|
* XACT_EVENT_PRE_COMMIT state.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Call other parts of citus that need to integrate into
|
||||||
|
* transaction management. Do so before doing other work, so the
|
||||||
|
* callbacks still can perform work if needed.
|
||||||
|
*/
|
||||||
|
ResetShardPlacementTransactionState();
|
||||||
|
RouterExecutorPostCommit();
|
||||||
|
|
||||||
|
/* handles both already prepared and open transactions */
|
||||||
|
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
|
||||||
|
{
|
||||||
|
CoordinatedRemoteTransactionsAbort();
|
||||||
|
}
|
||||||
|
|
||||||
/* close connections etc. */
|
/* close connections etc. */
|
||||||
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
|
||||||
{
|
{
|
||||||
|
@ -95,6 +200,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
|
|
||||||
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
|
||||||
XactModificationLevel = XACT_MODIFICATION_NONE;
|
XactModificationLevel = XACT_MODIFICATION_NONE;
|
||||||
|
dlist_init(&InProgressTransactions);
|
||||||
|
CurrentTransactionUse2PC = false;
|
||||||
subXactAbortAttempted = false;
|
subXactAbortAttempted = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -118,6 +225,37 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
"which modify distributed tables")));
|
"which modify distributed tables")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* nothing further to do if there's no managed remote xacts */
|
||||||
|
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (CurrentTransactionUse2PC)
|
||||||
|
{
|
||||||
|
CoordinatedRemoteTransactionsPrepare();
|
||||||
|
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Have to commit remote transactions in PRE_COMMIT, to allow
|
||||||
|
* us to mark failed placements as invalid. Better don't use
|
||||||
|
* this for anything important (i.e. DDL/metadata).
|
||||||
|
*/
|
||||||
|
CoordinatedRemoteTransactionsCommit();
|
||||||
|
CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Call other parts of citus that need to integrate into
|
||||||
|
* transaction management. Call them *after* committing/preparing
|
||||||
|
* the remote transactions, to allow marking shards as invalid
|
||||||
|
* (e.g. if the remote commit failed).
|
||||||
|
*/
|
||||||
|
RouterExecutorPreCommitCheck();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ typedef struct NodeConnectionKey
|
||||||
typedef struct NodeConnectionEntry
|
typedef struct NodeConnectionEntry
|
||||||
{
|
{
|
||||||
NodeConnectionKey cacheKey; /* hash entry key */
|
NodeConnectionKey cacheKey; /* hash entry key */
|
||||||
PGconn *connection; /* connection to remote server, if any */
|
MultiConnection *connection; /* connection to remote server, if any */
|
||||||
} NodeConnectionEntry;
|
} NodeConnectionEntry;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
#define CONNECTION_MANAGMENT_H
|
#define CONNECTION_MANAGMENT_H
|
||||||
|
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/remote_transaction.h"
|
||||||
#include "lib/ilist.h"
|
#include "lib/ilist.h"
|
||||||
#include "utils/hsearch.h"
|
#include "utils/hsearch.h"
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
@ -63,6 +64,12 @@ typedef struct MultiConnection
|
||||||
|
|
||||||
/* membership in list of list of connections in ConnectionHashEntry */
|
/* membership in list of list of connections in ConnectionHashEntry */
|
||||||
dlist_node connectionNode;
|
dlist_node connectionNode;
|
||||||
|
|
||||||
|
/* information about the associated remote transaction */
|
||||||
|
RemoteTransaction remoteTransaction;
|
||||||
|
|
||||||
|
/* membership in list of in-progress transactions */
|
||||||
|
dlist_node transactionNode;
|
||||||
} MultiConnection;
|
} MultiConnection;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,8 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList
|
||||||
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
|
||||||
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
extern void RouterExecutorFinish(QueryDesc *queryDesc);
|
||||||
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
extern void RouterExecutorEnd(QueryDesc *queryDesc);
|
||||||
extern void RegisterRouterExecutorXactCallbacks(void);
|
extern void RouterExecutorPreCommitCheck(void);
|
||||||
|
extern void RouterExecutorPostCommit(void);
|
||||||
|
|
||||||
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
|
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 sh
|
||||||
bool *connectionsFound);
|
bool *connectionsFound);
|
||||||
extern List * ConnectionList(HTAB *connectionHash);
|
extern List * ConnectionList(HTAB *connectionHash);
|
||||||
extern void CloseConnections(List *connectionList);
|
extern void CloseConnections(List *connectionList);
|
||||||
extern void RegisterShardPlacementXactCallbacks(void);
|
extern void ResetShardPlacementTransactionState(void);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_SHARD_TRANSACTION_H */
|
#endif /* MULTI_SHARD_TRANSACTION_H */
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
|
||||||
|
|
||||||
|
struct pg_result; /* target of the PGresult typedef */
|
||||||
|
|
||||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||||
extern bool LogRemoteCommands;
|
extern bool LogRemoteCommands;
|
||||||
|
|
||||||
|
@ -31,6 +33,8 @@ extern void LogRemoteCommand(MultiConnection *connection, const char *command);
|
||||||
|
|
||||||
/* wrappers around libpq functions, with command logging support */
|
/* wrappers around libpq functions, with command logging support */
|
||||||
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
|
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
|
||||||
|
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
|
||||||
|
bool raiseInterrupts);
|
||||||
|
|
||||||
|
|
||||||
#endif /* REMOTE_COMMAND_H */
|
#endif /* REMOTE_COMMAND_H */
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
* remote_transaction.h
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#ifndef REMOTE_TRANSACTION_H
|
||||||
|
#define REMOTE_TRANSACTION_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
#include "lib/ilist.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* forward declare, to avoid recursive includes */
|
||||||
|
struct MultiConnection;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Enum that defines different remote transaction states, of a single remote
|
||||||
|
* transaction.
|
||||||
|
*/
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
/* no transaction active */
|
||||||
|
REMOTE_TRANS_INVALID = 0,
|
||||||
|
|
||||||
|
/* transaction start */
|
||||||
|
REMOTE_TRANS_STARTING,
|
||||||
|
REMOTE_TRANS_STARTED,
|
||||||
|
|
||||||
|
/* 2pc prepare */
|
||||||
|
REMOTE_TRANS_PREPARING,
|
||||||
|
REMOTE_TRANS_PREPARED,
|
||||||
|
|
||||||
|
/* transaction abort */
|
||||||
|
REMOTE_TRANS_1PC_ABORTING,
|
||||||
|
REMOTE_TRANS_2PC_ABORTING,
|
||||||
|
REMOTE_TRANS_ABORTED,
|
||||||
|
|
||||||
|
/* transaction commit */
|
||||||
|
REMOTE_TRANS_1PC_COMMITTING,
|
||||||
|
REMOTE_TRANS_2PC_COMMITTING,
|
||||||
|
REMOTE_TRANS_COMMITTED
|
||||||
|
} RemoteTransactionState;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Transaction state associated associated with a single MultiConnection.
|
||||||
|
*/
|
||||||
|
typedef struct RemoteTransaction
|
||||||
|
{
|
||||||
|
/* what state is the remote side transaction in */
|
||||||
|
RemoteTransactionState transactionState;
|
||||||
|
|
||||||
|
/* failures on this connection should abort entire coordinated transaction */
|
||||||
|
bool transactionCritical;
|
||||||
|
|
||||||
|
/* failed in current transaction */
|
||||||
|
bool transactionFailed;
|
||||||
|
|
||||||
|
/* 2PC transaction name currently associated with connection */
|
||||||
|
char preparedName[NAMEDATALEN];
|
||||||
|
} RemoteTransaction;
|
||||||
|
|
||||||
|
|
||||||
|
/* change an individual remote transaction's state */
|
||||||
|
extern void StartRemoteTransactionBegin(struct MultiConnection *connection);
|
||||||
|
extern void FinishRemoteTransactionBegin(struct MultiConnection *connection);
|
||||||
|
extern void RemoteTransactionBegin(struct MultiConnection *connection);
|
||||||
|
|
||||||
|
extern void StartRemoteTransactionPrepare(struct MultiConnection *connection);
|
||||||
|
extern void FinishRemoteTransactionPrepare(struct MultiConnection *connection);
|
||||||
|
extern void RemoteTransactionPrepare(struct MultiConnection *connection);
|
||||||
|
|
||||||
|
extern void StartRemoteTransactionCommit(struct MultiConnection *connection);
|
||||||
|
extern void FinishRemoteTransactionCommit(struct MultiConnection *connection);
|
||||||
|
extern void RemoteTransactionCommit(struct MultiConnection *connection);
|
||||||
|
|
||||||
|
extern void StartRemoteTransactionAbort(struct MultiConnection *connection);
|
||||||
|
extern void FinishRemoteTransactionAbort(struct MultiConnection *connection);
|
||||||
|
extern void RemoteTransactionAbort(struct MultiConnection *connection);
|
||||||
|
|
||||||
|
/* start transaction if necessary */
|
||||||
|
extern void RemoteTransactionBeginIfNecessary(struct MultiConnection *connection);
|
||||||
|
extern void RemoteTransactionsBeginIfNecessary(List *connectionList);
|
||||||
|
|
||||||
|
/* other public functionality */
|
||||||
|
extern void MarkRemoteTransactionFailed(struct MultiConnection *connection,
|
||||||
|
bool allowErrorPromotion);
|
||||||
|
extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The following functions should all only be called by connection /
|
||||||
|
* transaction managment code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
extern void CloseRemoteTransaction(struct MultiConnection *connection);
|
||||||
|
extern void ResetRemoteTransaction(struct MultiConnection *connection);
|
||||||
|
|
||||||
|
/* perform handling for all in-progress transactions */
|
||||||
|
extern void CoordinatedRemoteTransactionsPrepare(void);
|
||||||
|
extern void CoordinatedRemoteTransactionsCommit(void);
|
||||||
|
extern void CoordinatedRemoteTransactionsAbort(void);
|
||||||
|
|
||||||
|
#endif /* REMOTE_TRANSACTION_H */
|
|
@ -9,6 +9,8 @@
|
||||||
#ifndef TRANSACTION_MANAGMENT_H
|
#ifndef TRANSACTION_MANAGMENT_H
|
||||||
#define TRANSACTION_MANAGMENT_H
|
#define TRANSACTION_MANAGMENT_H
|
||||||
|
|
||||||
|
#include "lib/ilist.h"
|
||||||
|
|
||||||
/* describes what kind of modifications have occurred in the current transaction */
|
/* describes what kind of modifications have occurred in the current transaction */
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
|
@ -29,7 +31,16 @@ typedef enum CoordinatedTransactionState
|
||||||
COORD_TRANS_NONE,
|
COORD_TRANS_NONE,
|
||||||
|
|
||||||
/* no coordinated transaction in progress, but connections established */
|
/* no coordinated transaction in progress, but connections established */
|
||||||
COORD_TRANS_IDLE
|
COORD_TRANS_IDLE,
|
||||||
|
|
||||||
|
/* coordinated transaction in progress */
|
||||||
|
COORD_TRANS_STARTED,
|
||||||
|
|
||||||
|
/* coordinated transaction prepared on all workers */
|
||||||
|
COORD_TRANS_PREPARED,
|
||||||
|
|
||||||
|
/* coordinated transaction committed */
|
||||||
|
COORD_TRANS_COMMITTED
|
||||||
} CoordinatedTransactionState;
|
} CoordinatedTransactionState;
|
||||||
|
|
||||||
|
|
||||||
|
@ -46,9 +57,18 @@ extern int MultiShardCommitProtocol;
|
||||||
/* state needed to prevent new connections during modifying transactions */
|
/* state needed to prevent new connections during modifying transactions */
|
||||||
extern XactModificationType XactModificationLevel;
|
extern XactModificationType XactModificationLevel;
|
||||||
|
|
||||||
|
|
||||||
extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
|
extern CoordinatedTransactionState CurrentCoordinatedTransactionState;
|
||||||
|
|
||||||
|
/* list of connections that are part of the current coordinated transaction */
|
||||||
|
extern dlist_head InProgressTransactions;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Coordinated transaction management.
|
||||||
|
*/
|
||||||
|
extern void BeginCoordinatedTransaction(void);
|
||||||
|
extern void BeginOrContinueCoordinatedTransaction(void);
|
||||||
|
extern bool InCoordinatedTransaction(void);
|
||||||
|
extern void CoordinatedTransactionUse2PC(void);
|
||||||
|
|
||||||
/* initialization function(s) */
|
/* initialization function(s) */
|
||||||
extern void InitializeTransactionManagement(void);
|
extern void InitializeTransactionManagement(void);
|
||||||
|
|
|
@ -80,14 +80,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- see that our first multi shard INSERT...SELECT works expected
|
-- see that our first multi shard INSERT...SELECT works expected
|
||||||
SET client_min_messages TO INFO;
|
SET client_min_messages TO INFO;
|
||||||
DEBUG: StartTransactionCommand
|
DEBUG: StartTransactionCommand
|
||||||
|
@ -283,14 +275,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
user_id | time | value_1 | value_2 | value_3 | value_4
|
user_id | time | value_1 | value_2 | value_3 | value_4
|
||||||
---------+------+---------+---------+---------+---------
|
---------+------+---------+---------+---------+---------
|
||||||
9 | | 90 | | 9000 |
|
9 | | 90 | | 9000 |
|
||||||
|
@ -363,14 +347,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- group by column not exists on the SELECT target list
|
-- group by column not exists on the SELECT target list
|
||||||
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id)
|
INSERT INTO agg_events (value_3_agg, value_4_agg, value_1_agg, user_id)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -574,14 +550,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- add one more level subqueris on top of subquery JOINs
|
-- add one more level subqueris on top of subquery JOINs
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
(user_id, value_4_agg)
|
(user_id, value_4_agg)
|
||||||
|
@ -659,14 +627,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- subqueries in WHERE clause
|
-- subqueries in WHERE clause
|
||||||
INSERT INTO raw_events_second
|
INSERT INTO raw_events_second
|
||||||
(user_id)
|
(user_id)
|
||||||
|
@ -711,14 +671,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- some UPSERTS
|
-- some UPSERTS
|
||||||
INSERT INTO agg_events AS ae
|
INSERT INTO agg_events AS ae
|
||||||
(
|
(
|
||||||
|
@ -758,14 +710,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- upserts with returning
|
-- upserts with returning
|
||||||
INSERT INTO agg_events AS ae
|
INSERT INTO agg_events AS ae
|
||||||
(
|
(
|
||||||
|
@ -806,14 +750,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
user_id | value_1_agg
|
user_id | value_1_agg
|
||||||
---------+-------------
|
---------+-------------
|
||||||
7 |
|
7 |
|
||||||
|
@ -848,14 +784,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- FILTER CLAUSE
|
-- FILTER CLAUSE
|
||||||
INSERT INTO agg_events (user_id, value_1_agg)
|
INSERT INTO agg_events (user_id, value_1_agg)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -886,14 +814,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- a test with reference table JOINs
|
-- a test with reference table JOINs
|
||||||
INSERT INTO
|
INSERT INTO
|
||||||
agg_events (user_id, value_1_agg)
|
agg_events (user_id, value_1_agg)
|
||||||
|
@ -929,14 +849,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- a note on the outer joins is that
|
-- a note on the outer joins is that
|
||||||
-- we filter out outer join results
|
-- we filter out outer join results
|
||||||
-- where partition column returns
|
-- where partition column returns
|
||||||
|
@ -1024,14 +936,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
SET client_min_messages TO INFO;
|
SET client_min_messages TO INFO;
|
||||||
DEBUG: StartTransactionCommand
|
DEBUG: StartTransactionCommand
|
||||||
DEBUG: StartTransaction
|
DEBUG: StartTransaction
|
||||||
|
@ -1094,14 +998,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- we don't want to see constraint vialotions, so truncate first
|
-- we don't want to see constraint vialotions, so truncate first
|
||||||
SET client_min_messages TO INFO;
|
SET client_min_messages TO INFO;
|
||||||
DEBUG: StartTransactionCommand
|
DEBUG: StartTransactionCommand
|
||||||
|
@ -1168,14 +1064,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
-- We do not support any set operations
|
-- We do not support any set operations
|
||||||
INSERT INTO
|
INSERT INTO
|
||||||
raw_events_first(user_id)
|
raw_events_first(user_id)
|
||||||
|
@ -1547,14 +1435,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
SET client_min_messages TO INFO;
|
SET client_min_messages TO INFO;
|
||||||
DEBUG: StartTransactionCommand
|
DEBUG: StartTransactionCommand
|
||||||
DEBUG: StartTransaction
|
DEBUG: StartTransaction
|
||||||
|
@ -1695,13 +1575,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300001
|
|
||||||
DEBUG: sent COMMIT over connection 13300000
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300002
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
DEBUG: sent COMMIT over connection 13300003
|
|
||||||
SET client_min_messages TO INFO;
|
SET client_min_messages TO INFO;
|
||||||
DEBUG: StartTransactionCommand
|
DEBUG: StartTransactionCommand
|
||||||
DEBUG: StartTransaction
|
DEBUG: StartTransaction
|
||||||
|
@ -1784,10 +1657,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- see that defaults are filled
|
-- see that defaults are filled
|
||||||
INSERT INTO table_with_defaults (store_id, first_name)
|
INSERT INTO table_with_defaults (store_id, first_name)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1806,10 +1675,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- shuffle one of the defaults and skip the other
|
-- shuffle one of the defaults and skip the other
|
||||||
INSERT INTO table_with_defaults (default_2, store_id, first_name)
|
INSERT INTO table_with_defaults (default_2, store_id, first_name)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1828,10 +1693,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- shuffle both defaults
|
-- shuffle both defaults
|
||||||
INSERT INTO table_with_defaults (default_2, store_id, default_1, first_name)
|
INSERT INTO table_with_defaults (default_2, store_id, default_1, first_name)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1850,10 +1711,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- use constants instead of non-default column
|
-- use constants instead of non-default column
|
||||||
INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name)
|
INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1872,10 +1729,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- use constants instead of non-default column and skip both defauls
|
-- use constants instead of non-default column and skip both defauls
|
||||||
INSERT INTO table_with_defaults (last_name, store_id, first_name)
|
INSERT INTO table_with_defaults (last_name, store_id, first_name)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1894,10 +1747,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- use constants instead of default columns
|
-- use constants instead of default columns
|
||||||
INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name, default_1)
|
INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name, default_1)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1916,10 +1765,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- use constants instead of both default columns and non-default columns
|
-- use constants instead of both default columns and non-default columns
|
||||||
INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name, default_1)
|
INSERT INTO table_with_defaults (default_2, last_name, store_id, first_name, default_1)
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -1938,10 +1783,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- some of the the ultimate queries where we have constants,
|
-- some of the the ultimate queries where we have constants,
|
||||||
-- defaults and group by entry is not on the target entry
|
-- defaults and group by entry is not on the target entry
|
||||||
INSERT INTO table_with_defaults (default_2, store_id, first_name)
|
INSERT INTO table_with_defaults (default_2, store_id, first_name)
|
||||||
|
@ -1963,10 +1804,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
INSERT INTO table_with_defaults (default_1, store_id, first_name, default_2)
|
INSERT INTO table_with_defaults (default_1, store_id, first_name, default_2)
|
||||||
SELECT
|
SELECT
|
||||||
1000, store_id, 'Andres', '2000'
|
1000, store_id, 'Andres', '2000'
|
||||||
|
@ -1986,10 +1823,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
INSERT INTO table_with_defaults (default_1, store_id, first_name, default_2)
|
INSERT INTO table_with_defaults (default_1, store_id, first_name, default_2)
|
||||||
SELECT
|
SELECT
|
||||||
1000, store_id, 'Andres', '2000'
|
1000, store_id, 'Andres', '2000'
|
||||||
|
@ -2009,10 +1842,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
INSERT INTO table_with_defaults (default_1, store_id, first_name)
|
INSERT INTO table_with_defaults (default_1, store_id, first_name)
|
||||||
SELECT
|
SELECT
|
||||||
1000, store_id, 'Andres'
|
1000, store_id, 'Andres'
|
||||||
|
@ -2032,10 +1861,6 @@ DEBUG: Plan is router executable
|
||||||
DEBUG: CommitTransactionCommand
|
DEBUG: CommitTransactionCommand
|
||||||
DEBUG: CommitTransaction
|
DEBUG: CommitTransaction
|
||||||
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
DEBUG: name: unnamed; blockState: STARTED; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300013
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
DEBUG: sent COMMIT over connection 13300014
|
|
||||||
-- set back to the default
|
-- set back to the default
|
||||||
SET citus.shard_count TO DEFAULT;
|
SET citus.shard_count TO DEFAULT;
|
||||||
DEBUG: StartTransactionCommand
|
DEBUG: StartTransactionCommand
|
||||||
|
|
|
@ -45,8 +45,6 @@ CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
|
||||||
NOTICE: using one-phase commit for distributed DDL commands
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
|
DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
|
||||||
DEBUG: sent COMMIT over connection 650000
|
|
||||||
DEBUG: sent COMMIT over connection 650001
|
|
||||||
CREATE TABLE orders_hash (
|
CREATE TABLE orders_hash (
|
||||||
o_orderkey bigint not null,
|
o_orderkey bigint not null,
|
||||||
o_custkey integer not null,
|
o_custkey integer not null,
|
||||||
|
|
|
@ -442,6 +442,7 @@ INSERT INTO objects VALUES (2, 'BAD');
|
||||||
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
|
INSERT INTO labs VALUES (9, 'Umbrella Corporation');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
WARNING: illegal value
|
WARNING: illegal value
|
||||||
|
WARNING: failed to commit transaction on localhost:57638
|
||||||
-- data should be persisted
|
-- data should be persisted
|
||||||
SELECT * FROM objects WHERE id = 2;
|
SELECT * FROM objects WHERE id = 2;
|
||||||
id | name
|
id | name
|
||||||
|
@ -490,7 +491,9 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
INSERT INTO labs VALUES (9, 'BAD');
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
WARNING: illegal value
|
WARNING: illegal value
|
||||||
|
WARNING: failed to commit transaction on localhost:57637
|
||||||
WARNING: illegal value
|
WARNING: illegal value
|
||||||
|
WARNING: failed to commit transaction on localhost:57638
|
||||||
ERROR: could not commit transaction on any active nodes
|
ERROR: could not commit transaction on any active nodes
|
||||||
-- data should NOT be persisted
|
-- data should NOT be persisted
|
||||||
SELECT * FROM objects WHERE id = 1;
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
@ -526,6 +529,7 @@ INSERT INTO labs VALUES (8, 'Aperture Science');
|
||||||
INSERT INTO labs VALUES (9, 'BAD');
|
INSERT INTO labs VALUES (9, 'BAD');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
WARNING: illegal value
|
WARNING: illegal value
|
||||||
|
WARNING: failed to commit transaction on localhost:57637
|
||||||
\set VERBOSITY default
|
\set VERBOSITY default
|
||||||
-- data to objects should be persisted, but labs should not...
|
-- data to objects should be persisted, but labs should not...
|
||||||
SELECT * FROM objects WHERE id = 1;
|
SELECT * FROM objects WHERE id = 1;
|
||||||
|
|
|
@ -119,10 +119,6 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE
|
||||||
DEBUG: predicate pruning for shardId 350001
|
DEBUG: predicate pruning for shardId 350001
|
||||||
DEBUG: predicate pruning for shardId 350002
|
DEBUG: predicate pruning for shardId 350002
|
||||||
DEBUG: predicate pruning for shardId 350003
|
DEBUG: predicate pruning for shardId 350003
|
||||||
DEBUG: sent PREPARE TRANSACTION over connection 350000
|
|
||||||
DEBUG: sent PREPARE TRANSACTION over connection 350000
|
|
||||||
DEBUG: sent COMMIT PREPARED over connection 350000
|
|
||||||
DEBUG: sent COMMIT PREPARED over connection 350000
|
|
||||||
master_modify_multiple_shards
|
master_modify_multiple_shards
|
||||||
-------------------------------
|
-------------------------------
|
||||||
1
|
1
|
||||||
|
|
|
@ -559,7 +559,7 @@ COMMIT;
|
||||||
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
|
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
|
||||||
DETAIL: Key (command)=(CREATE INDEX) already exists.
|
DETAIL: Key (command)=(CREATE INDEX) already exists.
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57638
|
||||||
ERROR: failed to prepare transaction
|
ERROR: failure on connection marked as essential: localhost:57638
|
||||||
-- Nothing from the block should have committed
|
-- Nothing from the block should have committed
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
||||||
indexname | tablename
|
indexname | tablename
|
||||||
|
@ -574,7 +574,10 @@ NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
CREATE INDEX single_index_3 ON single_shard_items(name);
|
CREATE INDEX single_index_3 ON single_shard_items(name);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
WARNING: failed to commit transaction on localhost:57638
|
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
|
||||||
|
DETAIL: Key (command)=(CREATE INDEX) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync
|
||||||
-- The block should have committed with a warning
|
-- The block should have committed with a warning
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
||||||
indexname | tablename
|
indexname | tablename
|
||||||
|
|
Loading…
Reference in New Issue