mirror of https://github.com/citusdata/citus.git
Addressing PR comments
parent
1d16a7ae62
commit
bb65ac9f7d
|
@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
|
||||||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||||
static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
||||||
cachedConnectionCount);
|
cachedConnectionCount);
|
||||||
static void ResetConnection(MultiConnection *connection);
|
|
||||||
static bool RemoteTransactionIdle(MultiConnection *connection);
|
static bool RemoteTransactionIdle(MultiConnection *connection);
|
||||||
static int EventSetSizeForConnectionList(List *connections);
|
static int EventSetSizeForConnectionList(List *connections);
|
||||||
|
|
||||||
|
@ -252,12 +251,9 @@ MultiConnection *
|
||||||
GetLocalConnectionForSubtransactionAsUser(char *userName)
|
GetLocalConnectionForSubtransactionAsUser(char *userName)
|
||||||
{
|
{
|
||||||
int connectionFlag = OUTSIDE_TRANSACTION;
|
int connectionFlag = OUTSIDE_TRANSACTION;
|
||||||
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
|
MultiConnection *connection =
|
||||||
LocalHostName,
|
GetNodeUserDatabaseConnection(connectionFlag, LocalHostName,
|
||||||
PostPortNumber,
|
PostPortNumber, userName, NULL);
|
||||||
userName,
|
|
||||||
get_database_name(
|
|
||||||
MyDatabaseId));
|
|
||||||
|
|
||||||
/* Don't cache connection for the lifetime of the entire session. */
|
/* Don't cache connection for the lifetime of the entire session. */
|
||||||
connection->forceCloseAtTransactionEnd = true;
|
connection->forceCloseAtTransactionEnd = true;
|
||||||
|
@ -1508,7 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
* ResetConnection preserves the given connection for later usage by
|
* ResetConnection preserves the given connection for later usage by
|
||||||
* resetting its states.
|
* resetting its states.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
ResetConnection(MultiConnection *connection)
|
ResetConnection(MultiConnection *connection)
|
||||||
{
|
{
|
||||||
/* reset per-transaction state */
|
/* reset per-transaction state */
|
||||||
|
|
|
@ -1015,11 +1015,17 @@ static void
|
||||||
CreateObjectOnPlacement(List *objectCreationCommandList,
|
CreateObjectOnPlacement(List *objectCreationCommandList,
|
||||||
WorkerNode *workerPlacementNode)
|
WorkerNode *workerPlacementNode)
|
||||||
{
|
{
|
||||||
char *currentUser = CurrentUserName();
|
int connectionFlags = OUTSIDE_TRANSACTION;
|
||||||
SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
|
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||||
workerPlacementNode->workerPort,
|
workerPlacementNode
|
||||||
currentUser,
|
->workerName,
|
||||||
objectCreationCommandList);
|
workerPlacementNode
|
||||||
|
->workerPort,
|
||||||
|
CurrentUserName(),
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
|
||||||
|
objectCreationCommandList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1150,11 +1150,19 @@ CreatePartitioningHierarchy(List *logicalRepTargetList)
|
||||||
* parallel, so create them sequentially. Also attaching partition
|
* parallel, so create them sequentially. Also attaching partition
|
||||||
* is a quick operation, so it is fine to execute sequentially.
|
* is a quick operation, so it is fine to execute sequentially.
|
||||||
*/
|
*/
|
||||||
SendCommandListToWorkerOutsideTransaction(
|
int connectionFlags = OUTSIDE_TRANSACTION;
|
||||||
target->superuserConnection->hostname,
|
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(
|
||||||
target->superuserConnection->port,
|
connectionFlags,
|
||||||
|
target->
|
||||||
|
superuserConnection->hostname,
|
||||||
|
target->
|
||||||
|
superuserConnection->port,
|
||||||
tableOwner,
|
tableOwner,
|
||||||
list_make1(attachPartitionCommand));
|
NULL);
|
||||||
|
SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
|
||||||
|
list_make1(
|
||||||
|
attachPartitionCommand));
|
||||||
|
|
||||||
MemoryContextReset(localContext);
|
MemoryContextReset(localContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -766,17 +766,6 @@ CloseRemoteTransaction(struct MultiConnection *connection)
|
||||||
/* XXX: Should we error out for a critical transaction? */
|
/* XXX: Should we error out for a critical transaction? */
|
||||||
|
|
||||||
dlist_delete(&connection->transactionNode);
|
dlist_delete(&connection->transactionNode);
|
||||||
|
|
||||||
/*
|
|
||||||
* If the transaction was completed, we have now cleaned it up, so we
|
|
||||||
* can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to
|
|
||||||
* start a new transaction without running into errors.
|
|
||||||
*/
|
|
||||||
if (transaction->transactionState == REMOTE_TRANS_ABORTED ||
|
|
||||||
transaction->transactionState == REMOTE_TRANS_COMMITTED)
|
|
||||||
{
|
|
||||||
transaction->transactionState = REMOTE_TRANS_NOT_STARTED;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -370,7 +370,7 @@ SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerC
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteTransactionCommit(workerConnection);
|
RemoteTransactionCommit(workerConnection);
|
||||||
CloseRemoteTransaction(workerConnection);
|
ResetConnection(workerConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -488,7 +488,7 @@ SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
|
||||||
RemoteTransactionCommit(workerConnection);
|
RemoteTransactionCommit(workerConnection);
|
||||||
}
|
}
|
||||||
|
|
||||||
CloseRemoteTransaction(workerConnection);
|
ResetConnection(workerConnection);
|
||||||
|
|
||||||
return !failed;
|
return !failed;
|
||||||
}
|
}
|
||||||
|
|
|
@ -300,6 +300,7 @@ extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort);
|
||||||
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
|
extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
|
||||||
const char *userName,
|
const char *userName,
|
||||||
const char *database);
|
const char *database);
|
||||||
|
extern void ResetConnection(MultiConnection *connection);
|
||||||
extern void CloseConnection(MultiConnection *connection);
|
extern void CloseConnection(MultiConnection *connection);
|
||||||
extern void ShutdownAllConnections(void);
|
extern void ShutdownAllConnections(void);
|
||||||
extern void ShutdownConnection(MultiConnection *connection);
|
extern void ShutdownConnection(MultiConnection *connection);
|
||||||
|
|
Loading…
Reference in New Issue