mirror of https://github.com/citusdata/citus.git
Merge pull request #656 from citusdata/cleanup/commit-handler
Move CompleteShardPlacementTransactions to multi_shard_transaction.cpull/658/head
commit
082b6b9416
|
@ -45,66 +45,6 @@ InitializeDistributedTransaction(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CompleteShardPlacementTransactions commits or aborts pending shard placement
|
|
||||||
* transactions when the local transaction commits or aborts.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
CompleteShardPlacementTransactions(XactEvent event, void *arg)
|
|
||||||
{
|
|
||||||
if (shardPlacementConnectionList == NIL)
|
|
||||||
{
|
|
||||||
/* nothing to do */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (event == XACT_EVENT_PRE_COMMIT)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Any failure here will cause local changes to be rolled back,
|
|
||||||
* and remote changes to either roll back (1PC) or, in case of
|
|
||||||
* connection or node failure, leave a prepared transaction
|
|
||||||
* (2PC).
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
|
||||||
{
|
|
||||||
PrepareRemoteTransactions(shardPlacementConnectionList);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else if (event == XACT_EVENT_COMMIT)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* A failure here will cause some remote changes to either
|
|
||||||
* roll back (1PC) or, in case of connection or node failure,
|
|
||||||
* leave a prepared transaction (2PC). However, the local
|
|
||||||
* changes have already been committed.
|
|
||||||
*/
|
|
||||||
|
|
||||||
CommitRemoteTransactions(shardPlacementConnectionList, false);
|
|
||||||
}
|
|
||||||
else if (event == XACT_EVENT_ABORT)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* A failure here will cause some remote changes to either
|
|
||||||
* roll back (1PC) or, in case of connection or node failure,
|
|
||||||
* leave a prepared transaction (2PC). The local changes have
|
|
||||||
* already been rolled back.
|
|
||||||
*/
|
|
||||||
|
|
||||||
AbortRemoteTransactions(shardPlacementConnectionList);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
CloseConnections(shardPlacementConnectionList);
|
|
||||||
shardPlacementConnectionList = NIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PrepareRemoteTransactions prepares all transactions on connections in
|
* PrepareRemoteTransactions prepares all transactions on connections in
|
||||||
* connectionList for commit if the 2PC commit protocol is enabled.
|
* connectionList for commit if the 2PC commit protocol is enabled.
|
||||||
|
|
|
@ -23,10 +23,12 @@
|
||||||
#define INITIAL_CONNECTION_CACHE_SIZE 1001
|
#define INITIAL_CONNECTION_CACHE_SIZE 1001
|
||||||
|
|
||||||
|
|
||||||
List *shardPlacementConnectionList = NIL;
|
/* Local functions forward declarations */
|
||||||
|
|
||||||
static void RegisterShardPlacementXactCallback(void);
|
static void RegisterShardPlacementXactCallback(void);
|
||||||
|
|
||||||
|
|
||||||
|
/* Global variables used in commit handler */
|
||||||
|
static List *shardPlacementConnectionList = NIL;
|
||||||
static bool isXactCallbackRegistered = false;
|
static bool isXactCallbackRegistered = false;
|
||||||
|
|
||||||
|
|
||||||
|
@ -224,6 +226,66 @@ RegisterShardPlacementXactCallback(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CompleteShardPlacementTransactions commits or aborts pending shard placement
|
||||||
|
* transactions when the local transaction commits or aborts.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CompleteShardPlacementTransactions(XactEvent event, void *arg)
|
||||||
|
{
|
||||||
|
if (shardPlacementConnectionList == NIL)
|
||||||
|
{
|
||||||
|
/* nothing to do */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (event == XACT_EVENT_PRE_COMMIT)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Any failure here will cause local changes to be rolled back,
|
||||||
|
* and remote changes to either roll back (1PC) or, in case of
|
||||||
|
* connection or node failure, leave a prepared transaction
|
||||||
|
* (2PC).
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
||||||
|
{
|
||||||
|
PrepareRemoteTransactions(shardPlacementConnectionList);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else if (event == XACT_EVENT_COMMIT)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* A failure here will cause some remote changes to either
|
||||||
|
* roll back (1PC) or, in case of connection or node failure,
|
||||||
|
* leave a prepared transaction (2PC). However, the local
|
||||||
|
* changes have already been committed.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CommitRemoteTransactions(shardPlacementConnectionList, false);
|
||||||
|
}
|
||||||
|
else if (event == XACT_EVENT_ABORT)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* A failure here will cause some remote changes to either
|
||||||
|
* roll back (1PC) or, in case of connection or node failure,
|
||||||
|
* leave a prepared transaction (2PC). The local changes have
|
||||||
|
* already been rolled back.
|
||||||
|
*/
|
||||||
|
|
||||||
|
AbortRemoteTransactions(shardPlacementConnectionList);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
CloseConnections(shardPlacementConnectionList);
|
||||||
|
shardPlacementConnectionList = NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CloseConnections closes all connections in connectionList.
|
* CloseConnections closes all connections in connectionList.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
#define MULTI_SHARD_TRANSACTION_H
|
#define MULTI_SHARD_TRANSACTION_H
|
||||||
|
|
||||||
|
|
||||||
#include "access/xact.h"
|
|
||||||
#include "utils/hsearch.h"
|
#include "utils/hsearch.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
@ -26,9 +25,6 @@ typedef struct ShardConnections
|
||||||
} ShardConnections;
|
} ShardConnections;
|
||||||
|
|
||||||
|
|
||||||
extern List *shardPlacementConnectionList;
|
|
||||||
|
|
||||||
|
|
||||||
extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList,
|
extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList,
|
||||||
char *relationOwner);
|
char *relationOwner);
|
||||||
extern HTAB * CreateShardConnectionHash(void);
|
extern HTAB * CreateShardConnectionHash(void);
|
||||||
|
@ -40,4 +36,5 @@ extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
|
||||||
extern List * ConnectionList(HTAB *connectionHash);
|
extern List * ConnectionList(HTAB *connectionHash);
|
||||||
extern void CloseConnections(List *connectionList);
|
extern void CloseConnections(List *connectionList);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_SHARD_TRANSACTION_H */
|
#endif /* MULTI_SHARD_TRANSACTION_H */
|
||||||
|
|
Loading…
Reference in New Issue