Merge pull request #864 from citusdata/migrate_worker_transactions

Add worker transaction and transaction recovery infrastructure
pull/843/head
Eren Başak 2016-10-18 14:25:12 +03:00 committed by GitHub
commit 0bce20dd74
26 changed files with 1420 additions and 97 deletions

View File

@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -76,6 +76,8 @@ $(EXTENSION)--6.0-8.sql: $(EXTENSION)--6.0-7.sql $(EXTENSION)--6.0-7--6.0-8.sql
cat $^ > $@
$(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql
cat $^ > $@
$(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -1,5 +1,5 @@
/*
* Replace oid column in pg_dist_shard_placement with an sequence column.
* Replace oid column in pg_dist_shard_placement with a sequence column.
*/
CREATE SEQUENCE citus.pg_dist_shard_placement_placementid_seq
NO CYCLE;

View File

@ -1,3 +1,5 @@
/* citus--6.0-8--6.0-9.sql */
CREATE TABLE citus.pg_dist_local_group(
groupid int NOT NULL PRIMARY KEY)
;

View File

@ -0,0 +1,24 @@
/* citus--6.0-9--6.0-10.sql */
CREATE TABLE citus.pg_dist_transaction (
groupid int NOT NULL,
gid text NOT NULL
);
CREATE INDEX pg_dist_transaction_group_index
ON citus.pg_dist_transaction using btree(groupid);
ALTER TABLE citus.pg_dist_transaction SET SCHEMA pg_catalog;
ALTER TABLE pg_catalog.pg_dist_transaction
ADD CONSTRAINT pg_dist_transaction_unique_constraint UNIQUE (groupid, gid);
GRANT SELECT ON pg_catalog.pg_dist_transaction TO public;
CREATE FUNCTION recover_prepared_transactions()
RETURNS int
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$recover_prepared_transactions$$;
COMMENT ON FUNCTION recover_prepared_transactions()
IS 'recover prepared transactions started by this node';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-9'
default_version = '6.0-10'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -956,6 +956,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
int workerGroupId = 0;
char *nodeUser = CurrentUserName();
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
@ -963,6 +965,17 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
StringInfo copyCommand = NULL;
PGresult *result = NULL;
/*
* When a copy is initiated from a worker, the information about the connected
* worker node may not be found if pg_dist_node entries are not synced to this
* node. In that case we leave the groupId as 0. Fortunately, it is unused since
* COPY from a worker does not initiate a 2PC.
*/
if (workerNode != NULL)
{
workerGroupId = workerNode->groupId;
}
if (connection == NULL)
{
if (stopOnFailure)
@ -1003,9 +1016,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->groupId = workerGroupId;
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED;
transactionConnection->connection = connection;
transactionConnection->nodeName = nodeName;
transactionConnection->nodePort = nodePort;
connectionList = lappend(connectionList, transactionConnection);
}

View File

@ -27,6 +27,7 @@
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "storage/lock.h"
@ -53,9 +54,6 @@ static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNod
int32 sourceNodePort);
static char * ConstructQualifiedShardName(ShardInterval *shardInterval);
static List * RecreateTableDDLCommandList(Oid relationId);
static void SendCommandListInSingleTransaction(char *nodeName, int32 nodePort,
List *commandList);
static char * CitusExtensionOwnerName(void);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
@ -283,6 +281,7 @@ CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
List *ddlCommandList = NIL;
char *nodeUser = CitusExtensionOwnerName();
foreach(colocatedShardCell, colocatedShardList)
{
@ -297,7 +296,8 @@ CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
HOLD_INTERRUPTS();
SendCommandListInSingleTransaction(targetNodeName, targetNodePort, ddlCommandList);
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, nodeUser,
ddlCommandList);
foreach(colocatedShardCell, colocatedShardList)
{
@ -443,87 +443,3 @@ RecreateTableDDLCommandList(Oid relationId)
return recreateCommandList;
}
/*
 * SendCommandListInSingleTransaction connects to nodeName:nodePort as the
 * user returned by CitusExtensionOwnerName, issues BEGIN, runs every command
 * of commandList in list order over that same connection, and finally issues
 * COMMIT. A failed connection attempt raises an ERROR; a failed BEGIN, command
 * or COMMIT is handed to ReraiseRemoteError. The connection is closed before
 * returning.
 *
 * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus.
 */
static void
SendCommandListInSingleTransaction(char *nodeName, int32 nodePort, List *commandList)
{
	char *nodeUser = CitusExtensionOwnerName();
	ListCell *cell = NULL;
	PGresult *result = NULL;
	PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);

	if (connection == NULL)
	{
		ereport(ERROR, (errmsg("could not open connection to %s:%d as %s",
							   nodeName, nodePort, nodeUser)));
	}

	/* open the transaction block on the worker */
	result = PQexec(connection, "BEGIN");
	if (PQresultStatus(result) != PGRES_COMMAND_OK)
	{
		ReraiseRemoteError(connection, result);
	}
	PQclear(result);

	/* run the commands one by one inside that transaction block */
	foreach(cell, commandList)
	{
		char *command = lfirst(cell);
		bool commandSucceeded = false;

		result = PQexec(connection, command);
		switch (PQresultStatus(result))
		{
			case PGRES_SINGLE_TUPLE:
			case PGRES_TUPLES_OK:
			case PGRES_COMMAND_OK:
			{
				commandSucceeded = true;
				break;
			}

			default:
			{
				break;
			}
		}

		if (!commandSucceeded)
		{
			ReraiseRemoteError(connection, result);
		}
		PQclear(result);
	}

	/* make the changes permanent on the worker */
	result = PQexec(connection, "COMMIT");
	if (PQresultStatus(result) != PGRES_COMMAND_OK)
	{
		ReraiseRemoteError(connection, result);
	}
	PQclear(result);

	/* consume the terminating NULL result, then drop the connection */
	PQgetResult(connection);
	PQfinish(connection);
}
/*
 * CitusExtensionOwnerName looks up the user id returned by
 * CitusExtensionOwner and returns the corresponding user name.
 *
 * GetUserNameFromId takes a second argument from PostgreSQL 9.5 on, hence the
 * version check.
 *
 * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus.
 */
static char *
CitusExtensionOwnerName(void)
{
	Oid extensionOwnerId = CitusExtensionOwner();
	char *extensionOwnerName = NULL;

#if (PG_VERSION_NUM >= 90500)
	extensionOwnerName = GetUserNameFromId(extensionOwnerId, false);
#else
	extensionOwnerName = GetUserNameFromId(extensionOwnerId);
#endif

	return extensionOwnerName;
}

View File

@ -26,10 +26,6 @@
static uint32 DistributedTransactionId = 0;
/* Local functions forward declarations */
static StringInfo BuildTransactionName(int connectionId);
/* the commit protocol to use for COPY commands */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
@ -252,7 +248,7 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
* transaction, which causes it to be rolled back. In general, the user
* should ensure that prepared transactions do not linger.
*/
static StringInfo
StringInfo
BuildTransactionName(int connectionId)
{
StringInfo commandString = makeStringInfo();

View File

@ -17,6 +17,7 @@
#include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/worker_manager.h"
#include "nodes/pg_list.h"
#include "storage/ipc.h"
#include "utils/memutils.h"
@ -126,8 +127,16 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
PGconn *connection = NULL;
TransactionConnection *transactionConnection = NULL;
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
shardPlacement->nodePort);
PGresult *result = NULL;
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("could not find worker node %s:%d",
shardPlacement->nodeName, shardPlacement->nodePort)));
}
connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort,
userName);
@ -142,9 +151,12 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->groupId = workerNode->groupId;
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
transactionConnection->connection = connection;
transactionConnection->nodeName = shardPlacement->nodeName;
transactionConnection->nodePort = shardPlacement->nodePort;
shardConnections->connectionList = lappend(shardConnections->connectionList,
transactionConnection);

View File

@ -0,0 +1,545 @@
/*-------------------------------------------------------------------------
*
* transaction_recovery.c
*
* Routines for recovering two-phase commits started by this node if a
* failure occurs between prepare and commit/abort.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/relscan.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_transaction.h"
#include "distributed/transaction_recovery.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/memutils.h"
#include "utils/rel.h"
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(recover_prepared_transactions);
/* Local functions forward declarations */
static void LogTransactionRecord(int groupId, char *transactionName);
static int RecoverPreparedTransactions(void);
static int RecoverWorkerTransactions(WorkerNode *workerNode);
static List * NameListDifference(List *nameList, List *subtractList);
static int CompareNames(const void *leftPointer, const void *rightPointer);
static bool FindMatchingName(char **nameArray, int nameCount, char *needle,
int *matchIndex);
static List * PendingWorkerTransactionList(PGconn *connection);
static List * UnconfirmedWorkerTransactionsList(int groupId);
static void DeleteTransactionRecord(int32 groupId, char *transactionName);
/*
 * recover_prepared_transactions is the SQL-callable entry point for
 * transaction recovery. It delegates to RecoverPreparedTransactions and
 * returns the count that function reports as an int4.
 */
Datum
recover_prepared_transactions(PG_FUNCTION_ARGS)
{
	int recoveredCount = RecoverPreparedTransactions();

	PG_RETURN_INT32(recoveredCount);
}
/*
 * LogPreparedTransactions writes one transaction record per connection in
 * connectionList, using the connection's group id and the transaction name
 * that BuildTransactionName derives from its connection id. Every connection
 * is expected (and asserted) to be in the PREPARED state.
 *
 * The remote transactions become safe to commit once these records are
 * durable, i.e. once the local transaction has committed.
 */
void
LogPreparedTransactions(List *connectionList)
{
	ListCell *cell = NULL;

	foreach(cell, connectionList)
	{
		TransactionConnection *preparedConnection =
			(TransactionConnection *) lfirst(cell);
		StringInfo gid = BuildTransactionName(preparedConnection->connectionId);

		Assert(preparedConnection->transactionState == TRANSACTION_STATE_PREPARED);

		LogTransactionRecord(preparedConnection->groupId, gid->data);
	}
}
/*
* LogTransactionRecord registers the fact that a transaction has been
* prepared on a worker. The presence of this record indicates that the
* prepared transaction should be committed.
*/
static void
LogTransactionRecord(int groupId, char *transactionName)
{
Relation pgDistTransaction = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_transaction];
bool isNulls[Natts_pg_dist_transaction];
/* form new transaction tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId);
values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName);
/* open transaction relation and insert new tuple */
pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistTransaction);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistTransaction, heapTuple);
CatalogUpdateIndexes(pgDistTransaction, heapTuple);
CommandCounterIncrement();
/* close relation and invalidate previous cache entry */
heap_close(pgDistTransaction, RowExclusiveLock);
}
/*
 * RecoverPreparedTransactions runs RecoverWorkerTransactions for every node
 * returned by WorkerNodeList and returns the sum of the per-worker counts.
 */
static int
RecoverPreparedTransactions(void)
{
	int totalRecoveredCount = 0;
	ListCell *cell = NULL;
	List *workerNodes = NIL;

	/*
	 * Take an exclusive lock on pg_dist_transaction first. This makes us wait
	 * for ongoing metadata transactions, whose prepared transactions we must
	 * not commit/abort while they are still running, and it also keeps two
	 * recoveries from running at the same time.
	 */
	LockRelationOid(DistTransactionRelationId(), ExclusiveLock);

	workerNodes = WorkerNodeList();
	foreach(cell, workerNodes)
	{
		totalRecoveredCount += RecoverWorkerTransactions((WorkerNode *) lfirst(cell));
	}

	return totalRecoveredCount;
}
/*
 * RecoverWorkerTransactions recovers any pending prepared transactions
 * started by this node on the specified worker.
 *
 * The function compares the transaction records stored locally for the
 * worker's group with the prepared transactions that are still pending on the
 * worker:
 *  - a pending transaction with a local record is committed,
 *  - a pending transaction without a local record is rolled back,
 *  - a local record without a pending transaction is assumed to belong to an
 *    already committed transaction and is removed.
 *
 * Returns the number of prepared transactions that were successfully
 * committed or rolled back on the worker. Returns 0 without doing anything
 * when no connection to the worker can be obtained.
 */
static int
RecoverWorkerTransactions(WorkerNode *workerNode)
{
	int recoveredTransactionCount = 0;

	int groupId = workerNode->groupId;
	char *nodeName = workerNode->workerName;
	int nodePort = workerNode->workerPort;

	List *pendingTransactionList = NIL;
	ListCell *pendingTransactionCell = NULL;

	List *unconfirmedTransactionList = NIL;
	char **unconfirmedTransactionArray = NULL;
	int unconfirmedTransactionCount = 0;
	int unconfirmedTransactionIndex = 0;

	List *committedTransactionList = NIL;
	ListCell *committedTransactionCell = NULL;

	MemoryContext localContext = NULL;
	MemoryContext oldContext = NULL;

	PGconn *connection = GetOrEstablishConnection(nodeName, nodePort);
	if (connection == NULL)
	{
		/* cannot recover transactions on this worker right now */
		return 0;
	}

	/* all lists and strings built below live in a short-lived context */
	localContext = AllocSetContextCreate(CurrentMemoryContext,
										 "RecoverWorkerTransactions",
										 ALLOCSET_DEFAULT_MINSIZE,
										 ALLOCSET_DEFAULT_INITSIZE,
										 ALLOCSET_DEFAULT_MAXSIZE);
	oldContext = MemoryContextSwitchTo(localContext);

	/* find transactions that were committed, but not yet confirmed */
	unconfirmedTransactionList = UnconfirmedWorkerTransactionsList(groupId);
	unconfirmedTransactionList = SortList(unconfirmedTransactionList, CompareNames);

	/* convert list to an array to use with FindMatchingName */
	unconfirmedTransactionCount = list_length(unconfirmedTransactionList);
	unconfirmedTransactionArray =
		(char **) PointerArrayFromList(unconfirmedTransactionList);

	/* find stale prepared transactions on the remote node */
	pendingTransactionList = PendingWorkerTransactionList(connection);
	pendingTransactionList = SortList(pendingTransactionList, CompareNames);

	/*
	 * Transactions that have no pending prepared transaction are assumed to
	 * have been committed. Any records in unconfirmedTransactionList that
	 * don't have a transaction in pendingTransactionList can be removed.
	 */
	committedTransactionList = NameListDifference(unconfirmedTransactionList,
												  pendingTransactionList);

	/*
	 * For each pending prepared transaction, check whether there is a transaction
	 * record. If so, commit. If not, the transaction that started the transaction
	 * must have rolled back and thus the prepared transaction should be aborted.
	 */
	foreach(pendingTransactionCell, pendingTransactionList)
	{
		char *transactionName = (char *) lfirst(pendingTransactionCell);

		/*
		 * The name was read from the remote pg_prepared_xacts and may contain
		 * quotes or backslashes, so it has to be escaped as a SQL literal
		 * before being placed in a command.
		 */
		char *quotedTransactionName = quote_literal_cstr(transactionName);
		StringInfo command = makeStringInfo();
		PGresult *result = NULL;

		/* both inputs are sorted, so the search resumes at the previous index */
		bool shouldCommit = FindMatchingName(unconfirmedTransactionArray,
											 unconfirmedTransactionCount,
											 transactionName,
											 &unconfirmedTransactionIndex);

		if (shouldCommit)
		{
			/* should have committed this prepared transaction */
			appendStringInfo(command, "COMMIT PREPARED %s", quotedTransactionName);
		}
		else
		{
			/* no record of this prepared transaction, abort */
			appendStringInfo(command, "ROLLBACK PREPARED %s", quotedTransactionName);
		}

		result = PQexec(connection, command->data);
		if (PQresultStatus(result) != PGRES_COMMAND_OK)
		{
			WarnRemoteError(connection, result);
			PQclear(result);

			/* cannot recover this transaction right now */
			continue;
		}

		PQclear(result);

		ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d",
								nodeName, nodePort),
						 errcontext("%s", command->data)));

		if (shouldCommit)
		{
			/* the record is confirmed now and can be removed below */
			committedTransactionList = lappend(committedTransactionList,
											   transactionName);
		}

		recoveredTransactionCount += 1;
	}

	/* we can remove the transaction records of confirmed transactions */
	foreach(committedTransactionCell, committedTransactionList)
	{
		char *transactionName = (char *) lfirst(committedTransactionCell);

		DeleteTransactionRecord(groupId, transactionName);
	}

	/*
	 * Nothing allocated above is returned to the caller, so switch back and
	 * drop the context itself rather than leaving an empty context behind
	 * for every worker.
	 */
	MemoryContextSwitchTo(oldContext);
	MemoryContextDelete(localContext);

	return recoveredTransactionCount;
}
/*
 * NameListDifference returns the names of nameList that do not occur in
 * subtractList, in their original order. Both lists are assumed to be
 * sorted, which allows a single forward pass over subtractList. Names are
 * compared as strings (via FindMatchingName), which is why
 * list_difference_ptr cannot be used here.
 *
 * The returned list shares its name pointers with nameList.
 */
static List *
NameListDifference(List *nameList, List *subtractList)
{
	char **subtractNames = (char **) PointerArrayFromList(subtractList);
	int subtractNameCount = list_length(subtractList);
	int subtractCursor = 0;
	List *remainingNames = NIL;
	ListCell *cell = NULL;

	foreach(cell, nameList)
	{
		char *candidate = (char *) lfirst(cell);

		if (FindMatchingName(subtractNames, subtractNameCount, candidate,
							 &subtractCursor))
		{
			/* present in subtractList, so not part of the difference */
			continue;
		}

		remainingNames = lappend(remainingNames, candidate);
	}

	pfree(subtractNames);

	return remainingNames;
}
/*
 * CompareNames is a qsort-style comparator for arrays of C strings: both
 * arguments point at a char pointer. At most NAMEDATALEN bytes of the two
 * strings are compared using strncmp.
 *
 * NOTE(review): names that only differ beyond the first NAMEDATALEN bytes
 * compare as equal here - confirm that transaction names never get that long.
 */
static int
CompareNames(const void *leftPointer, const void *rightPointer)
{
	char **leftName = (char **) leftPointer;
	char **rightName = (char **) rightPointer;

	return strncmp(*leftName, *rightName, NAMEDATALEN);
}
/*
 * FindMatchingName scans the sorted nameArray forward, beginning at the
 * index stored in *matchIndex, and stops at the first entry that is greater
 * than or equal to needle (comparing at most NAMEDATALEN bytes).
 *
 * On return *matchIndex holds the index at which the scan stopped, so a
 * follow-up search for a larger needle can resume from there. The function
 * returns true only if the entry at that index equals needle. When the scan
 * runs off the end of the array (including the case where *matchIndex is
 * already >= nameCount) the function returns false.
 */
static bool
FindMatchingName(char **nameArray, int nameCount, char *needle,
				 int *matchIndex)
{
	int currentIndex = 0;

	for (currentIndex = *matchIndex; currentIndex < nameCount; currentIndex++)
	{
		int order = strncmp(needle, nameArray[currentIndex], NAMEDATALEN);

		if (order > 0)
		{
			/* entry sorts before needle, keep looking */
			continue;
		}

		/* first entry that is >= needle: the scan ends here */
		*matchIndex = currentIndex;

		return (order == 0);
	}

	/* reached the end of the array without finding an entry >= needle */
	*matchIndex = currentIndex;

	return false;
}
/*
 * PendingWorkerTransactionList queries pg_prepared_xacts over the given
 * connection and returns the gids that match the pattern used for
 * transactions started by this node, as a list of palloc'd strings. A query
 * failure is handed to ReraiseRemoteError.
 *
 * NOTE(review): '_' is a single-character wildcard in LIKE, so the pattern
 * is looser than a literal "citus_<id>_" prefix - confirm this is acceptable.
 */
static List *
PendingWorkerTransactionList(PGconn *connection)
{
	const int gidColumnIndex = 0;
	int coordinatorId = 0;
	List *gidList = NIL;
	PGresult *queryResult = NULL;
	int rowIndex = 0;
	int rowCount = 0;
	StringInfo query = makeStringInfo();

	appendStringInfo(query, "SELECT gid FROM pg_prepared_xacts "
							"WHERE gid LIKE 'citus_%d_%%'",
					 coordinatorId);

	queryResult = PQexec(connection, query->data);
	if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
	{
		ReraiseRemoteError(connection, queryResult);
	}

	/* copy each gid, since the result set is cleared before returning */
	rowCount = PQntuples(queryResult);
	while (rowIndex < rowCount)
	{
		char *gid = PQgetvalue(queryResult, rowIndex, gidColumnIndex);

		gidList = lappend(gidList, pstrdup(gid));
		rowIndex++;
	}

	PQclear(queryResult);

	return gidList;
}
/*
 * UnconfirmedWorkerTransactionsList returns the transaction names (gids)
 * stored in pg_dist_transaction for the given worker group, as a list of C
 * strings. A transaction is confirmed once we have verified that it no
 * longer exists in pg_prepared_xacts on the remote node and its record has
 * been removed from pg_dist_transaction; whatever is still recorded is
 * therefore unconfirmed.
 */
static List *
UnconfirmedWorkerTransactionsList(int groupId)
{
	const int scanKeyCount = 1;
	const bool indexOK = true;
	ScanKeyData scanKey[1];
	List *gidList = NIL;
	SysScanDesc scan = NULL;
	HeapTuple tuple = NULL;
	TupleDesc tupleDescriptor = NULL;
	Relation pgDistTransaction = heap_open(DistTransactionRelationId(),
										   AccessShareLock);

	/* look up all records with a matching groupid through the group index */
	ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));

	scan = systable_beginscan(pgDistTransaction, DistTransactionGroupIndexId(),
							  indexOK, NULL, scanKeyCount, scanKey);

	tupleDescriptor = RelationGetDescr(pgDistTransaction);

	for (tuple = systable_getnext(scan);
		 HeapTupleIsValid(tuple);
		 tuple = systable_getnext(scan))
	{
		bool isNull = false;
		Datum gidDatum = heap_getattr(tuple, Anum_pg_dist_transaction_gid,
									  tupleDescriptor, &isNull);

		gidList = lappend(gidList, TextDatumGetCString(gidDatum));
	}

	systable_endscan(scan);
	heap_close(pgDistTransaction, AccessShareLock);

	return gidList;
}
/*
 * DeleteTransactionRecord removes the pg_dist_transaction row for the given
 * worker group and transaction name. The rows of the group are fetched
 * through the group index and the first one whose gid matches
 * transactionName (comparing at most NAMEDATALEN bytes) is deleted. An ERROR
 * is raised when the group has no such row.
 */
static void
DeleteTransactionRecord(int32 groupId, char *transactionName)
{
	const int scanKeyCount = 1;
	const bool indexOK = true;
	ScanKeyData scanKey[1];
	SysScanDesc scan = NULL;
	HeapTuple tuple = NULL;
	TupleDesc tupleDescriptor = NULL;
	Relation pgDistTransaction = heap_open(DistTransactionRelationId(),
										   RowExclusiveLock);

	ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));

	scan = systable_beginscan(pgDistTransaction, DistTransactionGroupIndexId(),
							  indexOK, NULL, scanKeyCount, scanKey);

	tupleDescriptor = RelationGetDescr(pgDistTransaction);

	/* walk the group's rows until the gid matches; tuple stays set on a match */
	for (tuple = systable_getnext(scan);
		 HeapTupleIsValid(tuple);
		 tuple = systable_getnext(scan))
	{
		bool isNull = false;
		Datum gidDatum = heap_getattr(tuple, Anum_pg_dist_transaction_gid,
									  tupleDescriptor, &isNull);
		char *gid = TextDatumGetCString(gidDatum);

		if (strncmp(transactionName, gid, NAMEDATALEN) == 0)
		{
			break;
		}
	}

	/* the scan ran to its end without a match: nothing to delete, error out */
	if (!HeapTupleIsValid(tuple))
	{
		ereport(ERROR, (errmsg("could not find valid entry for transaction record "
							   "'%s' in group %d",
							   transactionName, groupId)));
	}

	simple_heap_delete(pgDistTransaction, &tuple->t_self);
	CommandCounterIncrement();

	systable_endscan(scan);
	heap_close(pgDistTransaction, RowExclusiveLock);
}

View File

@ -0,0 +1,518 @@
/*-------------------------------------------------------------------------
*
* worker_transaction.c
*
* Routines for performing transactions across all workers.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/xact.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_transaction.h"
#include "distributed/transaction_recovery.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "utils/memutils.h"
/* Local functions forward declarations */
static void EnableXactCallback(void);
static void CompleteWorkerTransactions(XactEvent event, void *arg);
static List * OpenWorkerTransactions(void);
static TransactionConnection * GetWorkerTransaction(char *nodeName, int32 nodePort);
static List * GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet);
static bool IsResponseOK(ExecStatusType resultStatus);
/* Global worker connection list */
static List *workerConnectionList = NIL;
static bool isXactCallbackRegistered = false;
/*
 * GetWorkerTransactions returns the list of worker transaction connections.
 * While the list is still empty, the call initializes the distributed
 * transaction, registers the transaction callback and opens the worker
 * transactions; later calls return the list that was built then. The
 * transactions are meant to be committed or aborted together with the local
 * transaction.
 */
List *
GetWorkerTransactions(void)
{
	bool transactionsOpened = (workerConnectionList != NIL);

	if (!transactionsOpened)
	{
		InitializeDistributedTransaction();
		EnableXactCallback();

		workerConnectionList = OpenWorkerTransactions();
	}

	/* ensure that the number of workers has not changed */
	Assert(list_length(WorkerNodeList()) == list_length(workerConnectionList));

	return workerConnectionList;
}
/*
 * SendCommandToWorker runs command on the worker transaction connection that
 * GetWorkerTransaction returns for nodeName:nodePort. An ERROR is raised
 * when there is no such connection. A result status other than
 * PGRES_COMMAND_OK or PGRES_TUPLES_OK is handed to ReraiseRemoteError.
 */
void
SendCommandToWorker(char *nodeName, int32 nodePort, char *command)
{
	PGconn *connection = NULL;
	PGresult *result = NULL;
	TransactionConnection *workerTransaction = GetWorkerTransaction(nodeName,
																	nodePort);

	if (workerTransaction == NULL)
	{
		ereport(ERROR, (errmsg("worker %s:%d is not part of current transaction",
							   nodeName, nodePort)));
	}

	connection = workerTransaction->connection;
	result = PQexec(connection, command);

	switch (PQresultStatus(result))
	{
		case PGRES_COMMAND_OK:
		case PGRES_TUPLES_OK:
		{
			break;
		}

		default:
		{
			ReraiseRemoteError(connection, result);
			break;
		}
	}

	PQclear(result);
}
/*
 * SendCommandToWorkers sends a parameterless command to the workers selected
 * by targetWorkerSet. It is a convenience wrapper around
 * SendCommandToWorkersParams with zero parameters; see that function for the
 * execution details.
 */
void
SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command)
{
	const int parameterCount = 0;
	const Oid *parameterTypes = NULL;
	const char *const *parameterValues = NULL;

	SendCommandToWorkersParams(targetWorkerSet, command, parameterCount,
							   parameterTypes, parameterValues);
}
/*
 * SendCommandToWorkersParams sends command to every worker transaction
 * connection selected by targetWorkerSet. The command is first dispatched on
 * all connections without waiting, and the results are collected afterwards,
 * so the workers can execute it in parallel.
 *
 * Parameters are passed as for PQsendQueryParams, except that paramLengths
 * and paramFormats are always NULL and resultFormat is always 0. A send
 * failure or a result that IsResponseOK rejects is handed to
 * ReraiseRemoteError.
 *
 * The connections are those returned by GetTargetWorkerTransactions, i.e.
 * they belong to the worker transactions that are completed when the local
 * transaction completes.
 */
void
SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
						   int parameterCount, const Oid *parameterTypes,
						   const char *const *parameterValues)
{
	List *targetTransactions = GetTargetWorkerTransactions(targetWorkerSet);
	ListCell *cell = NULL;

	/* phase 1: dispatch the command on every connection */
	foreach(cell, targetTransactions)
	{
		TransactionConnection *target = (TransactionConnection *) lfirst(cell);
		int sendStatus = PQsendQueryParams(target->connection, command,
										   parameterCount, parameterTypes,
										   parameterValues, NULL, NULL, 0);

		if (sendStatus != 0)
		{
			continue;
		}

		ReraiseRemoteError(target->connection, NULL);
	}

	/* phase 2: wait for and check the result of every connection */
	foreach(cell, targetTransactions)
	{
		TransactionConnection *target = (TransactionConnection *) lfirst(cell);
		PGresult *result = PQgetResult(target->connection);

		if (!IsResponseOK(PQresultStatus(result)))
		{
			ReraiseRemoteError(target->connection, result);
		}

		PQclear(result);

		/* consume the terminating NULL result */
		PQgetResult(target->connection);
	}
}
/*
 * SendCommandListToWorkerInSingleTransaction connects to nodeName:nodePort
 * as nodeUser, issues BEGIN, runs every command of commandList in list order
 * over that same connection, and finally issues COMMIT. A failed connection
 * attempt raises an ERROR; a failed BEGIN, command or COMMIT is handed to
 * ReraiseRemoteError. The connection is closed before returning.
 */
void
SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser,
										   List *commandList)
{
	ListCell *cell = NULL;
	PGresult *result = NULL;
	PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);

	if (connection == NULL)
	{
		ereport(ERROR, (errmsg("could not open connection to %s:%d as %s",
							   nodeName, nodePort, nodeUser)));
	}

	/* open the transaction block on the worker */
	result = PQexec(connection, "BEGIN");
	if (PQresultStatus(result) != PGRES_COMMAND_OK)
	{
		ReraiseRemoteError(connection, result);
	}
	PQclear(result);

	/* run the commands one by one inside that transaction block */
	foreach(cell, commandList)
	{
		char *command = lfirst(cell);
		ExecStatusType status = PGRES_EMPTY_QUERY;

		result = PQexec(connection, command);
		status = PQresultStatus(result);

		if (status != PGRES_SINGLE_TUPLE && status != PGRES_TUPLES_OK &&
			status != PGRES_COMMAND_OK)
		{
			ReraiseRemoteError(connection, result);
		}
		PQclear(result);
	}

	/* make the changes permanent on the worker */
	result = PQexec(connection, "COMMIT");
	if (PQresultStatus(result) != PGRES_COMMAND_OK)
	{
		ReraiseRemoteError(connection, result);
	}
	PQclear(result);

	/* consume the terminating NULL result, then drop the connection */
	PQgetResult(connection);
	PQfinish(connection);
}
/*
 * IsWorkerTransactionActive reports whether worker transactions are
 * currently open, i.e. whether the worker connection list is non-empty.
 */
bool
IsWorkerTransactionActive(void)
{
	return (workerConnectionList != NIL);
}
/*
 * EnableXactCallback registers CompleteWorkerTransactions as a transaction
 * callback. The registration happens at most once; the
 * isXactCallbackRegistered flag turns repeated calls into no-ops.
 */
static void
EnableXactCallback(void)
{
	if (isXactCallbackRegistered)
	{
		/* already registered by an earlier call */
		return;
	}

	RegisterXactCallback(CompleteWorkerTransactions, NULL);
	isXactCallbackRegistered = true;
}
/*
 * CompleteWorkerTransactions is the transaction callback that finishes the
 * pending worker transactions in step with the local transaction.
 *
 * event is the transaction event being reported; arg is unused.
 *
 * - Without open worker transactions the callback does nothing.
 * - On pre-commit, and only under the 2PC commit protocol, the remote
 *   transactions are prepared and logged. The connection list is kept as is.
 * - On commit or abort the remote transactions are committed or aborted,
 *   after which the connections are closed and the list is reset.
 * - Preparing the local transaction is rejected with an error.
 * - Any other event is ignored.
 */
static void
CompleteWorkerTransactions(XactEvent event, void *arg)
{
	if (workerConnectionList == NIL)
	{
		/* no worker transactions were opened, nothing to complete */
		return;
	}

	switch (event)
	{
		case XACT_EVENT_PRE_COMMIT:
		{
			if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
			{
				/*
				 * If anything fails at this point the local changes get
				 * rolled back, and a prepared transaction may be left
				 * behind on the remote node.
				 */
				PrepareRemoteTransactions(workerConnectionList);

				/*
				 * The local transaction is about to commit, and the remote
				 * ones will follow. The last step before that is writing
				 * commit records to a table. Should the local machine crash
				 * at the last minute, missing records mean the remote
				 * transactions must be rolled back, whereas existing
				 * records mean they must be committed.
				 */
				LogPreparedTransactions(workerConnectionList);
			}

			/* skip the cleanup below; the list is needed by later events */
			return;
		}

		case XACT_EVENT_COMMIT:
		{
			/*
			 * A failure at this point can leave some prepared transactions
			 * pending. The local changes have been committed already, and
			 * the commit record tells that the remote transaction has to be
			 * committed too.
			 */
			CommitRemoteTransactions(workerConnectionList, false);

			/*
			 * The transaction records of all successful commits could be
			 * removed safely now. We are outside of a transaction though,
			 * so the metadata cannot be modified here.
			 */
			break;
		}

		case XACT_EVENT_ABORT:
		{
			/*
			 * A failure at this point can leave some prepared transactions
			 * pending. The local changes have been rolled back already, and
			 * the missing commit record tells that the remote transaction
			 * has to be rolled back too.
			 */
			AbortRemoteTransactions(workerConnectionList);
			break;
		}

		case XACT_EVENT_PREPARE:
		case XACT_EVENT_PRE_PREPARE:
		{
			/*
			 * Allowing a prepare means the commit handler might never run
			 * in this session. Intercepting COMMIT/ABORT PREPARED commands
			 * could solve that; until then we simply raise an error.
			 */
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot prepare a transaction that modified "
								   "distributed tables")));
			break;
		}

		default:
		{
			/* events we do not care about */
			return;
		}
	}

	CloseConnections(workerConnectionList);

	/*
	 * The memory behind workerConnectionList is given back once
	 * TopTransactionContext is released, so resetting the pointer suffices.
	 */
	workerConnectionList = NIL;
}
/*
 * OpenWorkerTransactions connects to every node returned by WorkerNodeList()
 * and sends BEGIN over each connection. One TransactionConnection per node
 * is allocated in TopTransactionContext, so that the entries are still
 * usable from the commit handler, and the list of them is returned.
 *
 * Connections are opened as the Citus extension owner; per the original
 * author's note this is what gives them write access to the Citus metadata
 * tables.
 *
 * An error is raised if a connection cannot be opened or BEGIN fails.
 */
static List *
OpenWorkerTransactions(void)
{
	List *openedTransactionList = NIL;
	List *workerNodeList = NIL;
	ListCell *nodeCell = NULL;
	MemoryContext callerContext = NULL;

	/*
	 * The worker list may be invalidated by a node addition that is in
	 * progress. Taking this lock waits for the node addition and metadata
	 * syncing to complete, and only then do we read the worker list. Holding
	 * the lock also keeps new node additions and metadata synchronization
	 * from running until this transaction ends.
	 */
	LockMetadataSnapshot(AccessShareLock);

	workerNodeList = WorkerNodeList();

	callerContext = MemoryContextSwitchTo(TopTransactionContext);

	foreach(nodeCell, workerNodeList)
	{
		WorkerNode *workerNode = (WorkerNode *) lfirst(nodeCell);
		char *userName = CitusExtensionOwnerName();
		char *hostName = workerNode->workerName;
		int port = workerNode->workerPort;
		TransactionConnection *workerTransaction = NULL;
		PGresult *beginResult = NULL;
		PGconn *workerConnection = ConnectToNode(hostName, port, userName);

		if (workerConnection == NULL)
		{
			ereport(ERROR, (errmsg("could not open connection to %s:%d as %s",
								   hostName, port, userName)));
		}

		beginResult = PQexec(workerConnection, "BEGIN");
		if (PQresultStatus(beginResult) != PGRES_COMMAND_OK)
		{
			ReraiseRemoteError(workerConnection, beginResult);
		}
		PQclear(beginResult);

		/* remember the freshly opened transaction together with its node */
		workerTransaction = palloc0(sizeof(TransactionConnection));
		workerTransaction->connection = workerConnection;
		workerTransaction->transactionState = TRANSACTION_STATE_OPEN;
		workerTransaction->connectionId = 0;
		workerTransaction->groupId = workerNode->groupId;
		workerTransaction->nodeName = pstrdup(hostName);
		workerTransaction->nodePort = port;

		openedTransactionList = lappend(openedTransactionList, workerTransaction);
	}

	MemoryContextSwitchTo(callerContext);

	return openedTransactionList;
}
/*
 * GetWorkerTransaction looks up the worker transaction whose node name and
 * port equal the given nodeName and nodePort, and returns NULL when no such
 * entry exists.
 *
 * The lookup runs over the list returned by GetWorkerTransactions(). Per the
 * original note, that call opens transaction connections to all workers if
 * they were not open already, so this function is best used by operations
 * that send commands to all workers inside a distributed transaction.
 */
static TransactionConnection *
GetWorkerTransaction(char *nodeName, int32 nodePort)
{
	List *workerTransactionList = GetWorkerTransactions();
	ListCell *transactionCell = NULL;

	foreach(transactionCell, workerTransactionList)
	{
		TransactionConnection *candidate =
			(TransactionConnection *) lfirst(transactionCell);
		bool sameName = (strcmp(candidate->nodeName, nodeName) == 0);

		if (sameName && candidate->nodePort == nodePort)
		{
			return candidate;
		}
	}

	/* no transaction is open towards the requested node */
	return NULL;
}
/*
 * GetTargetWorkerTransactions returns the worker transactions that belong to
 * the given target worker set.
 *
 * For WORKERS_WITH_METADATA the result is a new list holding only the
 * transactions whose worker node has hasMetadata set. For any other value
 * the complete list from GetWorkerTransactions() is returned as is.
 *
 * An error is raised if a transaction's node cannot be found through
 * FindWorkerNode, instead of dereferencing a NULL WorkerNode.
 */
static List *
GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet)
{
	List *allWorkerConnectionsList = GetWorkerTransactions();
	List *targetConnectionList = NIL;
	ListCell *connectionCell = NULL;

	if (targetWorkerSet != WORKERS_WITH_METADATA)
	{
		/* no filtering requested */
		return allWorkerConnectionsList;
	}

	foreach(connectionCell, allWorkerConnectionsList)
	{
		TransactionConnection *transactionConnection =
			(TransactionConnection *) lfirst(connectionCell);

		/*
		 * nodeName is stored as const char *, so a writable copy is handed to
		 * FindWorkerNode (presumably it takes a plain char * -- confirm).
		 */
		char *nodeName = pstrdup(transactionConnection->nodeName);
		int nodePort = transactionConnection->nodePort;
		WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);

		if (workerNode == NULL)
		{
			/* fail cleanly rather than crash the backend on a NULL pointer */
			ereport(ERROR, (errmsg("could not find worker node %s:%d",
								   nodeName, nodePort)));
		}

		if (workerNode->hasMetadata)
		{
			targetConnectionList = lappend(targetConnectionList,
										   transactionConnection);
		}
	}

	return targetConnectionList;
}
/*
 * IsResponseOK tells whether resultStatus is one of the statuses treated as
 * success: PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK or PGRES_COMMAND_OK.
 */
static bool
IsResponseOK(ExecStatusType resultStatus)
{
	switch (resultStatus)
	{
		case PGRES_SINGLE_TUPLE:
		case PGRES_TUPLES_OK:
		case PGRES_COMMAND_OK:
		{
			return true;
		}

		default:
		{
			return false;
		}
	}
}

View File

@ -60,3 +60,24 @@ SortList(List *pointerList, int (*comparisonFunction)(const void *, const void *
return sortedList;
}
/*
 * PointerArrayFromList copies the pointers stored in pointerList into a
 * newly palloc0'd array, keeping the list order. The array has exactly
 * list_length(pointerList) entries.
 */
void **
PointerArrayFromList(List *pointerList)
{
	int arrayLength = list_length(pointerList);
	void **resultArray = (void **) palloc0(arrayLength * sizeof(void *));
	void **nextSlot = resultArray;
	ListCell *listCell = NULL;

	foreach(listCell, pointerList)
	{
		*nextSlot = (void *) lfirst(listCell);
		nextSlot++;
	}

	return resultArray;
}

View File

@ -62,6 +62,8 @@ static Oid distShardShardidIndexId = InvalidOid;
static Oid distShardPlacementShardidIndexId = InvalidOid;
static Oid distShardPlacementPlacementidIndexId = InvalidOid;
static Oid distShardPlacementNodeidIndexId = InvalidOid;
static Oid distTransactionRelationId = InvalidOid;
static Oid distTransactionGroupIndexId = InvalidOid;
static Oid extraDataContainerFuncId = InvalidOid;
/* Hash table for information about each partition */
@ -762,6 +764,27 @@ DistShardPlacementPlacementidIndexId(void)
}
/*
 * DistTransactionRelationId returns the oid of the pg_dist_transaction
 * relation. The lookup goes through CachedRelationLookup, which works on the
 * file-level distTransactionRelationId variable.
 */
Oid
DistTransactionRelationId(void)
{
	Oid *cachedRelationId = &distTransactionRelationId;

	CachedRelationLookup("pg_dist_transaction", cachedRelationId);

	return *cachedRelationId;
}
/*
 * DistTransactionGroupIndexId returns the oid of
 * pg_dist_transaction_group_index. The lookup goes through
 * CachedRelationLookup, which works on the file-level
 * distTransactionGroupIndexId variable.
 */
Oid
DistTransactionGroupIndexId(void)
{
	Oid *cachedIndexId = &distTransactionGroupIndexId;

	CachedRelationLookup("pg_dist_transaction_group_index", cachedIndexId);

	return *cachedIndexId;
}
/* return oid of pg_dist_shard_placement_nodeid_index */
Oid
DistShardPlacementNodeidIndexId(void)
@ -857,6 +880,18 @@ CitusExtensionOwner(void)
}
/*
 * CitusExtensionOwnerName returns the user name that belongs to the oid
 * reported by CitusExtensionOwner().
 */
char *
CitusExtensionOwnerName(void)
{
	Oid extensionOwnerId = CitusExtensionOwner();
	char *extensionOwnerName = GetUserNameFromId(extensionOwnerId, false);

	return extensionOwnerName;
}
/* return the username of the currently active role */
char *
CurrentUserName(void)
@ -1382,6 +1417,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardRelationId = InvalidOid;
distShardPlacementRelationId = InvalidOid;
distLocalGroupRelationId = InvalidOid;
distNodeRelationId = InvalidOid;
distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid;
@ -1389,7 +1425,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardShardidIndexId = InvalidOid;
distShardPlacementShardidIndexId = InvalidOid;
distShardPlacementPlacementidIndexId = InvalidOid;
distNodeRelationId = InvalidOid;
distTransactionRelationId = InvalidOid;
distTransactionGroupIndexId = InvalidOid;
extraDataContainerFuncId = InvalidOid;
}
}

View File

@ -19,6 +19,7 @@
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
@ -149,3 +150,18 @@ LockShards(List *shardIntervalList, LOCKMODE lockMode)
LockShardResource(shardId, lockMode);
}
}
/*
 * LockMetadataSnapshot takes the lock that serializes changes to
 * pg_dist_node against all other metadata changes, by locking the relation
 * returned by DistNodeRelationId() in the requested mode.
 *
 * Callers that modify pg_dist_node should pass AccessExclusiveLock, every
 * other metadata change should pass AccessShareLock; no other mode is
 * accepted (checked by assertion). Locks taken here are released at
 * transaction end.
 */
void
LockMetadataSnapshot(LOCKMODE lockMode)
{
	Oid nodeRelationId = InvalidOid;

	Assert(lockMode == AccessExclusiveLock || lockMode == AccessShareLock);

	nodeRelationId = DistNodeRelationId();
	LockRelationOid(nodeRelationId, lockMode);
}

View File

@ -42,9 +42,12 @@ typedef enum
*/
typedef struct TransactionConnection
{
/* group id of the worker node; copied from WorkerNode->groupId */
int groupId;
/* NOTE(review): presumably identifies the connection when naming prepared transactions -- confirm */
int64 connectionId;
/* current state of the remote transaction, e.g. TRANSACTION_STATE_OPEN */
TransactionState transactionState;
/* libpq connection over which the remote transaction runs */
PGconn *connection;
/* name and port of the node this connection was opened to */
const char *nodeName;
int nodePort;
} TransactionConnection;
@ -57,5 +60,6 @@ extern void InitializeDistributedTransaction(void);
extern void PrepareRemoteTransactions(List *connectionList);
extern void AbortRemoteTransactions(List *connectionList);
extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
extern StringInfo BuildTransactionName(int connectionId);
#endif /* COMMIT_PROTOCOL_H */

View File

@ -21,6 +21,7 @@
/* utility functions declaration shared within this module */
extern List * SortList(List *pointerList,
int (*ComparisonFunction)(const void *, const void *));
extern void ** PointerArrayFromList(List *pointerList);
#endif /* CITUS_LISTUTILS_H */

View File

@ -79,6 +79,8 @@ extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardPlacementShardidIndexId(void);
extern Oid DistShardPlacementPlacementidIndexId(void);
extern Oid DistTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistShardPlacementNodeidIndexId(void);
/* function oids */
@ -86,5 +88,6 @@ extern Oid CitusExtraDataContainerFuncId(void);
/* user related functions */
extern Oid CitusExtensionOwner(void);
extern char * CitusExtensionOwnerName(void);
extern char * CurrentUserName(void);
#endif /* METADATA_CACHE_H */

View File

@ -0,0 +1,43 @@
/*-------------------------------------------------------------------------
*
* pg_dist_transaction.h
* definition of the "transaction" relation (pg_dist_transaction).
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_TRANSACTION_H
#define PG_DIST_TRANSACTION_H
/* ----------------
 * pg_dist_transaction definition.
 *
 * The fields follow the attribute order given by the
 * Anum_pg_dist_transaction_* constants: groupid first, then gid.
 *
 * NOTE(review): gid is a variable-length text value; presumably it should be
 * read through the attribute number rather than through this struct --
 * confirm.
 * ----------------
 */
typedef struct FormData_pg_dist_transaction
{
int32 groupid; /* id of the replication group */
text gid; /* global transaction identifier */
} FormData_pg_dist_transaction;
/* ----------------
* Form_pg_dist_transaction corresponds to a pointer to a tuple with
* the format of the pg_dist_transaction relation.
* ----------------
*/
typedef FormData_pg_dist_transaction *Form_pg_dist_transaction;
/* ----------------
* compiler constants for pg_dist_transaction
* ----------------
*/
#define Natts_pg_dist_transaction 2
#define Anum_pg_dist_transaction_groupid 1
#define Anum_pg_dist_transaction_gid 2
#endif /* PG_DIST_TRANSACTION_H */

View File

@ -76,5 +76,6 @@ extern void LockJobResource(uint64 jobId, LOCKMODE lockmode);
extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
extern void LockShards(List *shardIntervalList, LOCKMODE lockMode);
extern void LockMetadataSnapshot(LOCKMODE lockMode);
#endif /* RESOURCE_LOCK_H */

View File

@ -0,0 +1,22 @@
/*-------------------------------------------------------------------------
*
* transaction_recovery.h
* Type and function declarations used in recovering 2PC transactions.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef TRANSACTION_RECOVERY_H
#define TRANSACTION_RECOVERY_H
#include "nodes/pg_list.h"
/* Function declarations for 2PC transaction recovery */
extern void LogPreparedTransactions(List *connectionList);
#endif /* TRANSACTION_RECOVERY_H */

View File

@ -0,0 +1,42 @@
/*-------------------------------------------------------------------------
*
* worker_transaction.h
* Type and function declarations used in performing transactions across
* workers.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef WORKER_TRANSACTION_H
#define WORKER_TRANSACTION_H
#include "distributed/worker_manager.h"
/*
 * TargetWorkerSet is used for determining the type of workers that a command
 * is targeted to.
 */
typedef enum TargetWorkerSet
{
/* only the workers whose WorkerNode has hasMetadata set */
WORKERS_WITH_METADATA,
/* every worker, without any filtering */
ALL_WORKERS
} TargetWorkerSet;
/* Function declarations for worker transactions */
extern List * GetWorkerTransactions(void);
extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command);
extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command);
extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues);
extern void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort,
char *nodeUser, List *commandList);
/* helper functions for worker transactions */
extern bool IsWorkerTransactionActive(void);
#endif /* WORKER_TRANSACTION_H */

View File

@ -35,6 +35,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-6';
ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -0,0 +1,62 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests for prepared transaction recovery
-- Ensure pg_dist_transaction is empty for test
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT * FROM pg_dist_transaction;
groupid | gid
---------+-----
(0 rows)
-- Create some "fake" prepared transactions to recover
\c - - - :worker_1_port
BEGIN;
CREATE TABLE should_abort (value int);
PREPARE TRANSACTION 'citus_0_should_abort';
BEGIN;
CREATE TABLE should_commit (value int);
PREPARE TRANSACTION 'citus_0_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
\c - - - :master_port
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: COMMIT PREPARED 'citus_0_should_commit'
recover_prepared_transactions
-------------------------------
3
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- Confirm that transactions were correctly rolled forward
\c - - - :worker_1_port
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
count
-------
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
count
-------
1
(1 row)

View File

@ -135,6 +135,7 @@ test: multi_repartition_udt
test: multi_repartitioned_subquery_udf
test: multi_modifying_xacts
test: multi_metadata_snapshot
test: multi_transaction_recovery
# ---------
# multi_copy creates hash and range-partitioned tables and performs COPY

View File

@ -40,6 +40,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-6';
ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
-- drop extension an re-create in newest version
DROP EXTENSION citus;

View File

@ -0,0 +1,37 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests for prepared transaction recovery
-- Ensure pg_dist_transaction is empty for test
SELECT recover_prepared_transactions();
SELECT * FROM pg_dist_transaction;
-- Create some "fake" prepared transactions to recover
\c - - - :worker_1_port
BEGIN;
CREATE TABLE should_abort (value int);
PREPARE TRANSACTION 'citus_0_should_abort';
BEGIN;
CREATE TABLE should_commit (value int);
PREPARE TRANSACTION 'citus_0_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
\c - - - :master_port
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
-- Confirm that transactions were correctly rolled forward
\c - - - :worker_1_port
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';