Add regression tests and refactor transaction management functions

pull/366/head
Marco Slot 2016-02-25 21:13:05 +01:00 committed by Metin Doslu
parent 834c87f73a
commit 83c83fdf1a
10 changed files with 792 additions and 498 deletions

View File

@ -43,9 +43,9 @@
#include "distributed/master_protocol.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_transaction.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_manager.h"
#include "distributed/worker_protocol.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
@ -97,7 +97,7 @@
int CopyTransactionManager = TRANSACTION_MANAGER_1PC;
/* Data structures copy.c, to keep track of COPY processing state */
/* Data structures from copy.c, to keep track of COPY processing state */
typedef enum CopyDest
{
COPY_FILE, /* to/from file (or a piped program) */
@ -227,14 +227,7 @@ typedef struct CopyStateData
} CopyStateData;
/* Data structures for keeping track of connections to placements */
typedef struct PlacementConnection
{
int64 shardId;
bool prepared;
PGconn* connection;
} PlacementConnection;
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
{
int64 shardId;
@ -259,15 +252,13 @@ static ShardInterval * SearchCachedShardInterval(Datum partitionColumnValue,
static void OpenShardConnections(CopyStmt *copyStatement,
ShardConnections *shardConnections,
int64 shardId);
static char * ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static void AppendColumnNames(StringInfo buf, List *columnList);
static void AppendCopyOptions(StringInfo buf, List *copyOptionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static void AppendColumnNames(StringInfo command, List *columnList);
static void AppendCopyOptions(StringInfo command, List *copyOptionList);
static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections);
static List * ConnectionList(HTAB *connectionHash);
static void PrepareCopyTransaction(List *connectionList);
static bool EndRemoteCopy(PGconn *connection);
static void AbortCopyTransaction(List *connectionList);
static void CommitCopyTransaction(List *connectionList);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result);
/*
@ -323,18 +314,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
}
}
/* load the list of shards and verify that we have shards to copy into */
shardIntervalList = LoadShardIntervalList(tableId);
if (shardIntervalList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards for query"),
errdetail("No shards exist for distributed table \"%s\".",
relationName),
errhint("Run master_create_worker_shards to create shards "
"and try again.")));
}
partitionColumn = PartitionColumn(tableId, 0);
partitionMethod = PartitionMethod(tableId);
if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH)
{
@ -342,8 +322,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
errmsg("COPY is only supported for hash- and range-partitioned tables")));
}
partitionColumn = PartitionColumn(tableId, 0);
/* resolve hash function for partition column */
typeEntry = lookup_type_cache(partitionColumn->vartype, TYPECACHE_HASH_PROC_FINFO);
hashFunction = &(typeEntry->hash_proc_finfo);
@ -358,22 +336,58 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool));
/*
* We create a new memory context called tuple context, and read and write
* each row's values within this memory context. After each read and write,
* we reset the memory context. That way, we immediately release memory
* allocated for each row, and don't bloat memory usage with large input
* files.
*/
tupleContext = AllocSetContextCreate(CurrentMemoryContext,
"COPY Row Memory Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* load the list of shards and verify that we have shards to copy into */
shardIntervalList = LoadShardIntervalList(tableId);
if (shardIntervalList == NIL)
{
if (partitionMethod == DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards for query"),
errdetail("No shards exist for distributed table \"%s\".",
relationName),
errhint("Run master_create_worker_shards to create shards "
"and try again.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not find any shards for query"),
errdetail("No shards exist for distributed table \"%s\".",
relationName)));
}
}
/* create a mapping of shard id to a connection for each of its placements */
shardConnectionHash = CreateShardConnectionHash();
/* lock shards in order of shard id to prevent deadlock */
shardIntervalList = SortList(shardIntervalList, CompareTasksByShardId);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
int64 shardId = shardInterval->shardId;
/* prevent concurrent changes to number of placements */
LockShardDistributionMetadata(shardId, ShareLock);
/* prevent concurrent update/delete statements */
LockShardResource(shardId, ShareLock);
}
/* initialize the shard interval cache */
shardCount = list_length(shardIntervalList);
shardIntervalCache = SortedShardIntervalArray(shardIntervalList);
/* determine whether to use binary search */
if (partitionMethod != DISTRIBUTE_BY_HASH ||
!IsUniformHashDistribution(shardIntervalCache, shardCount))
{
useBinarySearch = true;
}
/* initialize copy state to read from COPY data source */
copyState = BeginCopyFrom(rel, copyStatement->filename,
copyStatement->is_program,
@ -392,32 +406,18 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback;
/* lock shards in order of shard id to prevent deadlock */
shardIntervalList = SortList(shardIntervalList, CompareTasksByShardId);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
int64 shardId = shardInterval->shardId;
/* prevent concurrent changes to number of placements */
LockShardDistributionMetadata(shardId, ShareLock);
/* prevent concurrent update/delete statements */
LockShardResource(shardId, ShareLock);
}
/* initialize the shard interval cache */
shardCount = list_length(shardIntervalList);
shardIntervalCache = SortedShardIntervalArray(shardIntervalList);
/* determine whether to use binary search */
if (partitionMethod != DISTRIBUTE_BY_HASH ||
!IsUniformHashDistribution(shardIntervalCache, shardCount))
{
useBinarySearch = true;
}
/*
* We create a new memory context called tuple context, and read and write
* each row's values within this memory context. After each read and write,
* we reset the memory context. That way, we immediately release memory
* allocated for each row, and don't bloat memory usage with large input
* files.
*/
tupleContext = AllocSetContextCreate(CurrentMemoryContext,
"COPY Row Memory Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
PG_TRY();
@ -494,9 +494,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
MemoryContextReset(tupleContext);
}
/* prepare two phase commit in replicas */
connectionList = ConnectionList(shardConnectionHash);
PrepareCopyTransaction(connectionList);
EndRemoteCopy(connectionList, true);
if (CopyTransactionManager == TRANSACTION_MANAGER_2PC)
{
PrepareTransactions(connectionList);
}
}
PG_CATCH();
{
@ -505,7 +510,10 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
/* roll back all transactions */
connectionList = ConnectionList(shardConnectionHash);
AbortCopyTransaction(connectionList);
EndRemoteCopy(connectionList, false);
AbortTransactions(connectionList);
CloseConnections(connectionList);
PG_RE_THROW();
}
PG_END_TRY();
@ -517,15 +525,17 @@ CitusCopyFrom(CopyStmt *copyStatement, char* completionTag)
if (QueryCancelPending)
{
AbortCopyTransaction(connectionList);
AbortTransactions(connectionList);
ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling statement due to user request")));
}
else
{
CommitCopyTransaction(connectionList);
CommitTransactions(connectionList);
}
CloseConnections(connectionList);
if (completionTag != NULL)
{
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
@ -703,9 +713,6 @@ static void
OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
int64 shardId)
{
CitusTransactionManager const *transactionManager =
&CitusTransactionManagerImpl[CopyTransactionManager];
List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL;
ListCell *placementCell = NULL;
@ -719,43 +726,42 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
TransactionConnection *transactionConnection = NULL;
StringInfo copyCommand = NULL;
PGresult *result = NULL;
PGconn *connection = ConnectToNode(nodeName, nodePort);
if (connection != NULL)
{
char *copyCommand = ConstructCopyStatement(copyStatement, shardId);
/*
* New connection: start transaction with copy command on it.
* Append shard id to table name.
*/
if (transactionManager->Begin(connection) &&
ExecuteCommand(connection, PGRES_COPY_IN, copyCommand))
{
PlacementConnection *placementConnection =
(PlacementConnection *) palloc0(sizeof(PlacementConnection));
placementConnection->shardId = shardId;
placementConnection->prepared = false;
placementConnection->connection = connection;
connectionList = lappend(connectionList, placementConnection);
}
else
if (connection == NULL)
{
failedPlacementList = lappend(failedPlacementList, placement);
ereport(WARNING, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to start '%s' on node %s:%d",
copyCommand, nodeName, nodePort)));
continue;
}
}
else
result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReportRemoteError(connection, result);
failedPlacementList = lappend(failedPlacementList, placement);
ereport(WARNING, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to connect to node %s:%d",
nodeName, nodePort)));
continue;
}
copyCommand = ConstructCopyStatement(copyStatement, shardId);
result = PQexec(connection, copyCommand->data);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
ReportRemoteError(connection, result);
failedPlacementList = lappend(failedPlacementList, placement);
continue;
}
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->connectionId = shardId;
transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED;
transactionConnection->connection = connection;
connectionList = lappend(connectionList, transactionConnection);
}
/* if all placements failed, error out */
@ -778,7 +784,6 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections
shardConnections->shardId = shardId;
shardConnections->connectionList = connectionList;
}
@ -786,98 +791,80 @@ OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections
* ConstructCopyStatement constructs the text of a COPY statement for a particular
* shard.
*/
static char *
static StringInfo
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
{
StringInfo buf = makeStringInfo();
StringInfo command = makeStringInfo();
char *qualifiedName = NULL;
qualifiedName = quote_qualified_identifier(copyStatement->relation->schemaname,
copyStatement->relation->relname);
appendStringInfo(buf, "COPY %s_%ld ", qualifiedName, (long) shardId);
appendStringInfo(command, "COPY %s_%ld ", qualifiedName, shardId);
if (copyStatement->attlist != NIL)
{
AppendColumnNames(buf, copyStatement->attlist);
AppendColumnNames(command, copyStatement->attlist);
}
appendStringInfoString(buf, "FROM STDIN");
appendStringInfoString(command, "FROM STDIN");
if (copyStatement->options)
{
appendStringInfoString(buf, " WITH ");
appendStringInfoString(command, " WITH ");
AppendCopyOptions(buf, copyStatement->options);
AppendCopyOptions(command, copyStatement->options);
}
return buf->data;
return command;
}
/*
* AppendCopyOptions deparses a list of CopyStmt options and appends them to buf.
* AppendCopyOptions deparses a list of CopyStmt options and appends them to command.
*/
static void
AppendCopyOptions(StringInfo buf, List *copyOptionList)
AppendCopyOptions(StringInfo command, List *copyOptionList)
{
ListCell *optionCell = NULL;
char separator = '(';
foreach(optionCell, copyOptionList)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
DefElem *option = (DefElem *) lfirst(optionCell);
if (strcmp(defel->defname, "header") == 0 && defGetBoolean(defel))
if (strcmp(option->defname, "header") == 0 && defGetBoolean(option))
{
/* worker should not skip header again */
continue;
}
appendStringInfo(buf, "%c%s ", separator, defel->defname);
appendStringInfo(command, "%c%s ", separator, option->defname);
if (strcmp(defel->defname, "force_quote") == 0)
if (strcmp(option->defname, "force_not_null") == 0 ||
strcmp(option->defname, "force_null") == 0)
{
if (!defel->arg)
if (!option->arg)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("argument to option \"%s\" must be a list of column names",
defel->defname)));
}
else if (IsA(defel->arg, A_Star))
{
appendStringInfoString(buf, "*");
option->defname)));
}
else
{
AppendColumnNames(buf, (List *) defel->arg);
}
}
else if (strcmp(defel->defname, "force_not_null") == 0 ||
strcmp(defel->defname, "force_null") == 0)
{
if (!defel->arg)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("argument to option \"%s\" must be a list of column names",
defel->defname)));
}
else
{
AppendColumnNames(buf, (List *) defel->arg);
AppendColumnNames(command, (List *) option->arg);
}
}
else
{
appendStringInfoString(buf, defGetString(defel));
appendStringInfo(command, "'%s'", defGetString(option));
}
separator = ',';
}
appendStringInfoChar(buf, ')');
appendStringInfoChar(command, ')');
}
@ -885,7 +872,7 @@ AppendCopyOptions(StringInfo buf, List *copyOptionList)
* AppendColumnList deparses a list of column names into a StringInfo.
*/
static void
AppendColumnNames(StringInfo buf, List *columnList)
AppendColumnNames(StringInfo command, List *columnList)
{
ListCell *attributeCell = NULL;
char separator = '(';
@ -893,11 +880,11 @@ AppendColumnNames(StringInfo buf, List *columnList)
foreach(attributeCell, columnList)
{
char *columnName = strVal(lfirst(attributeCell));
appendStringInfo(buf, "%c%s", separator, quote_identifier(columnName));
appendStringInfo(command, "%c%s", separator, quote_identifier(columnName));
separator = ',';
}
appendStringInfoChar(buf, ')');
appendStringInfoChar(command, ')');
}
@ -910,9 +897,9 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
ListCell *connectionCell = NULL;
foreach(connectionCell, shardConnections->connectionList)
{
PlacementConnection *placementConnection =
(PlacementConnection *) lfirst(connectionCell);
PGconn *connection = placementConnection->connection;
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 shardId = shardConnections->shardId;
/* copy the line buffer into the placement */
@ -922,8 +909,8 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("COPY to shard %ld on %s:%s failed",
(long) shardId, nodeName, nodePort)));
errmsg("Failed to COPY to shard %ld on %s:%s",
shardId, nodeName, nodePort)));
}
}
}
@ -945,10 +932,10 @@ ConnectionList(HTAB *connectionHash)
ListCell *connectionCell = NULL;
foreach(connectionCell, shardConnections->connectionList)
{
PlacementConnection *placementConnection =
(PlacementConnection *) lfirst(connectionCell);
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
connectionList = lappend(connectionList, placementConnection);
connectionList = lappend(connectionList, transactionConnection);
}
}
@ -957,145 +944,91 @@ ConnectionList(HTAB *connectionHash)
/*
* End copy and prepare transaction.
* This function is applied for each shard placement unless some error happens.
* Status of this function is stored in ShardConnections::status field
* EndRemoteCopy ends the COPY input on all connections. If stopOnFailure
* is true, then EndRemoteCopy reports an error on failure, otherwise it
* reports a warning.
*/
static void
PrepareCopyTransaction(List *connectionList)
EndRemoteCopy(List *connectionList, bool stopOnFailure)
{
CitusTransactionManager const *transactionManager =
&CitusTransactionManagerImpl[CopyTransactionManager];
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
PlacementConnection *placementConnection =
(PlacementConnection *) lfirst(connectionCell);
PGconn *connection = placementConnection->connection;
int64 shardId = placementConnection->shardId;
if (EndRemoteCopy(connection) &&
transactionManager->Prepare(connection, BuildTransactionId(shardId)))
{
placementConnection->prepared = true;
}
else
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to prepare transaction for shard %ld",
(long) shardId)));
}
}
}
/*
* EndRemoteCopy sends PQputCopyEnd command to the client and checks the result.
*/
static bool
EndRemoteCopy(PGconn *connection)
{
PGresult *result = NULL;
int copyEndResult = PQputCopyEnd(connection, NULL);
if (copyEndResult != 1)
{
return false;
}
while ((result = PQgetResult(connection)) != NULL)
{
int resultStatus = PQresultStatus(result);
if (resultStatus != PGRES_COMMAND_OK)
{
ReportRemoteError(connection, result);
return false;
}
PQclear(result);
}
return true;
}
/*
* AbortCopyTransaction aborts a two-phase commit. It attempts to roll back
* all transactions even if some of them fail, in which case a warning is given
* for each of them.
*/
static void
AbortCopyTransaction(List *connectionList)
{
CitusTransactionManager const *transactionManager =
&CitusTransactionManagerImpl[CopyTransactionManager];
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
PlacementConnection *placementConnection =
(PlacementConnection *) lfirst(connectionCell);
PGconn *connection = placementConnection->connection;
int64 shardId = placementConnection->shardId;
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 shardId = transactionConnection->connectionId;
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
int copyEndResult = 0;
PGresult *result = NULL;
if (placementConnection->prepared)
if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED)
{
char *transactionId = BuildTransactionId(shardId);
/* COPY already ended during the prepare phase */
continue;
}
if (!transactionManager->RollbackPrepared(connection, transactionId))
/* end the COPY input */
copyEndResult = PQputCopyEnd(connection, NULL);
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
if (copyEndResult != 1)
{
ereport(WARNING, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to roll back transaction '%s' on %s:%s",
transactionId, nodeName, nodePort)));
}
}
else if (!EndRemoteCopy(connection) &&
!transactionManager->Rollback(connection))
if (stopOnFailure)
{
ereport(WARNING, (errcode(ERRCODE_IO_ERROR),
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to shard %ld on %s:%s",
shardId, nodeName, nodePort)));
}
PQfinish(connection);
continue;
}
/* check whether there were any COPY errors */
result = PQgetResult(connection);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
{
ReportCopyError(connection, result);
}
PQclear(result);
}
}
/*
* CommitCopyTransaction commits a two-phase commit. It attempts to commit all
* transactions, even if some of them fail, in which case a warning is given
* for each of them.
* ReportCopyError tries to report a useful error message for the user from
* the remote COPY error messages.
*/
static void
CommitCopyTransaction(List *connectionList)
ReportCopyError(PGconn *connection, PGresult *result)
{
CitusTransactionManager const *transactionManager =
&CitusTransactionManagerImpl[CopyTransactionManager];
char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
if (remoteMessage != NULL)
{
PlacementConnection *placementConnection =
(PlacementConnection *) lfirst(connectionCell);
PGconn *connection = placementConnection->connection;
int64 shardId = placementConnection->shardId;
char *transactionId = BuildTransactionId(shardId);
/* probably a constraint violation, show remote message and detail */
char *remoteDetail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
Assert(placementConnection->prepared);
if (!transactionManager->CommitPrepared(connection, transactionId))
ereport(ERROR, (errmsg("%s", remoteMessage),
errdetail("%s", remoteDetail)));
}
else
{
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(WARNING, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to commit transaction '%s' on %s:%s",
transactionId, nodeName, nodePort)));
/* probably a connection problem, get the message from the connection */
char *lastNewlineIndex = NULL;
remoteMessage = PQerrorMessage(connection);
lastNewlineIndex = strrchr(remoteMessage, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
PQfinish(connection);
ereport(ERROR, (errmsg("%s", remoteMessage)));
}
}

View File

@ -28,9 +28,9 @@
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_transaction.h"
#include "distributed/multi_utility.h"
#include "distributed/task_tracker.h"
#include "distributed/transaction_manager.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "postmaster/postmaster.h"

View File

@ -0,0 +1,208 @@
/*-------------------------------------------------------------------------
*
* multi_transaction.c
* This file contains functions for managing 1PC or 2PC transactions
* across many shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "distributed/connection_cache.h"
#include "distributed/multi_transaction.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
/* Local functions forward declarations */
static StringInfo BuildTransactionName(int connectionId);
/*
 * PrepareTransactions sends PREPARE TRANSACTION on every connection in
 * connectionList, marking each as TRANSACTION_STATE_PREPARED on success.
 * On the first failure it marks that connection as closed (a failed
 * prepare is an implicit rollback on the remote side), reports the remote
 * error and stops with an ERROR.
 *
 * Fix over the previous version: the PGresult returned by PQexec is
 * malloc'd by libpq and must be released with PQclear; it was previously
 * leaked on every iteration.
 */
void
PrepareTransactions(List *connectionList)
{
	ListCell *connectionCell = NULL;

	foreach(connectionCell, connectionList)
	{
		TransactionConnection *transactionConnection =
			(TransactionConnection *) lfirst(connectionCell);

		PGconn *connection = transactionConnection->connection;
		int64 connectionId = transactionConnection->connectionId;

		PGresult *result = NULL;
		StringInfo command = makeStringInfo();
		StringInfo transactionName = BuildTransactionName(connectionId);

		appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data);

		result = PQexec(connection, command->data);
		if (PQresultStatus(result) != PGRES_COMMAND_OK)
		{
			/* a failure to prepare is an implicit rollback */
			transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;

			ReportRemoteError(connection, result);

			/* release the result before erroring out */
			PQclear(result);

			ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
							errmsg("Failed to prepare transaction")));
		}

		/* PGresults are libpq-allocated; free them to avoid leaking per connection */
		PQclear(result);

		transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
	}
}
/*
 * AbortTransactions rolls back all transactions on connections in
 * connectionList. Prepared transactions are removed with ROLLBACK PREPARED,
 * open transactions with a plain ROLLBACK. On failure it emits a warning
 * (with a hint telling the user how to clean up manually) and continues
 * with the remaining connections; every connection ends up in
 * TRANSACTION_STATE_CLOSED.
 *
 * Fix over the previous version: the errhint told the user to run
 * "ROLLBACK TRANSACTION '<name>'", which does not remove a prepared
 * transaction; the correct command is "ROLLBACK PREPARED '<name>'".
 */
void
AbortTransactions(List *connectionList)
{
	ListCell *connectionCell = NULL;

	foreach(connectionCell, connectionList)
	{
		TransactionConnection *transactionConnection =
			(TransactionConnection *) lfirst(connectionCell);

		PGconn *connection = transactionConnection->connection;
		int64 connectionId = transactionConnection->connectionId;
		char *nodeName = ConnectionGetOptionValue(connection, "host");
		char *nodePort = ConnectionGetOptionValue(connection, "port");
		PGresult *result = NULL;

		if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
		{
			StringInfo command = makeStringInfo();
			StringInfo transactionName = BuildTransactionName(connectionId);

			appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data);

			result = PQexec(connection, command->data);
			if (PQresultStatus(result) != PGRES_COMMAND_OK)
			{
				/* log a warning so the user may abort the transaction later */
				ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'",
										 transactionName->data),
								  errhint("Run ROLLBACK PREPARED '%s' on %s:%s",
										  transactionName->data, nodeName, nodePort)));
			}

			PQclear(result);
		}
		else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN)
		{
			/* try to roll back cleanly, if it fails then we won't commit anyway */
			result = PQexec(connection, "ROLLBACK");
			PQclear(result);
		}

		transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
	}
}
/*
 * CommitTransactions commits all transactions on connections in
 * connectionList. Prepared transactions are finished with COMMIT PREPARED,
 * open transactions with a plain COMMIT. On failure it emits a warning and
 * continues committing the remaining connections; every connection ends up
 * in TRANSACTION_STATE_CLOSED.
 *
 * Fixes over the previous version: the errhint told the user to run
 * "COMMIT TRANSACTION '<name>'", which does not commit a prepared
 * transaction; the correct command is "COMMIT PREPARED '<name>'". A
 * tautological Assert inside the already-tested PREPARED branch was also
 * removed.
 */
void
CommitTransactions(List *connectionList)
{
	ListCell *connectionCell = NULL;

	foreach(connectionCell, connectionList)
	{
		TransactionConnection *transactionConnection =
			(TransactionConnection *) lfirst(connectionCell);

		PGconn *connection = transactionConnection->connection;
		int64 connectionId = transactionConnection->connectionId;
		char *nodeName = ConnectionGetOptionValue(connection, "host");
		char *nodePort = ConnectionGetOptionValue(connection, "port");
		PGresult *result = NULL;

		if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
		{
			StringInfo command = makeStringInfo();
			StringInfo transactionName = BuildTransactionName(connectionId);

			appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data);

			result = PQexec(connection, command->data);
			if (PQresultStatus(result) != PGRES_COMMAND_OK)
			{
				/* log a warning so the user may commit the transaction later */
				ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'",
										 transactionName->data),
								  errhint("Run COMMIT PREPARED '%s' on %s:%s",
										  transactionName->data, nodeName, nodePort)));
			}
		}
		else
		{
			/* we shouldn't be committing if any transactions are not open */
			Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);

			/* try to commit, if it fails then the user might lose data */
			result = PQexec(connection, "COMMIT");
			if (PQresultStatus(result) != PGRES_COMMAND_OK)
			{
				ereport(WARNING, (errmsg("Failed to commit transaction on %s:%s",
										 nodeName, nodePort)));
			}
		}

		PQclear(result);

		transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
	}
}
/*
 * BuildTransactionName constructs a transaction name that is unique to this
 * backend and local transaction, from the backend PID, the current
 * transaction id, and the caller-supplied connection id.
 */
static StringInfo
BuildTransactionName(int connectionId)
{
	StringInfo transactionName = makeStringInfo();

	appendStringInfo(transactionName, "citus_%d_%u_%d", MyProcPid,
					 GetCurrentTransactionId(), connectionId);

	return transactionName;
}
/*
 * CloseConnections closes the libpq connection of every entry in
 * connectionList via PQfinish.
 */
void
CloseConnections(List *connectionList)
{
	ListCell *connectionCell = NULL;

	foreach(connectionCell, connectionList)
	{
		TransactionConnection *transactionConnection =
			(TransactionConnection *) lfirst(connectionCell);

		PQfinish(transactionConnection->connection);
	}
}

View File

@ -1,173 +0,0 @@
/*-------------------------------------------------------------------------
*
* transaction_manager.c
* This file contains functions that comprise a pluggable API for
* managing transactions across many worker nodes using 1PC or 2PC.
*
* Contributed by Konstantin Knizhnik, Postgres Professional
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "distributed/connection_cache.h"
#include "distributed/transaction_manager.h"
static bool BeginTransaction(PGconn *connection);
static bool Prepare1PC(PGconn *connection, char *transactionId);
static bool CommitPrepared1PC(PGconn *connection, char *transactionId);
static bool RollbackPrepared1PC(PGconn *connection, char *transactionId);
static bool RollbackTransaction(PGconn *connection);
static bool Prepare2PC(PGconn *connection, char *transactionId);
static bool CommitPrepared2PC(PGconn *connection, char *transactionId);
static bool RollbackPrepared2PC(PGconn *connection, char *transactionId);
static char * Build2PCCommand(char const *command, char *transactionId);
/*
 * Table of transaction manager implementations, indexed by the configured
 * transaction manager (see CopyTransactionManager): the first entry wires up
 * the 1PC functions, the second the 2PC functions.
 */
CitusTransactionManager const CitusTransactionManagerImpl[] =
{
/* plain BEGIN/COMMIT/ROLLBACK; prepare is a no-op */
{ BeginTransaction, Prepare1PC, CommitPrepared1PC,
RollbackPrepared1PC, RollbackTransaction },
/* PREPARE TRANSACTION / COMMIT PREPARED / ROLLBACK PREPARED */
{ BeginTransaction, Prepare2PC, CommitPrepared2PC,
RollbackPrepared2PC, RollbackTransaction }
};
/*
 * BeginTransaction opens a transaction block on the given connection by
 * issuing a BEGIN statement and reports whether it succeeded.
 */
static bool
BeginTransaction(PGconn *connection)
{
	char const *beginCommand = "BEGIN";

	return ExecuteCommand(connection, PGRES_COMMAND_OK, beginCommand);
}
/*
 * Prepare1PC always succeeds without contacting the worker: 1PC mode has no
 * prepare phase. It exists only so the 1PC manager satisfies the 2PC API.
 */
static bool
Prepare1PC(PGconn *connection, char *transactionId)
{
	/* nothing to send; both arguments are intentionally unused */
	return true;
}
/*
 * CommitPrepared1PC finishes a 1PC transaction with a plain COMMIT; the
 * transactionId argument is unused in 1PC mode.
 */
static bool
CommitPrepared1PC(PGconn *connection, char *transactionId)
{
	char const *commitCommand = "COMMIT";

	return ExecuteCommand(connection, PGRES_COMMAND_OK, commitCommand);
}
/*
 * RollbackPrepared1PC rolls a 1PC transaction back with a plain ROLLBACK;
 * the transactionId argument is unused and exists for 2PC API compatibility.
 */
static bool
RollbackPrepared1PC(PGconn *connection, char *transactionId)
{
	char const *rollbackCommand = "ROLLBACK";

	return ExecuteCommand(connection, PGRES_COMMAND_OK, rollbackCommand);
}
/*
 * Prepare2PC issues PREPARE TRANSACTION for the given transaction id,
 * reporting whether the worker accepted the prepare.
 */
static bool
Prepare2PC(PGconn *connection, char *transactionId)
{
	char *prepareCommand = Build2PCCommand("PREPARE TRANSACTION", transactionId);

	return ExecuteCommand(connection, PGRES_COMMAND_OK, prepareCommand);
}
/*
 * CommitPrepared2PC issues COMMIT PREPARED for the given transaction id to
 * finish the second phase of a two-phase commit.
 */
static bool
CommitPrepared2PC(PGconn *connection, char *transactionId)
{
	char *commitCommand = Build2PCCommand("COMMIT PREPARED", transactionId);

	return ExecuteCommand(connection, PGRES_COMMAND_OK, commitCommand);
}
/*
 * RollbackPrepared2PC issues ROLLBACK PREPARED for the given transaction id
 * to abort an already-prepared two-phase commit. (The previous header
 * comment incorrectly described this as sending COMMIT TRANSACTION.)
 */
static bool
RollbackPrepared2PC(PGconn *connection, char *transactionId)
{
	char *rollbackCommand = Build2PCCommand("ROLLBACK PREPARED", transactionId);

	return ExecuteCommand(connection, PGRES_COMMAND_OK, rollbackCommand);
}
/*
 * RollbackTransaction rolls back the currently open transaction on the
 * connection by issuing a ROLLBACK statement.
 */
static bool
RollbackTransaction(PGconn *connection)
{
	char const *rollbackCommand = "ROLLBACK";

	return ExecuteCommand(connection, PGRES_COMMAND_OK, rollbackCommand);
}
/*
 * Build2PCCommand builds a two-phase-commit command of the form
 * "<command> '<transactionId>'", e.g. "COMMIT PREPARED 'citus_123_456_7'".
 * The returned string is palloc'd in the current memory context.
 *
 * Note: transactionId is interpolated without quoting/escaping; callers are
 * expected to pass identifiers produced by BuildTransactionId, which contain
 * no quote characters.
 */
static char *
Build2PCCommand(char const *command, char *transactionId)
{
	StringInfo commandString = makeStringInfo();

	/*
	 * Both command and transactionId must be supplied: the format string has
	 * two %s conversions, and omitting the command argument would make the
	 * second %s read an indeterminate vararg (undefined behavior).
	 */
	appendStringInfo(commandString, "%s '%s'", command, transactionId);

	return commandString->data;
}
/*
 * BuildTransactionId constructs a transaction identifier from an
 * application-specific local id, of the form citus_<pid>_<xid>_<localId>,
 * which is unique across backends and transactions.
 */
char *
BuildTransactionId(int localId)
{
	StringInfo transactionIdString = makeStringInfo();

	appendStringInfo(transactionIdString, "citus_%d_%u_%d", MyProcPid,
					 GetCurrentTransactionId(), localId);

	return transactionIdString->data;
}
/*
 * ExecuteCommand runs the given statement on a remote node and checks that
 * the result status matches expectedResult. On mismatch the remote error is
 * reported and false is returned. The libpq result is cleared in all cases.
 */
bool
ExecuteCommand(PGconn *connection, ExecStatusType expectedResult, char const *command)
{
	PGresult *result = PQexec(connection, command);
	bool commandSucceeded = (PQresultStatus(result) == expectedResult);

	if (!commandSucceeded)
	{
		ReportRemoteError(connection, result);
	}

	PQclear(result);

	return commandSucceeded;
}

View File

@ -1,7 +1,6 @@
/*-------------------------------------------------------------------------
*
* multi_copy.h
*
* Declarations for public functions and variables used in COPY for
* distributed tables.
*
@ -14,8 +13,7 @@
#define MULTI_COPY_H
#include "libpq-fe.h"
#include "distributed/transaction_manager.h"
#include "nodes/parsenodes.h"
/* config variable managed via guc.c */

View File

@ -0,0 +1,57 @@
/*-------------------------------------------------------------------------
*
* multi_transaction.h
* Type and function declarations used in performing transactions across
* shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MULTI_TRANSACTION_H
#define MULTI_TRANSACTION_H
#include "libpq-fe.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
/* Enumeration that defines the different transaction managers available */
typedef enum
{
TRANSACTION_MANAGER_1PC = 0,
TRANSACTION_MANAGER_2PC = 1
} TransactionManagerType;
/* Enumeration that defines different remote transaction states */
typedef enum
{
TRANSACTION_STATE_INVALID = 0,
TRANSACTION_STATE_OPEN,
TRANSACTION_STATE_COPY_STARTED,
TRANSACTION_STATE_PREPARED,
TRANSACTION_STATE_CLOSED
} TransactionState;
/*
* TransactionConnection represents a connection to a remote node which is
* used to perform a transaction on shard placements.
*/
typedef struct TransactionConnection
{
int64 connectionId;
TransactionState transactionState;
PGconn* connection;
} TransactionConnection;
/* Functions declarations for transaction and connection management */
extern void PrepareTransactions(List *connectionList);
extern void AbortTransactions(List *connectionList);
extern void CommitTransactions(List *connectionList);
extern void CloseConnections(List *connectionList);
#endif /* MULTI_TRANSACTION_H */

View File

@ -1,49 +0,0 @@
/*-------------------------------------------------------------------------
*
* transaction_manager.h
*
* Transaction manager API.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef TRANSACTION_MANAGER_H
#define TRANSACTION_MANAGER_H
#include "libpq-fe.h"
#include "utils/guc.h"
/* pluggable transaction manager API */
typedef struct CitusTransactionManager
{
bool (*Begin)(PGconn *conn);
bool (*Prepare)(PGconn *conn, char *transactionId);
bool (*CommitPrepared)(PGconn *conn, char *transactionId);
bool (*RollbackPrepared)(PGconn *conn, char *transactionId);
bool (*Rollback)(PGconn *conn);
} CitusTransactionManager;
/* Enumeration that defines the transaction manager to use */
typedef enum
{
TRANSACTION_MANAGER_1PC = 0,
TRANSACTION_MANAGER_2PC = 1
} TransactionManagerType;
/* Implementations of the transaction manager API */
extern CitusTransactionManager const CitusTransactionManagerImpl[];
/* Function declarations for remote transaction management */
extern bool ExecuteCommand(PGconn *connection, ExecStatusType expectedResult,
char const *command);
extern char * BuildTransactionId(int localId);
#endif /* TRANSACTION_MANAGER_H */

View File

@ -0,0 +1,141 @@
--
-- MULTI_COPY
--
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
-- Test empty copy
COPY customer_copy_hash FROM STDIN;
\.
-- Test syntax error
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
1,customer1
2,customer2,
notinteger,customernot
\.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test primary key violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
1,customer1
2,customer2
2,customer2
\.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test headers option
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
# header
1,customer1
2,customer2
3,customer3
\.
-- Confirm that only first row was skipped
SELECT count(*) FROM customer_copy_hash;
-- Test force_not_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
"4","customer4",""
\.
-- Confirm that value is not null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
-- Test force_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
"5","customer5",""
\.
-- Confirm that value is null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
-- Test null violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
6,customer6
7,customer7
8,
\.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test server-side copy from program
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
WITH (DELIMITER ' ');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
-- Test server-side copy from file
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test client-side copy from file
\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
c_custkey integer,
c_name varchar(25),
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Test copy into range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
-- Check that the row with key 1 went into the shard covering range [1, 500]
SELECT count(*) FROM customer_copy_range WHERE c_custkey = 1;
-- Check whether data was copied
SELECT count(*) FROM customer_copy_range;

View File

@ -99,6 +99,7 @@ test: multi_append_table_to_shard
# ---------
test: multi_outer_join
#
# ---
# Tests covering mostly modification queries and required preliminary
# functionality related to metadata, shard creation, shard pruning and
@ -119,6 +120,11 @@ test: multi_utilities
test: multi_create_insert_proxy
test: multi_data_types
# ---------
# multi_copy creates hash and range-partitioned tables and performs COPY
# ---------
test: multi_copy
# ----------
# multi_large_shardid stages more shards into lineitem
# ----------

View File

@ -0,0 +1,173 @@
--
-- MULTI_COPY
--
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards for query
DETAIL: No shards exist for distributed table "customer_copy_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- Test empty copy
COPY customer_copy_hash FROM STDIN;
-- Test syntax error
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
ERROR: invalid input syntax for integer: "1,customer1"
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
0
(1 row)
-- Test primary key violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
DETAIL: Key (c_custkey)=(2) already exists.
CONTEXT: COPY customer_copy_hash, line 4: ""
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
0
(1 row)
-- Test headers option
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
-- Confirm that only first row was skipped
SELECT count(*) FROM customer_copy_hash;
count
-------
3
(1 row)
-- Test force_not_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
-- Confirm that value is not null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
count
-------
1
(1 row)
-- Test force_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
-- Confirm that value is null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
count
-------
0
(1 row)
-- Test null violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: null value in column "c_name" violates not-null constraint
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
CONTEXT: COPY customer_copy_hash, line 4: ""
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
5
(1 row)
-- Test server-side copy from program
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
WITH (DELIMITER ' ');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
count
-------
1
(1 row)
-- Test server-side copy from file
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
1006
(1 row)
-- Test client-side copy from file
\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
2006
(1 row)
-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
c_custkey integer,
c_name varchar(25),
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards for query
DETAIL: No shards exist for distributed table "customer_copy_range".
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Test copy into range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
-- Check that the row with key 1 went into the shard covering range [1, 500]
SELECT count(*) FROM customer_copy_range WHERE c_custkey = 1;
count
-------
1
(1 row)
-- Check whether data was copied
SELECT count(*) FROM customer_copy_range;
count
-------
1000
(1 row)