reuse-connections-for-logical-ref-fkeys_ond
Onder Kalaci 2022-09-16 17:18:21 +03:00
parent 1d16a7ae62
commit 2edfea05ce
6 changed files with 28 additions and 39 deletions

View File

@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount); cachedConnectionCount);
static void ResetConnection(MultiConnection *connection);
static bool RemoteTransactionIdle(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections); static int EventSetSizeForConnectionList(List *connections);
@ -252,15 +251,9 @@ MultiConnection *
GetLocalConnectionForSubtransactionAsUser(char *userName) GetLocalConnectionForSubtransactionAsUser(char *userName)
{ {
int connectionFlag = OUTSIDE_TRANSACTION; int connectionFlag = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, MultiConnection *connection =
LocalHostName, GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber,
PostPortNumber, userName, get_database_name(MyDatabaseId));
userName,
get_database_name(
MyDatabaseId));
/* Don't cache connection for the lifetime of the entire session. */
connection->forceCloseAtTransactionEnd = true;
return connection; return connection;
} }
@ -1467,6 +1460,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
*/ */
ResetConnection(connection); ResetConnection(connection);
UnclaimConnection(connection);
cachedConnectionCount++; cachedConnectionCount++;
} }
} }
@ -1508,7 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
* ResetConnection preserves the given connection for later usage by * ResetConnection preserves the given connection for later usage by
* resetting its states. * resetting its states.
*/ */
static void void
ResetConnection(MultiConnection *connection) ResetConnection(MultiConnection *connection)
{ {
/* reset per-transaction state */ /* reset per-transaction state */
@ -1517,8 +1513,6 @@ ResetConnection(MultiConnection *connection)
/* reset copy state */ /* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0; connection->copyBytesWrittenSinceLastFlush = 0;
UnclaimConnection(connection);
} }

View File

@ -670,8 +670,8 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
nodeGroupId, nodeGroupId,
policy); policy);
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser( MultiConnection *connection =
CitusExtensionOwnerName()); GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection, SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data)); list_make1(command->data));
} }
@ -761,8 +761,6 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName
nodeName, nodePort, nodeName, nodePort,
CurrentUserName(), CurrentUserName(),
NULL); NULL);
workerConnection->forceCloseAtTransactionEnd = true;
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection, workerConnection,
dropCommandList); dropCommandList);

View File

@ -1015,11 +1015,12 @@ static void
CreateObjectOnPlacement(List *objectCreationCommandList, CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerPlacementNode) WorkerNode *workerPlacementNode)
{ {
char *currentUser = CurrentUserName(); MultiConnection *c =
SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION,
workerPlacementNode->workerPort, workerPlacementNode->workerName,
currentUser, workerPlacementNode->workerPort,
objectCreationCommandList); NULL, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(c, objectCreationCommandList);
} }

View File

@ -1150,11 +1150,15 @@ CreatePartitioningHierarchy(List *logicalRepTargetList)
* parallel, so create them sequentially. Also attaching partition * parallel, so create them sequentially. Also attaching partition
* is a quick operation, so it is fine to execute sequentially. * is a quick operation, so it is fine to execute sequentially.
*/ */
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname, MultiConnection *c =
target->superuserConnection->port, GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION,
tableOwner, target->superuserConnection->hostname,
list_make1(attachPartitionCommand)); target->superuserConnection->port,
tableOwner, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(c, list_make1(
attachPartitionCommand));
MemoryContextReset(localContext); MemoryContextReset(localContext);
} }
} }
@ -1587,7 +1591,7 @@ DropUser(MultiConnection *connection, char *username)
connection, connection,
list_make2( list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;", "SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s", psprintf("DROP USER IF EXISTS %s;",
quote_identifier(username)))); quote_identifier(username))));
} }

View File

@ -766,18 +766,9 @@ CloseRemoteTransaction(struct MultiConnection *connection)
/* XXX: Should we error out for a critical transaction? */ /* XXX: Should we error out for a critical transaction? */
dlist_delete(&connection->transactionNode); dlist_delete(&connection->transactionNode);
/*
* If the transaction was completed, we have now cleaned it up, so we
* can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to
* start a new transaction without running into errors.
*/
if (transaction->transactionState == REMOTE_TRANS_ABORTED ||
transaction->transactionState == REMOTE_TRANS_COMMITTED)
{
transaction->transactionState = REMOTE_TRANS_NOT_STARTED;
}
} }
ResetConnection(connection);
} }

View File

@ -303,6 +303,7 @@ extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort,
extern void CloseConnection(MultiConnection *connection); extern void CloseConnection(MultiConnection *connection);
extern void ShutdownAllConnections(void); extern void ShutdownAllConnections(void);
extern void ShutdownConnection(MultiConnection *connection); extern void ShutdownConnection(MultiConnection *connection);
extern void ResetConnection(MultiConnection *connection);
/* dealing with a connection */ /* dealing with a connection */
extern void FinishConnectionListEstablishment(List *multiConnectionList); extern void FinishConnectionListEstablishment(List *multiConnectionList);