mirror of https://github.com/citusdata/citus.git
Fix valgrind issue: ResetRemoteTransaction
parent
47ff03123b
commit
33abfa0838
|
@ -705,7 +705,7 @@ CloseConnection(MultiConnection *connection)
|
||||||
|
|
||||||
/* same for transaction state and shard/placement machinery */
|
/* same for transaction state and shard/placement machinery */
|
||||||
CloseShardPlacementAssociation(connection);
|
CloseShardPlacementAssociation(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
|
|
||||||
/* we leave the per-host entry alive */
|
/* we leave the per-host entry alive */
|
||||||
pfree(connection);
|
pfree(connection);
|
||||||
|
@ -1464,7 +1464,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
/*
|
/*
|
||||||
* reset healthy session lifespan connections.
|
* reset healthy session lifespan connections.
|
||||||
*/
|
*/
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, false);
|
||||||
|
|
||||||
UnclaimConnection(connection);
|
UnclaimConnection(connection);
|
||||||
|
|
||||||
|
|
|
@ -684,7 +684,7 @@ TryDropSubscriptionOutsideTransaction(char *subscriptionName,
|
||||||
"SET LOCAL lock_timeout TO '1s'", NULL) != 0)
|
"SET LOCAL lock_timeout TO '1s'", NULL) != 0)
|
||||||
{
|
{
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -695,7 +695,7 @@ TryDropSubscriptionOutsideTransaction(char *subscriptionName,
|
||||||
{
|
{
|
||||||
ReportConnectionError(connection, WARNING);
|
ReportConnectionError(connection, WARNING);
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -714,7 +714,7 @@ TryDropSubscriptionOutsideTransaction(char *subscriptionName,
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -723,7 +723,7 @@ TryDropSubscriptionOutsideTransaction(char *subscriptionName,
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -731,7 +731,7 @@ TryDropSubscriptionOutsideTransaction(char *subscriptionName,
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
RemoteTransactionCommit(connection);
|
RemoteTransactionCommit(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
|
|
||||||
StringInfo alterQuery = makeStringInfo();
|
StringInfo alterQuery = makeStringInfo();
|
||||||
appendStringInfo(alterQuery,
|
appendStringInfo(alterQuery,
|
||||||
|
@ -803,7 +803,7 @@ TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
|
||||||
"SET LOCAL lock_timeout TO '1s'", NULL) != 0)
|
"SET LOCAL lock_timeout TO '1s'", NULL) != 0)
|
||||||
{
|
{
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -820,7 +820,7 @@ TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
|
||||||
{
|
{
|
||||||
ReportConnectionError(connection, WARNING);
|
ReportConnectionError(connection, WARNING);
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -832,7 +832,7 @@ TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
RemoteTransactionCommit(connection);
|
RemoteTransactionCommit(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -846,7 +846,7 @@ TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
RemoteTransactionAbort(connection);
|
RemoteTransactionAbort(connection);
|
||||||
ResetRemoteTransaction(connection);
|
ResetRemoteTransaction(connection, true);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -860,12 +860,12 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection)
|
||||||
* the main transaction, if the connection is being reused.
|
* the main transaction, if the connection is being reused.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ResetRemoteTransaction(struct MultiConnection *connection)
|
ResetRemoteTransaction(struct MultiConnection *connection, bool deleteTxNode)
|
||||||
{
|
{
|
||||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
|
|
||||||
/* unlink from list of open transactions, if necessary */
|
/* unlink from list of open transactions, if necessary */
|
||||||
if (transaction->transactionState != REMOTE_TRANS_NOT_STARTED)
|
if (deleteTxNode && transaction->transactionState != REMOTE_TRANS_NOT_STARTED)
|
||||||
{
|
{
|
||||||
/* XXX: Should we error out for a critical transaction? */
|
/* XXX: Should we error out for a critical transaction? */
|
||||||
|
|
||||||
|
|
|
@ -370,7 +370,7 @@ SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerC
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteTransactionCommit(workerConnection);
|
RemoteTransactionCommit(workerConnection);
|
||||||
ResetRemoteTransaction(workerConnection);
|
ResetRemoteTransaction(workerConnection, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -488,7 +488,7 @@ SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
|
||||||
RemoteTransactionCommit(workerConnection);
|
RemoteTransactionCommit(workerConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
ResetRemoteTransaction(workerConnection);
|
ResetRemoteTransaction(workerConnection, true);
|
||||||
|
|
||||||
return !failed;
|
return !failed;
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,7 +130,7 @@ extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
|
||||||
* transaction managment code.
|
* transaction managment code.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
extern void ResetRemoteTransaction(struct MultiConnection *connection);
|
extern void ResetRemoteTransaction(struct MultiConnection *connection, bool deleteTxNode);
|
||||||
|
|
||||||
/* perform handling for all in-progress transactions */
|
/* perform handling for all in-progress transactions */
|
||||||
extern void CoordinatedRemoteTransactionsPrepare(void);
|
extern void CoordinatedRemoteTransactionsPrepare(void);
|
||||||
|
|
Loading…
Reference in New Issue