Merge pull request #1063 from citusdata/multi_shard_multi_connection

Convert multi_shard_transaction to the new connection API

cr: @jasonmp85
pull/1068/head
Jason Petersen 2016-12-30 14:56:17 -07:00 committed by GitHub
commit 3182287574
3 changed files with 92 additions and 111 deletions

View File

@ -950,7 +950,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
TransactionConnection *transactionConnection = NULL;
MultiConnection *multiConnection = NULL;
PGconn *connection = NULL;
bool queryOK = false;
@ -963,9 +963,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
continue;
}
transactionConnection =
(TransactionConnection *) list_nth(connectionList, placementIndex);
connection = transactionConnection->connection;
multiConnection =
(MultiConnection *) list_nth(connectionList, placementIndex);
connection = multiConnection->pgConn;
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
@ -982,7 +982,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
bool shardConnectionsFound = false;
ShardConnections *shardConnections = NULL;
List *connectionList = NIL;
TransactionConnection *transactionConnection = NULL;
MultiConnection *multiConnection = NULL;
PGconn *connection = NULL;
int64 currentAffectedTupleCount = 0;
bool failOnError = true;
@ -1001,9 +1001,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
continue;
}
transactionConnection =
(TransactionConnection *) list_nth(connectionList, placementIndex);
connection = transactionConnection->connection;
multiConnection =
(MultiConnection *) list_nth(connectionList, placementIndex);
connection = multiConnection->pgConn;
/*
* If caller is interested, store query results the first time

View File

@ -43,18 +43,95 @@ void
OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
{
ListCell *shardIntervalCell = NULL;
List *newConnectionList = NIL;
ListCell *connectionCell = NULL;
if (shardConnectionHash == NULL)
{
shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
}
BeginOrContinueCoordinatedTransaction();
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* open connections to shards which don't have connections yet */
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
List *shardPlacementList = NIL;
ListCell *placementCell = NULL;
BeginTransactionOnShardPlacements(shardId, userName);
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
if (shardConnectionsFound)
{
continue;
}
shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList == NIL)
{
/* going to have to have some placements to do any work */
ereport(ERROR, (errmsg("could not find any shard placements for the shard "
UINT64_FORMAT, shardId)));
}
foreach(placementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
MultiConnection *connection = NULL;
MemoryContext oldContext = NULL;
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
shardPlacement->nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("could not find worker node %s:%d",
shardPlacement->nodeName,
shardPlacement->nodePort)));
}
connection = StartNodeUserDatabaseConnection(FORCE_NEW_CONNECTION,
shardPlacement->nodeName,
shardPlacement->nodePort,
userName,
NULL);
/* we need to preserve the connection list for the next statement */
oldContext = MemoryContextSwitchTo(TopTransactionContext);
shardConnections->connectionList = lappend(shardConnections->connectionList,
connection);
MemoryContextSwitchTo(oldContext);
newConnectionList = lappend(newConnectionList, connection);
/*
* Every individual failure should cause entire distributed
* transaction to fail.
*/
MarkRemoteTransactionCritical(connection);
}
}
/* finish connection establishment newly opened connections */
foreach(connectionCell, newConnectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
FinishConnectionEstablishment(connection);
}
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
{
RemoteTransactionsBeginIfNecessary(newConnectionList);
}
}
@ -85,107 +162,6 @@ CreateShardConnectionHash(MemoryContext memoryContext)
}
/*
* BeginTransactionOnShardPlacements opens new connections (if necessary) to
* all placements of a shard (specified by shard identifier). After sending a
* BEGIN command on all connections, they are added to shardConnectionHash for
* use within this transaction. Exits early if connections already exist for
* the specified shard, and errors if no placements can be found, a connection
* cannot be made, or if the BEGIN command fails.
*/
void
BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
{
List *shardPlacementList = NIL;
ListCell *placementCell = NULL;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
MemoryContext oldContext = NULL;
shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList == NIL)
{
/* going to have to have some placements to do any work */
ereport(ERROR, (errmsg("could not find any shard placements for the shard "
UINT64_FORMAT, shardId)));
}
BeginOrContinueCoordinatedTransaction();
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* get existing connections to the shard placements, if any */
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
if (shardConnectionsFound)
{
/* exit early if we've already established shard transactions */
return;
}
foreach(placementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
MultiConnection *connection = NULL;
TransactionConnection *transactionConnection = NULL;
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
shardPlacement->nodePort);
int connectionFlags = FORCE_NEW_CONNECTION;
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("could not find worker node %s:%d",
shardPlacement->nodeName, shardPlacement->nodePort)));
}
/* XXX: It'd be nicer to establish connections asynchronously here */
connection = GetNodeUserDatabaseConnection(connectionFlags,
shardPlacement->nodeName,
shardPlacement->nodePort,
userName,
NULL);
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(ERROR, (errmsg("could not establish a connection to all "
"placements of shard %lu", shardId)));
}
/* entries must last through the whole top-level transaction */
oldContext = MemoryContextSwitchTo(TopTransactionContext);
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->groupId = workerNode->groupId;
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
transactionConnection->connection = connection->pgConn;
transactionConnection->nodeName = shardPlacement->nodeName;
transactionConnection->nodePort = shardPlacement->nodePort;
shardConnections->connectionList = lappend(shardConnections->connectionList,
transactionConnection);
MemoryContextSwitchTo(oldContext);
/*
* Every individual failure should cause entire distributed
* transaction to fail.
*/
MarkRemoteTransactionCritical(connection);
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
{
/* issue BEGIN */
RemoteTransactionBegin(connection);
}
}
}
/*
* GetShardConnections finds existing connections for a shard in the global
* connection hash. If not found, then a ShardConnections structure with empty

View File

@ -21,13 +21,18 @@
typedef struct ShardConnections
{
int64 shardId;
/*
* XXX: this list contains MultiConnection for multi-shard transactions
* or TransactionConnection for COPY, the latter should be converted to
* use MultiConnection as well.
*/
List *connectionList;
} ShardConnections;
extern void OpenTransactionsToAllShardPlacements(List *shardIdList, char *relationOwner);
extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern void BeginTransactionOnShardPlacements(uint64 shardId, char *nodeUser);
extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound);