mirror of https://github.com/citusdata/citus.git
Merge pull request #1104 from citusdata/connmgr_cleanup
Minor connection/transaction management related cleanupspull/1053/merge
commit
aca3770364
|
@ -163,7 +163,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
ConnectionHashKey key;
|
ConnectionHashKey key;
|
||||||
ConnectionHashEntry *entry = NULL;
|
ConnectionHashEntry *entry = NULL;
|
||||||
MultiConnection *connection;
|
MultiConnection *connection;
|
||||||
MemoryContext oldContext;
|
|
||||||
bool found;
|
bool found;
|
||||||
|
|
||||||
/* do some minimal input checks */
|
/* do some minimal input checks */
|
||||||
|
@ -234,11 +233,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
*/
|
*/
|
||||||
connection = StartConnectionEstablishment(&key);
|
connection = StartConnectionEstablishment(&key);
|
||||||
|
|
||||||
oldContext = MemoryContextSwitchTo(ConnectionContext);
|
|
||||||
dlist_push_tail(entry->connections, &connection->connectionNode);
|
dlist_push_tail(entry->connections, &connection->connectionNode);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
if (flags & SESSION_LIFESPAN)
|
if (flags & SESSION_LIFESPAN)
|
||||||
{
|
{
|
||||||
connection->sessionLifespan = true;
|
connection->sessionLifespan = true;
|
||||||
|
|
|
@ -44,7 +44,6 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
|
||||||
{
|
{
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *shardIntervalCell = NULL;
|
||||||
List *newConnectionList = NIL;
|
List *newConnectionList = NIL;
|
||||||
ListCell *connectionCell = NULL;
|
|
||||||
|
|
||||||
if (shardConnectionHash == NULL)
|
if (shardConnectionHash == NULL)
|
||||||
{
|
{
|
||||||
|
@ -121,12 +120,7 @@ OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* finish connection establishment newly opened connections */
|
/* finish connection establishment newly opened connections */
|
||||||
foreach(connectionCell, newConnectionList)
|
FinishConnectionListEstablishment(newConnectionList);
|
||||||
{
|
|
||||||
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
|
||||||
|
|
||||||
FinishConnectionEstablishment(connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
|
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
|
||||||
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
|
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
|
||||||
|
|
|
@ -23,14 +23,12 @@
|
||||||
#include "access/relscan.h"
|
#include "access/relscan.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
#include "distributed/commit_protocol.h"
|
|
||||||
#include "distributed/connection_cache.h"
|
#include "distributed/connection_cache.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/pg_dist_transaction.h"
|
#include "distributed/pg_dist_transaction.h"
|
||||||
#include "distributed/transaction_recovery.h"
|
#include "distributed/transaction_recovery.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "storage/lock.h"
|
#include "storage/lock.h"
|
||||||
|
@ -71,35 +69,6 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* LogPreparedTransactions logs a commit record for all prepared transactions
|
|
||||||
* on connections in connectionList. The remote transaction is safe to commit
|
|
||||||
* once the record has been durably stored (i.e. the local transaction is
|
|
||||||
* committed).
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
LogPreparedTransactions(List *connectionList)
|
|
||||||
{
|
|
||||||
ListCell *connectionCell = NULL;
|
|
||||||
|
|
||||||
foreach(connectionCell, connectionList)
|
|
||||||
{
|
|
||||||
TransactionConnection *transactionConnection =
|
|
||||||
(TransactionConnection *) lfirst(connectionCell);
|
|
||||||
|
|
||||||
char transactionState PG_USED_FOR_ASSERTS_ONLY =
|
|
||||||
transactionConnection->transactionState;
|
|
||||||
int groupId = transactionConnection->groupId;
|
|
||||||
int64 connectionId = transactionConnection->connectionId;
|
|
||||||
StringInfo transactionName = BuildTransactionName(connectionId);
|
|
||||||
|
|
||||||
Assert(transactionState == TRANSACTION_STATE_PREPARED);
|
|
||||||
|
|
||||||
LogTransactionRecord(groupId, transactionName->data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LogTransactionRecord registers the fact that a transaction has been
|
* LogTransactionRecord registers the fact that a transaction has been
|
||||||
* prepared on a worker. The presence of this record indicates that the
|
* prepared on a worker. The presence of this record indicates that the
|
||||||
|
|
|
@ -12,11 +12,7 @@
|
||||||
#define TRANSACTION_RECOVERY_H
|
#define TRANSACTION_RECOVERY_H
|
||||||
|
|
||||||
|
|
||||||
#include "nodes/pg_list.h"
|
|
||||||
|
|
||||||
|
|
||||||
/* Functions declarations for worker transactions */
|
/* Functions declarations for worker transactions */
|
||||||
extern void LogPreparedTransactions(List *connectionList);
|
|
||||||
extern void LogTransactionRecord(int groupId, char *transactionName);
|
extern void LogTransactionRecord(int groupId, char *transactionName);
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue