diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 7a3c54bb3..b06e5a876 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -76,6 +76,8 @@ $(EXTENSION)--6.0-8.sql: $(EXTENSION)--6.0-7.sql $(EXTENSION)--6.0-7--6.0-8.sql cat $^ > $@ $(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql cat $^ > $@ +$(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-4--6.0-5.sql b/src/backend/distributed/citus--6.0-4--6.0-5.sql index 1b72ede65..fe3d9cf06 100644 --- a/src/backend/distributed/citus--6.0-4--6.0-5.sql +++ b/src/backend/distributed/citus--6.0-4--6.0-5.sql @@ -1,5 +1,5 @@ /* - * Replace oid column in pg_dist_shard_placement with an sequence column. + * Replace oid column in pg_dist_shard_placement with a sequence column. 
*/ CREATE SEQUENCE citus.pg_dist_shard_placement_placementid_seq NO CYCLE; diff --git a/src/backend/distributed/citus--6.0-8--6.0-9.sql b/src/backend/distributed/citus--6.0-8--6.0-9.sql index 78139bc77..b20f7947e 100644 --- a/src/backend/distributed/citus--6.0-8--6.0-9.sql +++ b/src/backend/distributed/citus--6.0-8--6.0-9.sql @@ -1,3 +1,5 @@ +/* citus--6.0-8--6.0-9.sql */ + CREATE TABLE citus.pg_dist_local_group( groupid int NOT NULL PRIMARY KEY) ; diff --git a/src/backend/distributed/citus--6.0-9--6.0-10.sql b/src/backend/distributed/citus--6.0-9--6.0-10.sql new file mode 100644 index 000000000..d3f127933 --- /dev/null +++ b/src/backend/distributed/citus--6.0-9--6.0-10.sql @@ -0,0 +1,24 @@ +/* citus--6.0-9--6.0-10.sql */ + +CREATE TABLE citus.pg_dist_transaction ( + groupid int NOT NULL, + gid text NOT NULL +); + +CREATE INDEX pg_dist_transaction_group_index +ON citus.pg_dist_transaction using btree(groupid); + +ALTER TABLE citus.pg_dist_transaction SET SCHEMA pg_catalog; +ALTER TABLE pg_catalog.pg_dist_transaction +ADD CONSTRAINT pg_dist_transaction_unique_constraint UNIQUE (groupid, gid); + +GRANT SELECT ON pg_catalog.pg_dist_transaction TO public; + +CREATE FUNCTION recover_prepared_transactions() + RETURNS int + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$recover_prepared_transactions$$; + +COMMENT ON FUNCTION recover_prepared_transactions() + IS 'recover prepared transactions started by this node'; + diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 6dcf270ae..da6f90cff 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-9' +default_version = '6.0-10' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 43299d922..b1ed3d5cd 100644 --- 
a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -956,6 +956,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); char *nodeName = placement->nodeName; int nodePort = placement->nodePort; + WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort); + int workerGroupId = 0; char *nodeUser = CurrentUserName(); PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser); @@ -963,6 +965,17 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections StringInfo copyCommand = NULL; PGresult *result = NULL; + /* + * When a copy is initiated from a worker, the information about the connected + * worker node may not be found if pg_dist_node entries are not synced to this + * node. In that case we leave the groupId as 0. Fortunately, it is unused since + * COPY from a worker does not initiate a 2PC. + */ + if (workerNode != NULL) + { + workerGroupId = workerNode->groupId; + } + if (connection == NULL) { if (stopOnFailure) @@ -1003,9 +1016,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections transactionConnection = palloc0(sizeof(TransactionConnection)); + transactionConnection->groupId = workerGroupId; transactionConnection->connectionId = shardConnections->shardId; transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED; transactionConnection->connection = connection; + transactionConnection->nodeName = nodeName; + transactionConnection->nodePort = nodePort; connectionList = lappend(connectionList, transactionConnection); } diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 3ae2e8e10..30975e390 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -27,6 +27,7 @@ #include 
"distributed/resource_lock.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" #include "storage/lock.h" @@ -53,9 +54,6 @@ static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNod int32 sourceNodePort); static char * ConstructQualifiedShardName(ShardInterval *shardInterval); static List * RecreateTableDDLCommandList(Oid relationId); -static void SendCommandListInSingleTransaction(char *nodeName, int32 nodePort, - List *commandList); -static char * CitusExtensionOwnerName(void); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_copy_shard_placement); @@ -283,6 +281,7 @@ CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, List *colocatedShardList = ColocatedShardIntervalList(shardInterval); ListCell *colocatedShardCell = NULL; List *ddlCommandList = NIL; + char *nodeUser = CitusExtensionOwnerName(); foreach(colocatedShardCell, colocatedShardList) { @@ -297,7 +296,8 @@ CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, HOLD_INTERRUPTS(); - SendCommandListInSingleTransaction(targetNodeName, targetNodePort, ddlCommandList); + SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, nodeUser, + ddlCommandList); foreach(colocatedShardCell, colocatedShardList) { @@ -443,87 +443,3 @@ RecreateTableDDLCommandList(Oid relationId) return recreateCommandList; } - - -/* - * SendCommandListInSingleTransaction opens connection to the node with the given - * nodeName and nodePort. Then, the connection starts a transaction on the remote - * node and executes the commands in the transaction. The function raises error if - * any of the queries fails. - * - * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. 
- */ -static void -SendCommandListInSingleTransaction(char *nodeName, int32 nodePort, List *commandList) -{ - char *nodeUser = CitusExtensionOwnerName(); - PGconn *workerConnection = NULL; - PGresult *queryResult = NULL; - ListCell *commandCell = NULL; - - workerConnection = ConnectToNode(nodeName, nodePort, nodeUser); - if (workerConnection == NULL) - { - ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", - nodeName, nodePort, nodeUser))); - } - - /* start the transaction on the worker node */ - queryResult = PQexec(workerConnection, "BEGIN"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - - /* iterate over the commands and execute them in the same connection */ - foreach(commandCell, commandList) - { - char *commandString = lfirst(commandCell); - ExecStatusType resultStatus = PGRES_EMPTY_QUERY; - - queryResult = PQexec(workerConnection, commandString); - resultStatus = PQresultStatus(queryResult); - if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || - resultStatus == PGRES_COMMAND_OK)) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - } - - /* commit the transaction on the worker node */ - queryResult = PQexec(workerConnection, "COMMIT"); - if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) - { - ReraiseRemoteError(workerConnection, queryResult); - } - - PQclear(queryResult); - - /* clear NULL result */ - PQgetResult(workerConnection); - - /* we no longer need this connection */ - PQfinish(workerConnection); -} - - -/* - * CitusExtensionOwnerName returns the name of the owner of the extension. - * - * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. 
- */ -static char * -CitusExtensionOwnerName(void) -{ - Oid superUserId = CitusExtensionOwner(); - -#if (PG_VERSION_NUM < 90500) - return GetUserNameFromId(superUserId); -#else - return GetUserNameFromId(superUserId, false); -#endif -} diff --git a/src/backend/distributed/transaction/commit_protocol.c b/src/backend/distributed/transaction/commit_protocol.c index 728f672f4..1c5ac7417 100644 --- a/src/backend/distributed/transaction/commit_protocol.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -26,10 +26,6 @@ static uint32 DistributedTransactionId = 0; -/* Local functions forward declarations */ -static StringInfo BuildTransactionName(int connectionId); - - /* the commit protocol to use for COPY commands */ int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; @@ -252,7 +248,7 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure) * transaction, which causes it to be rolled back. In general, the user * should ensure that prepared transactions do not linger. 
*/ -static StringInfo +StringInfo BuildTransactionName(int connectionId) { StringInfo commandString = makeStringInfo(); diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 712beaa5e..09d251c0e 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -17,6 +17,7 @@ #include "distributed/connection_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/worker_manager.h" #include "nodes/pg_list.h" #include "storage/ipc.h" #include "utils/memutils.h" @@ -126,8 +127,16 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell); PGconn *connection = NULL; TransactionConnection *transactionConnection = NULL; + WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName, + shardPlacement->nodePort); PGresult *result = NULL; + if (workerNode == NULL) + { + ereport(ERROR, (errmsg("could not find worker node %s:%d", + shardPlacement->nodeName, shardPlacement->nodePort))); + } + connection = ConnectToNode(shardPlacement->nodeName, shardPlacement->nodePort, userName); @@ -142,9 +151,12 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName) transactionConnection = palloc0(sizeof(TransactionConnection)); + transactionConnection->groupId = workerNode->groupId; transactionConnection->connectionId = shardConnections->shardId; transactionConnection->transactionState = TRANSACTION_STATE_INVALID; transactionConnection->connection = connection; + transactionConnection->nodeName = shardPlacement->nodeName; + transactionConnection->nodePort = shardPlacement->nodePort; shardConnections->connectionList = lappend(shardConnections->connectionList, transactionConnection); diff --git 
a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c new file mode 100644 index 000000000..9c59d57af --- /dev/null +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -0,0 +1,545 @@ +/*------------------------------------------------------------------------- + * + * transaction_recovery.c + * + * Routines for recovering two-phase commits started by this node if a + * failure occurs between prepare and commit/abort. + * + * Copyright (c) 2016, Citus Data, Inc. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include +#include + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/relscan.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "distributed/commit_protocol.h" +#include "distributed/connection_cache.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/pg_dist_transaction.h" +#include "distributed/transaction_recovery.h" +#include "distributed/worker_manager.h" +#include "distributed/worker_transaction.h" +#include "lib/stringinfo.h" +#include "storage/lmgr.h" +#include "storage/lock.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/memutils.h" +#include "utils/rel.h" + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(recover_prepared_transactions); + + +/* Local functions forward declarations */ +static void LogTransactionRecord(int groupId, char *transactionName); +static int RecoverPreparedTransactions(void); +static int RecoverWorkerTransactions(WorkerNode *workerNode); +static List * NameListDifference(List *nameList, List *subtractList); +static int CompareNames(const void *leftPointer, const void *rightPointer); +static bool FindMatchingName(char **nameArray, int nameCount, char *needle, + int *matchIndex); +static List * 
PendingWorkerTransactionList(PGconn *connection); +static List * UnconfirmedWorkerTransactionsList(int groupId); +static void DeleteTransactionRecord(int32 groupId, char *transactionName); + + +/* + * recover_prepared_transactions recovers any pending prepared + * transactions started by this node on other nodes. + */ +Datum +recover_prepared_transactions(PG_FUNCTION_ARGS) +{ + int recoveredTransactionCount = 0; + + recoveredTransactionCount = RecoverPreparedTransactions(); + + PG_RETURN_INT32(recoveredTransactionCount); +} + + +/* + * LogPreparedTransactions logs a commit record for all prepared transactions + * on connections in connectionList. The remote transaction is safe to commit + * once the record has been durably stored (i.e. the local transaction is + * committed). + */ +void +LogPreparedTransactions(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + + char transactionState PG_USED_FOR_ASSERTS_ONLY = + transactionConnection->transactionState; + int groupId = transactionConnection->groupId; + int64 connectionId = transactionConnection->connectionId; + StringInfo transactionName = BuildTransactionName(connectionId); + + Assert(transactionState == TRANSACTION_STATE_PREPARED); + + LogTransactionRecord(groupId, transactionName->data); + } +} + + +/* + * LogTransactionRecord registers the fact that a transaction has been + * prepared on a worker. The presence of this record indicates that the + * prepared transaction should be committed. 
+ */ +static void +LogTransactionRecord(int groupId, char *transactionName) +{ + Relation pgDistTransaction = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_transaction]; + bool isNulls[Natts_pg_dist_transaction]; + + /* form new transaction tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); + values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); + + /* open transaction relation and insert new tuple */ + pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistTransaction); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistTransaction, heapTuple); + CatalogUpdateIndexes(pgDistTransaction, heapTuple); + CommandCounterIncrement(); + + /* close the relation */ + heap_close(pgDistTransaction, RowExclusiveLock); +} + + +/* + * RecoverPreparedTransactions recovers any pending prepared + * transactions started by this node on other nodes. + */ +static int +RecoverPreparedTransactions(void) +{ + List *workerList = NIL; + ListCell *workerNodeCell = NULL; + int recoveredTransactionCount = 0; + + /* + * We block here if metadata transactions are ongoing, since we + * mustn't commit/abort their prepared transactions under their + * feet. We also prevent concurrent recovery. + */ + LockRelationOid(DistTransactionRelationId(), ExclusiveLock); + + workerList = WorkerNodeList(); + + foreach(workerNodeCell, workerList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + + recoveredTransactionCount += RecoverWorkerTransactions(workerNode); + } + + return recoveredTransactionCount; +} + + +/* + * RecoverWorkerTransactions recovers any pending prepared transactions + * started by this node on the specified worker. 
+ */ +static int +RecoverWorkerTransactions(WorkerNode *workerNode) +{ + int recoveredTransactionCount = 0; + + int groupId = workerNode->groupId; + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + + List *pendingTransactionList = NIL; + ListCell *pendingTransactionCell = NULL; + + List *unconfirmedTransactionList = NIL; + char **unconfirmedTransactionArray = NULL; + int unconfirmedTransactionCount = 0; + int unconfirmedTransactionIndex = 0; + + List *committedTransactionList = NIL; + ListCell *committedTransactionCell = NULL; + + MemoryContext localContext = NULL; + MemoryContext oldContext = NULL; + + PGconn *connection = GetOrEstablishConnection(nodeName, nodePort); + if (connection == NULL) + { + /* cannot recover transactions on this worker right now */ + return 0; + } + + localContext = AllocSetContextCreate(CurrentMemoryContext, + "RecoverWorkerTransactions", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldContext = MemoryContextSwitchTo(localContext); + + /* find transactions that were committed, but not yet confirmed */ + unconfirmedTransactionList = UnconfirmedWorkerTransactionsList(groupId); + unconfirmedTransactionList = SortList(unconfirmedTransactionList, CompareNames); + + /* convert list to an array to use with FindMatchingName */ + unconfirmedTransactionCount = list_length(unconfirmedTransactionList); + unconfirmedTransactionArray = + (char **) PointerArrayFromList(unconfirmedTransactionList); + + /* find stale prepared transactions on the remote node */ + pendingTransactionList = PendingWorkerTransactionList(connection); + pendingTransactionList = SortList(pendingTransactionList, CompareNames); + + /* + * Transactions that have no pending prepared transaction are assumed to + * have been committed. Any records in unconfirmedTransactionList that + * don't have a transaction in pendingTransactionList can be removed. 
+ */ + committedTransactionList = NameListDifference(unconfirmedTransactionList, + pendingTransactionList); + + /* + * For each pending prepared transaction, check whether there is a transaction + * record. If so, commit. If not, the transaction that started the transaction + * must have rolled back and thus the prepared transaction should be aborted. + */ + foreach(pendingTransactionCell, pendingTransactionList) + { + char *transactionName = (char *) lfirst(pendingTransactionCell); + StringInfo command = makeStringInfo(); + PGresult *result = NULL; + + bool shouldCommit = FindMatchingName(unconfirmedTransactionArray, + unconfirmedTransactionCount, + transactionName, + &unconfirmedTransactionIndex); + + if (shouldCommit) + { + /* should have committed this prepared transaction */ + appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName); + } + else + { + /* no record of this prepared transaction, abort */ + appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName); + } + + result = PQexec(connection, command->data); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + WarnRemoteError(connection, result); + PQclear(result); + + /* cannot recover this transaction right now */ + continue; + } + + PQclear(result); + + ereport(NOTICE, (errmsg("recovered a prepared transaction on %s:%d", + nodeName, nodePort), + errcontext("%s", command->data))); + + if (shouldCommit) + { + committedTransactionList = lappend(committedTransactionList, + transactionName); + } + + recoveredTransactionCount += 1; + } + + /* we can remove the transaction records of confirmed transactions */ + foreach(committedTransactionCell, committedTransactionList) + { + char *transactionName = (char *) lfirst(committedTransactionCell); + + DeleteTransactionRecord(groupId, transactionName); + } + + MemoryContextReset(localContext); + MemoryContextSwitchTo(oldContext); + + return recoveredTransactionCount; +} + + +/* + * NameListDifference returns the difference between the bag of + 
* names in nameList and subtractList. Both are assumed to be + * sorted. We cannot use list_difference_ptr here since we need + * to compare the actual strings. + */ +static List * +NameListDifference(List *nameList, List *subtractList) +{ + List *differenceList = NIL; + ListCell *nameCell = NULL; + + int subtractIndex = 0; + int subtractCount = list_length(subtractList); + char **subtractArray = (char **) PointerArrayFromList(subtractList); + + foreach(nameCell, nameList) + { + char *baseName = (char *) lfirst(nameCell); + + bool nameFound = FindMatchingName(subtractArray, subtractCount, + baseName, &subtractIndex); + + if (!nameFound) + { + /* + * baseName is not in subtractArray and thus included + * in the difference. + */ + differenceList = lappend(differenceList, baseName); + } + } + + pfree(subtractArray); + + return differenceList; +} + + +/* + * CompareNames compares names using strncmp. Its signature allows it to + * be used in qsort. + */ +static int +CompareNames(const void *leftPointer, const void *rightPointer) +{ + const char *leftString = *((char **) leftPointer); + const char *rightString = *((char **) rightPointer); + + int nameCompare = strncmp(leftString, rightString, NAMEDATALEN); + + return nameCompare; +} + + +/* + * FindMatchingName searches for name in nameArray, starting at the + * value pointed to by matchIndex and stopping at the first index of + * name which is greater or equal to needle. nameArray is assumed + * to be sorted. + * + * The function sets matchIndex to the index of the name and returns + * true if the name is equal to needle. If matchIndex >= nameCount, + * then the function always returns false. 
+ */ +static bool +FindMatchingName(char **nameArray, int nameCount, char *needle, + int *matchIndex) +{ + bool foundMatchingName = false; + int searchIndex = *matchIndex; + int compareResult = -1; + + while (searchIndex < nameCount) + { + char *testName = nameArray[searchIndex]; + compareResult = strncmp(needle, testName, NAMEDATALEN); + + if (compareResult <= 0) + { + break; + } + + searchIndex++; + } + + *matchIndex = searchIndex; + + if (compareResult == 0) + { + foundMatchingName = true; + } + + return foundMatchingName; +} + + +/* + * PendingWorkerTransactionList returns a list of pending prepared + * transactions on a remote node that were started by this node. + */ +static List * +PendingWorkerTransactionList(PGconn *connection) +{ + StringInfo command = makeStringInfo(); + PGresult *result = NULL; + int rowCount = 0; + int rowIndex = 0; + List *transactionNames = NIL; + int coordinatorId = 0; + + appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts " + "WHERE gid LIKE 'citus_%d_%%'", + coordinatorId); + + result = PQexec(connection, command->data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + { + ReraiseRemoteError(connection, result); + } + + rowCount = PQntuples(result); + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + const int columnIndex = 0; + char *transactionName = PQgetvalue(result, rowIndex, columnIndex); + + transactionNames = lappend(transactionNames, pstrdup(transactionName)); + } + + PQclear(result); + + return transactionNames; +} + + +/* + * UnconfirmedWorkerTransactionsList returns a list of unconfirmed transactions + * for a group of workers from pg_dist_transaction. A transaction is confirmed + * once we have verified that it does not exist in pg_prepared_xacts on the + * remote node and the entry in pg_dist_transaction is removed. 
+ */ +static List * +UnconfirmedWorkerTransactionsList(int groupId) +{ + List *transactionNameList = NIL; + Relation pgDistTransaction = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + + pgDistTransaction = heap_open(DistTransactionRelationId(), AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); + + scanDescriptor = systable_beginscan(pgDistTransaction, + DistTransactionGroupIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); + bool isNull = false; + + Datum transactionNameDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_gid, + tupleDescriptor, &isNull); + + char *transactionName = TextDatumGetCString(transactionNameDatum); + transactionNameList = lappend(transactionNameList, transactionName); + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistTransaction, AccessShareLock); + + return transactionNameList; +} + + +/* + * DeleteTransactionRecord opens the pg_dist_transaction system catalog, finds the + * first (unique) row that corresponds to the given transactionName and worker node, + * and deletes this row. 
+ */ +static void +DeleteTransactionRecord(int32 groupId, char *transactionName) +{ + Relation pgDistTransaction = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + bool heapTupleFound = false; + + pgDistTransaction = heap_open(DistTransactionRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); + + scanDescriptor = systable_beginscan(pgDistTransaction, + DistTransactionGroupIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); + bool isNull = false; + + Datum gidDatum = heap_getattr(heapTuple, + Anum_pg_dist_transaction_gid, + tupleDescriptor, &isNull); + + char *gid = TextDatumGetCString(gidDatum); + + if (strncmp(transactionName, gid, NAMEDATALEN) == 0) + { + heapTupleFound = true; + break; + } + + heapTuple = systable_getnext(scanDescriptor); + } + + /* if we couldn't find the transaction record to delete, error out */ + if (!heapTupleFound) + { + ereport(ERROR, (errmsg("could not find valid entry for transaction record " + "'%s' in group %d", + transactionName, groupId))); + } + + simple_heap_delete(pgDistTransaction, &heapTuple->t_self); + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + heap_close(pgDistTransaction, RowExclusiveLock); +} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c new file mode 100644 index 000000000..69e3e169f --- /dev/null +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -0,0 +1,518 @@ +/*------------------------------------------------------------------------- + * + * worker_transaction.c + * + * Routines for performing transactions across all workers. 
+ * + * Copyright (c) 2016, Citus Data, Inc. + * + * $Id$ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include +#include + +#include "access/xact.h" +#include "distributed/commit_protocol.h" +#include "distributed/connection_cache.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_shard_transaction.h" +#include "distributed/resource_lock.h" +#include "distributed/pg_dist_node.h" +#include "distributed/pg_dist_transaction.h" +#include "distributed/transaction_recovery.h" +#include "distributed/worker_manager.h" +#include "distributed/worker_transaction.h" +#include "utils/memutils.h" + + +/* Local functions forward declarations */ +static void EnableXactCallback(void); +static void CompleteWorkerTransactions(XactEvent event, void *arg); +static List * OpenWorkerTransactions(void); +static TransactionConnection * GetWorkerTransaction(char *nodeName, int32 nodePort); +static List * GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet); +static bool IsResponseOK(ExecStatusType resultStatus); + + +/* Global worker connection list */ +static List *workerConnectionList = NIL; +static bool isXactCallbackRegistered = false; + + +/* + * GetWorkerTransactions opens connections to all workers and starts + * a transaction block that is committed or aborted when the local + * transaction commits or aborts. Multiple invocations of + * GetWorkerTransactions within the same transaction will return + * the same list of connections. 
+ */ +List * +GetWorkerTransactions(void) +{ + if (workerConnectionList == NIL) + { + InitializeDistributedTransaction(); + EnableXactCallback(); + + workerConnectionList = OpenWorkerTransactions(); + } + + /* ensure that number of workers has not changed */ + Assert(list_length(WorkerNodeList()) == list_length(workerConnectionList)); + + return workerConnectionList; +} + + +/* + * SendCommandToWorker sends a command to a particular worker as part of the + * 2PC. + */ +void +SendCommandToWorker(char *nodeName, int32 nodePort, char *command) +{ + TransactionConnection *transactionConnection = NULL; + PGresult *queryResult = NULL; + ExecStatusType resultStatus = PGRES_EMPTY_QUERY; + + transactionConnection = GetWorkerTransaction(nodeName, nodePort); + if (transactionConnection == NULL) + { + ereport(ERROR, (errmsg("worker %s:%d is not part of current transaction", + nodeName, nodePort))); + } + + queryResult = PQexec(transactionConnection->connection, command); + resultStatus = PQresultStatus(queryResult); + if (resultStatus != PGRES_COMMAND_OK && resultStatus != PGRES_TUPLES_OK) + { + ReraiseRemoteError(transactionConnection->connection, queryResult); + } + + PQclear(queryResult); +} + + +/* + * SendCommandToWorkers sends a command to all workers in + * parallel. Commands are committed on the workers when the local + * transaction commits. The connections are made as the extension + * owner to ensure write access to the Citus metadata tables. + */ +void +SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command) +{ + SendCommandToWorkersParams(targetWorkerSet, command, 0, NULL, NULL); +} + + +/* + * SendCommandToWorkersParams sends a command to all workers in parallel. + * Commands are committed on the workers when the local transaction commits. The + * connections are made as the extension owner to ensure write access to the Citus + * metadata tables. 
Parameters can be specified as for PQexecParams, except that + * paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0 + * respectively. + */ +void +SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, + int parameterCount, const Oid *parameterTypes, + const char *const *parameterValues) +{ + ListCell *connectionCell = NULL; + List *targetConnectionList = GetTargetWorkerTransactions(targetWorkerSet); + + foreach(connectionCell, targetConnectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + + PGconn *connection = transactionConnection->connection; + + int querySent = PQsendQueryParams(connection, command, parameterCount, + parameterTypes, parameterValues, NULL, NULL, 0); + if (querySent == 0) + { + ReraiseRemoteError(connection, NULL); + } + } + + foreach(connectionCell, targetConnectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + + PGconn *connection = transactionConnection->connection; + PGresult *result = PQgetResult(connection); + ExecStatusType resultStatus = PQresultStatus(result); + + if (!IsResponseOK(resultStatus)) + { + ReraiseRemoteError(connection, result); + } + + PQclear(result); + + /* clear NULL result */ + PQgetResult(connection); + } +} + + +/* + * SendCommandListToWorkerInSingleTransaction opens connection to the node with the given + * nodeName and nodePort. Then, the connection starts a transaction on the remote + * node and executes the commands in the transaction. The function raises an error if + * any of the queries fails. 
+ */ +void +SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char *nodeUser, + List *commandList) +{ + PGconn *workerConnection = NULL; + PGresult *queryResult = NULL; + ListCell *commandCell = NULL; + + workerConnection = ConnectToNode(nodeName, nodePort, nodeUser); + if (workerConnection == NULL) + { + ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", + nodeName, nodePort, nodeUser))); + } + + /* start the transaction on the worker node */ + queryResult = PQexec(workerConnection, "BEGIN"); + if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(workerConnection, queryResult); + } + + PQclear(queryResult); + + /* iterate over the commands and execute them over the same connection */ + foreach(commandCell, commandList) + { + char *commandString = lfirst(commandCell); + ExecStatusType resultStatus = PGRES_EMPTY_QUERY; + + queryResult = PQexec(workerConnection, commandString); + resultStatus = PQresultStatus(queryResult); + if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || + resultStatus == PGRES_COMMAND_OK)) + { + ReraiseRemoteError(workerConnection, queryResult); + } + + PQclear(queryResult); + } + + /* commit the transaction on the worker node */ + queryResult = PQexec(workerConnection, "COMMIT"); + if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(workerConnection, queryResult); + } + + PQclear(queryResult); + + /* NOTE(review): PQexec should already have consumed all results, so this looks like a no-op - confirm */ + PQgetResult(workerConnection); + + /* we no longer need this connection, so close it */ + PQfinish(workerConnection); +} + + +/* + * IsWorkerTransactionActive returns true if there are any ongoing worker + * transactions, i.e. if workerConnectionList is not empty. 
+ */ +bool +IsWorkerTransactionActive(void) +{ + bool isWorkerTransactionActive = false; + + if (workerConnectionList != NIL) + { + isWorkerTransactionActive = true; + } + + return isWorkerTransactionActive; +} + + +/* + * EnableXactCallback registers the CompleteWorkerTransactions function as the + * transaction callback, unless it has already been registered. + */ +static void +EnableXactCallback(void) +{ + if (!isXactCallbackRegistered) + { + RegisterXactCallback(CompleteWorkerTransactions, NULL); + isXactCallbackRegistered = true; + } +} + + +/* + * CompleteWorkerTransactions commits or aborts pending worker transactions + * when the local transaction commits or aborts. + */ +static void +CompleteWorkerTransactions(XactEvent event, void *arg) +{ + if (workerConnectionList == NIL) + { + /* nothing to do */ + return; + } + else if (event == XACT_EVENT_PRE_COMMIT) + { + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + /* + * Any failure here will cause local changes to be rolled back, + * and may leave a prepared transaction on the remote node. + */ + + PrepareRemoteTransactions(workerConnectionList); + + /* + * We are now ready to commit the local transaction, followed + * by the remote transaction. As a final step, write commit + * records to a table. If there is a last-minute crash + * on the local machine, then the absence of these records + * will indicate that the remote transactions should be rolled + * back. Otherwise, the presence of these records indicates + * that the remote transactions should be committed. + */ + + LogPreparedTransactions(workerConnectionList); + } + + return; + } + else if (event == XACT_EVENT_COMMIT) + { + /* + * A failure here may cause some prepared transactions to be + * left pending. However, the local changes have already been + * committed and a commit record exists to indicate that the + * remote transaction should be committed as well. 
+ */ + + CommitRemoteTransactions(workerConnectionList, false); + + /* + * At this point, it is safe to remove the transaction records + * for all commits that have succeeded. However, we are no + * longer in a transaction and therefore cannot make changes + * to the metadata. + */ + } + else if (event == XACT_EVENT_ABORT) + { + /* + * A failure here may cause some prepared transactions to be + * left pending. The local changes have already been rolled + * back and the absence of a commit record indicates that + * the remote transaction should be rolled back as well. + */ + + AbortRemoteTransactions(workerConnectionList); + } + else if (event == XACT_EVENT_PREPARE || event == XACT_EVENT_PRE_PREPARE) + { + /* + * If we allow a prepare we might not get to the commit handler + * in this session. We could resolve that if we intercept + * COMMIT/ROLLBACK PREPARED commands. For now, we just error out. + */ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot prepare a transaction that modified " + "distributed tables"))); + } + else + { + return; + } + + CloseConnections(workerConnectionList); + + /* + * Memory allocated in workerConnectionList will be reclaimed when + * TopTransactionContext is released. + */ + + workerConnectionList = NIL; +} + + +/* + * OpenWorkerTransactions opens connections to all primary workers and sends + * BEGIN commands. The returned TransactionConnections are allocated in the + * top transaction context, such that they can still be used in the commit + * handler. The connections are made as the extension owner, such that they + * have write access to the Citus metadata tables. + */ +static List * +OpenWorkerTransactions(void) +{ + ListCell *workerNodeCell = NULL; + List *connectionList = NIL; + MemoryContext oldContext = NULL; + List *workerList = NIL; + + /* + * A new node addition might be in progress which will invalidate the + * worker list. 
The following statement blocks until the node addition and + * metadata syncing finish, after which we load the worker list. + * It also ensures that no new node addition or metadata synchronization + * will run until this transaction finishes. + */ + LockMetadataSnapshot(AccessShareLock); + + workerList = WorkerNodeList(); + + oldContext = MemoryContextSwitchTo(TopTransactionContext); + + foreach(workerNodeCell, workerList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeUser = CitusExtensionOwnerName(); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + PGconn *connection = NULL; + + TransactionConnection *transactionConnection = NULL; + PGresult *result = NULL; + + connection = ConnectToNode(nodeName, nodePort, nodeUser); + if (connection == NULL) + { + ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", + nodeName, nodePort, nodeUser))); + } + + result = PQexec(connection, "BEGIN"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(connection, result); + } + + PQclear(result); + + transactionConnection = palloc0(sizeof(TransactionConnection)); + + transactionConnection->groupId = workerNode->groupId; + transactionConnection->connectionId = 0; + transactionConnection->transactionState = TRANSACTION_STATE_OPEN; + transactionConnection->connection = connection; + transactionConnection->nodeName = pstrdup(nodeName); + transactionConnection->nodePort = nodePort; + + connectionList = lappend(connectionList, transactionConnection); + } + + MemoryContextSwitchTo(oldContext); + + return connectionList; +} + + +/* + * GetWorkerTransaction finds the opened connection for the specified + * node. Note that it opens transaction connections to all workers by + * calling GetWorkerTransactions; therefore, it is suggested to use this + * function in operations that send commands to all workers inside a + * distributed transaction. 
+ * + * GetWorkerTransaction returns NULL if the node with the specified + * nodeName and nodePort is not found. Note that this function may open + * connections to all workers if they were not open already. + */ +static TransactionConnection * +GetWorkerTransaction(char *nodeName, int32 nodePort) +{ + List *connectionList = NIL; + ListCell *connectionCell = NULL; + TransactionConnection *workerTransaction = NULL; + + connectionList = GetWorkerTransactions(); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + + if (strcmp(transactionConnection->nodeName, nodeName) == 0 && + transactionConnection->nodePort == nodePort) + { + workerTransaction = transactionConnection; + break; + } + } + + return workerTransaction; +} + + +/* + * GetTargetWorkerTransactions returns the worker transactions matching the given target + * worker set. NOTE(review): FindWorkerNode's result is dereferenced without a NULL check. + */ +static List * +GetTargetWorkerTransactions(TargetWorkerSet targetWorkerSet) +{ + List *allWorkerConnectionsList = GetWorkerTransactions(); + List *targetConnectionList = NIL; + ListCell *connectionCell = NULL; + + if (targetWorkerSet == WORKERS_WITH_METADATA) + { + foreach(connectionCell, allWorkerConnectionsList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + char *nodeName = pstrdup(transactionConnection->nodeName); + int nodePort = transactionConnection->nodePort; + WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort); + + if (workerNode->hasMetadata) + { + targetConnectionList = lappend(targetConnectionList, + transactionConnection); + } + } + } + else + { + targetConnectionList = allWorkerConnectionsList; + } + + return targetConnectionList; +} + + +/* + * IsResponseOK returns true for PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK and PGRES_COMMAND_OK. 
+ */ +static bool +IsResponseOK(ExecStatusType resultStatus) +{ + if (resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || + resultStatus == PGRES_COMMAND_OK) + { + return true; + } + + return false; +} diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 256b3a03a..49528a28b 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -60,3 +60,24 @@ SortList(List *pointerList, int (*comparisonFunction)(const void *, const void * return sortedList; } + + +/* + * PointerArrayFromList copies the pointers of a list, in list order, into a palloc0'd array. + */ +void ** +PointerArrayFromList(List *pointerList) +{ + int pointerCount = list_length(pointerList); + void **pointerArray = (void **) palloc0(pointerCount * sizeof(void *)); + ListCell *pointerCell = NULL; + int pointerIndex = 0; + + foreach(pointerCell, pointerList) + { + pointerArray[pointerIndex] = (void *) lfirst(pointerCell); + pointerIndex += 1; + } + + return pointerArray; +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index a2b62bb34..ac504cab7 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -62,6 +62,8 @@ static Oid distShardShardidIndexId = InvalidOid; static Oid distShardPlacementShardidIndexId = InvalidOid; static Oid distShardPlacementPlacementidIndexId = InvalidOid; static Oid distShardPlacementNodeidIndexId = InvalidOid; +static Oid distTransactionRelationId = InvalidOid; +static Oid distTransactionGroupIndexId = InvalidOid; static Oid extraDataContainerFuncId = InvalidOid; /* Hash table for informations about each partition */ @@ -762,6 +764,27 @@ DistShardPlacementPlacementidIndexId(void) } +/* return oid of the pg_dist_transaction relation */ +Oid +DistTransactionRelationId(void) +{ + CachedRelationLookup("pg_dist_transaction", &distTransactionRelationId); + + return 
distTransactionRelationId; +} + + +/* return oid of pg_dist_transaction_group_index */ +Oid +DistTransactionGroupIndexId(void) +{ + CachedRelationLookup("pg_dist_transaction_group_index", + &distTransactionGroupIndexId); + + return distTransactionGroupIndexId; +} + + /* return oid of pg_dist_shard_placement_nodeid_index */ Oid DistShardPlacementNodeidIndexId(void) @@ -857,6 +880,18 @@ CitusExtensionOwner(void) } +/* + * CitusExtensionOwnerName returns the role name of the extension owner (see CitusExtensionOwner). + */ +char * +CitusExtensionOwnerName(void) +{ + Oid superUserId = CitusExtensionOwner(); + + return GetUserNameFromId(superUserId, false); +} + + /* return the username of the currently active role */ char * CurrentUserName(void) @@ -1382,6 +1417,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardRelationId = InvalidOid; distShardPlacementRelationId = InvalidOid; distLocalGroupRelationId = InvalidOid; + distNodeRelationId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; @@ -1389,7 +1425,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardShardidIndexId = InvalidOid; distShardPlacementShardidIndexId = InvalidOid; distShardPlacementPlacementidIndexId = InvalidOid; - distNodeRelationId = InvalidOid; + distTransactionRelationId = InvalidOid; + distTransactionGroupIndexId = InvalidOid; extraDataContainerFuncId = InvalidOid; } } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index ba748e890..f9048ccf9 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -19,6 +19,7 @@ #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include 
"distributed/shardinterval_utils.h" @@ -149,3 +150,18 @@ LockShards(List *shardIntervalList, LOCKMODE lockMode) LockShardResource(shardId, lockMode); } } + + +/* + * LockMetadataSnapshot locks the pg_dist_node relation to serialize changes to pg_dist_node + * with all other metadata changes. Operations that modify pg_dist_node should acquire + * AccessExclusiveLock. All other metadata changes should acquire AccessShareLock. Any locks + * acquired using this function are released at transaction end. + */ +void +LockMetadataSnapshot(LOCKMODE lockMode) +{ + Assert(lockMode == AccessExclusiveLock || lockMode == AccessShareLock); + + (void) LockRelationOid(DistNodeRelationId(), lockMode); +} diff --git a/src/include/distributed/commit_protocol.h b/src/include/distributed/commit_protocol.h index df590bd48..140b0ade0 100644 --- a/src/include/distributed/commit_protocol.h +++ b/src/include/distributed/commit_protocol.h @@ -42,9 +42,12 @@ typedef enum */ typedef struct TransactionConnection { + int groupId; int64 connectionId; TransactionState transactionState; PGconn *connection; + const char *nodeName; + int nodePort; } TransactionConnection; @@ -57,5 +60,6 @@ extern void InitializeDistributedTransaction(void); extern void PrepareRemoteTransactions(List *connectionList); extern void AbortRemoteTransactions(List *connectionList); extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); +extern StringInfo BuildTransactionName(int connectionId); #endif /* COMMIT_PROTOCOL_H */ diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index 5330295cf..66b433fed 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -21,6 +21,7 @@ /* utility functions declaration shared within this module */ extern List * SortList(List *pointerList, int (*ComparisonFunction)(const void *, const void *)); +extern void ** PointerArrayFromList(List *pointerList); #endif /* CITUS_LISTUTILS_H */ diff --git 
a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 7a32854f1..35d7e835f 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -79,6 +79,8 @@ extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistShardPlacementShardidIndexId(void); extern Oid DistShardPlacementPlacementidIndexId(void); +extern Oid DistTransactionRelationId(void); +extern Oid DistTransactionGroupIndexId(void); extern Oid DistShardPlacementNodeidIndexId(void); /* function oids */ @@ -86,5 +88,6 @@ extern Oid CitusExtraDataContainerFuncId(void); /* user related functions */ extern Oid CitusExtensionOwner(void); +extern char * CitusExtensionOwnerName(void); extern char * CurrentUserName(void); #endif /* METADATA_CACHE_H */ diff --git a/src/include/distributed/pg_dist_transaction.h b/src/include/distributed/pg_dist_transaction.h new file mode 100644 index 000000000..9eeff8450 --- /dev/null +++ b/src/include/distributed/pg_dist_transaction.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_transaction.h + * definition of the "transaction" relation (pg_dist_transaction). + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_TRANSACTION_H +#define PG_DIST_TRANSACTION_H + + +/* ---------------- + * pg_dist_transaction definition. + * ---------------- + */ +typedef struct FormData_pg_dist_transaction +{ + int32 groupid; /* id of the replication group */ + text gid; /* global transaction identifier (variable-length text) */ +} FormData_pg_dist_transaction; + + +/* ---------------- + * Form_pg_dist_transaction corresponds to a pointer to a tuple with + * the format of the pg_dist_transaction relation. 
+ * ---------------- + */ +typedef FormData_pg_dist_transaction *Form_pg_dist_transaction; + + +/* ---------------- + * compiler constants for pg_dist_transaction: attribute count and attribute numbers + * ---------------- + */ +#define Natts_pg_dist_transaction 2 +#define Anum_pg_dist_transaction_groupid 1 +#define Anum_pg_dist_transaction_gid 2 + + +#endif /* PG_DIST_TRANSACTION_H */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 92ae6b7fc..17319a079 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -76,5 +76,6 @@ extern void LockJobResource(uint64 jobId, LOCKMODE lockmode); extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); extern void LockShards(List *shardIntervalList, LOCKMODE lockMode); +extern void LockMetadataSnapshot(LOCKMODE lockMode); #endif /* RESOURCE_LOCK_H */ diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h new file mode 100644 index 000000000..82c7d0dde --- /dev/null +++ b/src/include/distributed/transaction_recovery.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * transaction_recovery.h + * Function declarations used in recording and recovering 2PC transactions. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef TRANSACTION_RECOVERY_H +#define TRANSACTION_RECOVERY_H + + +#include "nodes/pg_list.h" + + +/* Function declarations for transaction recovery */ +extern void LogPreparedTransactions(List *connectionList); + + +#endif /* TRANSACTION_RECOVERY_H */ diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h new file mode 100644 index 000000000..9fa28c335 --- /dev/null +++ b/src/include/distributed/worker_transaction.h @@ -0,0 +1,42 @@ +/*------------------------------------------------------------------------- + * + * worker_transaction.h + * Type and function declarations used in performing transactions across + * workers. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef WORKER_TRANSACTION_H +#define WORKER_TRANSACTION_H + +#include "distributed/worker_manager.h" + + +/* + * TargetWorkerSet is used for determining the type of workers that a command + * is targeted to. 
+ */ +typedef enum TargetWorkerSet +{ + WORKERS_WITH_METADATA, + ALL_WORKERS +} TargetWorkerSet; + + +/* Function declarations for worker transactions */ +extern List * GetWorkerTransactions(void); +extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command); +extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command); +extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, + int parameterCount, const Oid *parameterTypes, + const char *const *parameterValues); +extern void SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, + char *nodeUser, List *commandList); + +/* helper functions for worker transactions */ +extern bool IsWorkerTransactionActive(void); + +#endif /* WORKER_TRANSACTION_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 0ebe5275f..7ff35d2ee 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -35,6 +35,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-6'; ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-9'; +ALTER EXTENSION citus UPDATE TO '6.0-10'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out new file mode 100644 index 000000000..a0bc331bf --- /dev/null +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -0,0 +1,62 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000; +-- Tests for prepared transaction recovery +-- Ensure pg_dist_transaction is empty for test +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT * FROM pg_dist_transaction; + groupid 
| gid +---------+----- +(0 rows) + +-- Create some "fake" prepared transactions to recover +\c - - - :worker_1_port +BEGIN; +CREATE TABLE should_abort (value int); +PREPARE TRANSACTION 'citus_0_should_abort'; +BEGIN; +CREATE TABLE should_commit (value int); +PREPARE TRANSACTION 'citus_0_should_commit'; +BEGIN; +CREATE TABLE should_be_sorted_into_middle (value int); +PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; +\c - - - :master_port +-- Add "fake" pg_dist_transaction records and run recovery +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); +SELECT recover_prepared_transactions(); +NOTICE: recovered a prepared transaction on localhost:57637 +CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort' +NOTICE: recovered a prepared transaction on localhost:57637 +CONTEXT: ROLLBACK PREPARED 'citus_0_should_be_sorted_into_middle' +NOTICE: recovered a prepared transaction on localhost:57637 +CONTEXT: COMMIT PREPARED 'citus_0_should_commit' + recover_prepared_transactions +------------------------------- + 3 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- Confirm that transactions were correctly rolled forward +\c - - - :worker_1_port +SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; + count +------- + 1 +(1 row) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5881b4792..acb53e49a 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -135,6 +135,7 @@ test: multi_repartition_udt test: multi_repartitioned_subquery_udf test: multi_modifying_xacts test: multi_metadata_snapshot +test: multi_transaction_recovery # --------- # multi_copy creates hash and range-partitioned tables and performs COPY diff --git a/src/test/regress/sql/multi_extension.sql 
b/src/test/regress/sql/multi_extension.sql index 19c81e27c..bf9a0e7c4 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -40,6 +40,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-6'; ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-9'; +ALTER EXTENSION citus UPDATE TO '6.0-10'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql new file mode 100644 index 000000000..b21e297f6 --- /dev/null +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -0,0 +1,37 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000; + +-- Tests for prepared transaction recovery + +-- Ensure pg_dist_transaction is empty for test +SELECT recover_prepared_transactions(); + +SELECT * FROM pg_dist_transaction; + +-- Create some "fake" prepared transactions to recover +\c - - - :worker_1_port + +BEGIN; +CREATE TABLE should_abort (value int); +PREPARE TRANSACTION 'citus_0_should_abort'; + +BEGIN; +CREATE TABLE should_commit (value int); +PREPARE TRANSACTION 'citus_0_should_commit'; + +BEGIN; +CREATE TABLE should_be_sorted_into_middle (value int); +PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; + +\c - - - :master_port +-- Add "fake" pg_dist_transaction records and run recovery +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); + +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +-- Confirm that transactions were correctly rolled forward +\c - - - :worker_1_port +SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; +SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';