Merge pull request #818 from citusdata/fix_xact_callbacks

Directly register transaction callbacks in PG_init

cr: @anarazel
pull/815/head
Jason Petersen 2016-09-29 11:52:03 -06:00 committed by GitHub
commit 9c19ad8b78
5 changed files with 7 additions and 48 deletions

View File

@ -80,7 +80,6 @@ bool AllModificationsCommutative = false;
*/ */
static HTAB *xactParticipantHash = NULL; static HTAB *xactParticipantHash = NULL;
static List *xactShardConnSetList = NIL; static List *xactShardConnSetList = NIL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool subXactAbortAttempted = false; static bool subXactAbortAttempted = false;
/* functions needed during start phase */ /* functions needed during start phase */
@ -113,7 +112,6 @@ static void RecordShardIdParticipant(uint64 affectedShardId,
NodeConnectionEntry *participantEntry); NodeConnectionEntry *participantEntry);
/* functions needed by callbacks and hooks */ /* functions needed by callbacks and hooks */
static void RegisterRouterExecutorXactCallbacks(void);
static void RouterTransactionCallback(XactEvent event, void *arg); static void RouterTransactionCallback(XactEvent event, void *arg);
static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId, static void RouterSubtransactionCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg); SubTransactionId parentSubid, void *arg);
@ -1202,32 +1200,13 @@ RouterExecutorEnd(QueryDesc *queryDesc)
/* /*
* InstallRouterExecutorShmemHook simply installs a hook (intended to be called * RegisterRouterExecutorXactCallbacks registers this executor's callbacks.
* once during backend startup), which will itself register all the transaction
* callbacks needed by this executor.
*/ */
void void
InstallRouterExecutorShmemHook(void)
{
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = RegisterRouterExecutorXactCallbacks;
}
/*
* RegisterRouterExecutorXactCallbacks registers (sub-)transaction callbacks
* needed by this executor before calling any previous shmem startup hooks.
*/
static void
RegisterRouterExecutorXactCallbacks(void) RegisterRouterExecutorXactCallbacks(void)
{ {
RegisterXactCallback(RouterTransactionCallback, NULL); RegisterXactCallback(RouterTransactionCallback, NULL);
RegisterSubXactCallback(RouterSubtransactionCallback, NULL); RegisterSubXactCallback(RouterSubtransactionCallback, NULL);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
} }

View File

@ -154,8 +154,8 @@ _PG_init(void)
WorkerNodeRegister(); WorkerNodeRegister();
/* initialize transaction callbacks */ /* initialize transaction callbacks */
InstallRouterExecutorShmemHook(); RegisterRouterExecutorXactCallbacks();
InstallMultiShardXactShmemHook(); RegisterShardPlacementXactCallbacks();
} }

View File

@ -27,11 +27,9 @@
/* Global variables used in commit handler */ /* Global variables used in commit handler */
static HTAB *shardConnectionHash = NULL; static HTAB *shardConnectionHash = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool subXactAbortAttempted = false; static bool subXactAbortAttempted = false;
/* functions needed by callbacks and hooks */ /* functions needed by callbacks and hooks */
static void RegisterShardPlacementXactCallbacks(void);
static void CompleteShardPlacementTransactions(XactEvent event, void *arg); static void CompleteShardPlacementTransactions(XactEvent event, void *arg);
static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId, static void MultiShardSubXactCallback(SubXactEvent event, SubTransactionId subId,
SubTransactionId parentSubid, void *arg); SubTransactionId parentSubid, void *arg);
@ -229,32 +227,14 @@ ConnectionList(HTAB *connectionHash)
/* /*
* InstallMultiShardXactShmemHook simply installs a hook (intended to be called * RegisterShardPlacementXactCallbacks registers transaction callbacks needed
* once during backend startup), which will itself register all the transaction * for multi-shard transactions.
* callbacks needed by multi-shard transaction logic.
*/ */
void void
InstallMultiShardXactShmemHook(void)
{
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = RegisterShardPlacementXactCallbacks;
}
/*
* RegisterShardPlacementXactCallbacks registers transaction callbacks needed
* for multi-shard transactions before calling previous shmem startup hooks.
*/
static void
RegisterShardPlacementXactCallbacks(void) RegisterShardPlacementXactCallbacks(void)
{ {
RegisterXactCallback(CompleteShardPlacementTransactions, NULL); RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
RegisterSubXactCallback(MultiShardSubXactCallback, NULL); RegisterSubXactCallback(MultiShardSubXactCallback, NULL);
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
} }

View File

@ -37,6 +37,6 @@ extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task);
extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
extern void RouterExecutorFinish(QueryDesc *queryDesc); extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc); extern void RouterExecutorEnd(QueryDesc *queryDesc);
extern void InstallRouterExecutorShmemHook(void); extern void RegisterRouterExecutorXactCallbacks(void);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */ #endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -33,7 +33,7 @@ extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 sh
bool *connectionsFound); bool *connectionsFound);
extern List * ConnectionList(HTAB *connectionHash); extern List * ConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList); extern void CloseConnections(List *connectionList);
extern void InstallMultiShardXactShmemHook(void); extern void RegisterShardPlacementXactCallbacks(void);
#endif /* MULTI_SHARD_TRANSACTION_H */ #endif /* MULTI_SHARD_TRANSACTION_H */